feat(storage): implement RetryChunkDeadline for grpc writes (#11476) · googleapis/google-cloud-go@03575d7

@@ -17,11 +17,13 @@ package storage

1717

import (

1818

"bytes"

1919

"context"

20+

"encoding/json"

2021

"errors"

2122

"fmt"

2223

"io"

2324

"log"

2425

"math/rand"

26+

"net/http"

2527

"net/url"

2628

"os"

2729

"strconv"

@@ -1859,7 +1861,7 @@ func TestRetryMaxAttemptsEmulated(t *testing.T) {

18591861

instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}

18601862

testID := createRetryTest(t, client, instructions)

18611863

ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)

1862-

config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}

1864+

config := &retryConfig{maxAttempts: intPointer(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}

18631865

_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

1864186618651867

var ae *apierror.APIError

@@ -1910,7 +1912,7 @@ func TestRetryDeadlineExceededEmulated(t *testing.T) {

19101912

instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}

19111913

testID := createRetryTest(t, client, instructions)

19121914

ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)

1913-

config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}

1915+

config := &retryConfig{maxAttempts: intPointer(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}

19141916

if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {

19151917

t.Fatalf("GetBucket: got unexpected error %v, want nil", err)

19161918

}

@@ -2108,6 +2110,77 @@ func TestWriterChunkTransferTimeoutEmulated(t *testing.T) {

21082110

})

21092111

}

211021122113+

func TestWriterChunkRetryDeadlineEmulated(t *testing.T) {

2114+

transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {

2115+

const (

2116+

// Resumable upload with smallest chunksize.

2117+

chunkSize = 256 * 1024

2118+

fileSize = 600 * 1024

2119+

// A small value for testing, but large enough that we do encounter the error.

2120+

retryDeadline = time.Second

2121+

errCode = 503

2122+

)

2123+2124+

_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)

2125+

if err != nil {

2126+

t.Fatalf("creating bucket: %v", err)

2127+

}

2128+2129+

// Populate instructions with a lot of errors so it will take a long time

2130+

// to suceed. Error only after the first chunk has been sent, as the

2131+

// retry deadline does not apply to the first chunk.

2132+

manyErrs := []string{fmt.Sprintf("return-%d-after-%dK", errCode, 257)}

2133+

for i := 0; i < 20; i++ {

2134+

manyErrs = append(manyErrs, fmt.Sprintf("return-%d", errCode))

2135+2136+

}

2137+

instructions := map[string][]string{"storage.objects.insert": manyErrs}

2138+

testID := createRetryTest(t, client, instructions)

2139+2140+

var cancel context.CancelFunc

2141+

ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)

2142+

ctx, cancel = context.WithTimeout(ctx, 5*time.Second)

2143+

defer cancel()

2144+2145+

params := &openWriterParams{

2146+

attrs: &ObjectAttrs{

2147+

Bucket: bucket,

2148+

Name: fmt.Sprintf("object-%d", time.Now().Nanosecond()),

2149+

Generation: defaultGen,

2150+

},

2151+

bucket: bucket,

2152+

chunkSize: chunkSize,

2153+

chunkRetryDeadline: retryDeadline,

2154+

ctx: ctx,

2155+

donec: make(chan struct{}),

2156+

setError: func(_ error) {}, // no-op

2157+

progress: func(_ int64) {}, // no-op

2158+

setObj: func(_ *ObjectAttrs) {},

2159+

}

2160+2161+

pw, err := client.OpenWriter(params, &idempotentOption{true})

2162+

if err != nil {

2163+

t.Fatalf("failed to open writer: %v", err)

2164+

}

2165+

buffer := bytes.Repeat([]byte("A"), fileSize)

2166+

_, err = pw.Write(buffer)

2167+

defer pw.Close()

2168+

if !errorIsStatusCode(err, errCode, codes.Unavailable) {

2169+

t.Errorf("expected err with status %d, got err: %v", errCode, err)

2170+

}

2171+2172+

// Make sure there was more than one attempt.

2173+

got, err := numInstructionsLeft(testID, "storage.objects.insert")

2174+

if err != nil {

2175+

t.Errorf("getting emulator instructions: %v", err)

2176+

}

2177+2178+

if got >= len(manyErrs)-1 {

2179+

t.Errorf("not enough attempts - the request may not have been retried; got %d instructions left, expected at most %d", got, len(manyErrs)-2)

2180+

}

2181+

})

2182+

}

2183+21112184

// createRetryTest creates a bucket in the emulator and sets up a test using the

21122185

// Retry Test API for the given instructions. This is intended for emulator tests

21132186

// of retry behavior that are not covered by conformance tests.

@@ -2136,6 +2209,37 @@ func createRetryTest(t *testing.T, client storageClient, instructions map[string

21362209

return et.id

21372210

}

213822112212+

// Gets the number of unused instructions matching the method.

2213+

func numInstructionsLeft(emulatorTestID, method string) (int, error) {

2214+

host := os.Getenv("STORAGE_EMULATOR_HOST")

2215+

endpoint, err := url.Parse(host)

2216+

if err != nil {

2217+

return 0, fmt.Errorf("parsing endpoint: %v", err)

2218+

}

2219+2220+

endpoint.Path = strings.Join([]string{"retry_test", emulatorTestID}, "/")

2221+

c := http.DefaultClient

2222+

resp, err := c.Get(endpoint.String())

2223+

if err != nil || resp.StatusCode != 200 {

2224+

return 0, fmt.Errorf("getting retry test: err: %v, resp: %+v", err, resp)

2225+

}

2226+

defer func() {

2227+

closeErr := resp.Body.Close()

2228+

if err == nil {

2229+

err = closeErr

2230+

}

2231+

}()

2232+

testRes := struct {

2233+

Instructions map[string][]string

2234+

Completed bool

2235+

}{}

2236+

if err := json.NewDecoder(resp.Body).Decode(&testRes); err != nil {

2237+

return 0, fmt.Errorf("decoding response: %v", err)

2238+

}

2239+

// Subtract one because the testbench is off by one (see storage-testbench/issues/707).

2240+

return len(testRes.Instructions[method]) - 1, nil

2241+

}

2242+21392243

// createObject creates an object in the emulator with content randomBytesToWrite and

21402244

// returns its name, generation, and metageneration.

21412245

func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {