feat(storage): implement RetryChunkDeadline for grpc writes (#11476) · googleapis/google-cloud-go@03575d7
@@ -17,11 +17,13 @@ package storage
1717import (
1818"bytes"
1919"context"
20+"encoding/json"
2021"errors"
2122"fmt"
2223"io"
2324"log"
2425"math/rand"
26+"net/http"
2527"net/url"
2628"os"
2729"strconv"
@@ -1859,7 +1861,7 @@ func TestRetryMaxAttemptsEmulated(t *testing.T) {
18591861instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
18601862testID := createRetryTest(t, client, instructions)
18611863ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
1862-config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
1864+config := &retryConfig{maxAttempts: intPointer(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
18631865_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))
1864186618651867var ae *apierror.APIError
@@ -1910,7 +1912,7 @@ func TestRetryDeadlineExceededEmulated(t *testing.T) {
19101912instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
19111913testID := createRetryTest(t, client, instructions)
19121914ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
1913-config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
1915+config := &retryConfig{maxAttempts: intPointer(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
19141916if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
19151917t.Fatalf("GetBucket: got unexpected error %v, want nil", err)
19161918 }
@@ -2108,6 +2110,77 @@ func TestWriterChunkTransferTimeoutEmulated(t *testing.T) {
21082110 })
21092111}
211021122113+func TestWriterChunkRetryDeadlineEmulated(t *testing.T) {
2114+transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
2115+const (
2116+// Resumable upload with smallest chunksize.
2117+chunkSize = 256 * 1024
2118+fileSize = 600 * 1024
2119+// A small value for testing, but large enough that we do encounter the error.
2120+retryDeadline = time.Second
2121+errCode = 503
2122+ )
2123+2124+_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
2125+if err != nil {
2126+t.Fatalf("creating bucket: %v", err)
2127+ }
2128+2129+// Populate instructions with a lot of errors so it will take a long time
2130+// to suceed. Error only after the first chunk has been sent, as the
2131+// retry deadline does not apply to the first chunk.
2132+manyErrs := []string{fmt.Sprintf("return-%d-after-%dK", errCode, 257)}
2133+for i := 0; i < 20; i++ {
2134+manyErrs = append(manyErrs, fmt.Sprintf("return-%d", errCode))
2135+2136+ }
2137+instructions := map[string][]string{"storage.objects.insert": manyErrs}
2138+testID := createRetryTest(t, client, instructions)
2139+2140+var cancel context.CancelFunc
2141+ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
2142+ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
2143+defer cancel()
2144+2145+params := &openWriterParams{
2146+attrs: &ObjectAttrs{
2147+Bucket: bucket,
2148+Name: fmt.Sprintf("object-%d", time.Now().Nanosecond()),
2149+Generation: defaultGen,
2150+ },
2151+bucket: bucket,
2152+chunkSize: chunkSize,
2153+chunkRetryDeadline: retryDeadline,
2154+ctx: ctx,
2155+donec: make(chan struct{}),
2156+setError: func(_ error) {}, // no-op
2157+progress: func(_ int64) {}, // no-op
2158+setObj: func(_ *ObjectAttrs) {},
2159+ }
2160+2161+pw, err := client.OpenWriter(params, &idempotentOption{true})
2162+if err != nil {
2163+t.Fatalf("failed to open writer: %v", err)
2164+ }
2165+buffer := bytes.Repeat([]byte("A"), fileSize)
2166+_, err = pw.Write(buffer)
2167+defer pw.Close()
2168+if !errorIsStatusCode(err, errCode, codes.Unavailable) {
2169+t.Errorf("expected err with status %d, got err: %v", errCode, err)
2170+ }
2171+2172+// Make sure there was more than one attempt.
2173+got, err := numInstructionsLeft(testID, "storage.objects.insert")
2174+if err != nil {
2175+t.Errorf("getting emulator instructions: %v", err)
2176+ }
2177+2178+if got >= len(manyErrs)-1 {
2179+t.Errorf("not enough attempts - the request may not have been retried; got %d instructions left, expected at most %d", got, len(manyErrs)-2)
2180+ }
2181+ })
2182+}
2183+21112184// createRetryTest creates a bucket in the emulator and sets up a test using the
21122185// Retry Test API for the given instructions. This is intended for emulator tests
21132186// of retry behavior that are not covered by conformance tests.
@@ -2136,6 +2209,37 @@ func createRetryTest(t *testing.T, client storageClient, instructions map[string
21362209return et.id
21372210}
213822112212+// Gets the number of unused instructions matching the method.
2213+func numInstructionsLeft(emulatorTestID, method string) (int, error) {
2214+host := os.Getenv("STORAGE_EMULATOR_HOST")
2215+endpoint, err := url.Parse(host)
2216+if err != nil {
2217+return 0, fmt.Errorf("parsing endpoint: %v", err)
2218+ }
2219+2220+endpoint.Path = strings.Join([]string{"retry_test", emulatorTestID}, "/")
2221+c := http.DefaultClient
2222+resp, err := c.Get(endpoint.String())
2223+if err != nil || resp.StatusCode != 200 {
2224+return 0, fmt.Errorf("getting retry test: err: %v, resp: %+v", err, resp)
2225+ }
2226+defer func() {
2227+closeErr := resp.Body.Close()
2228+if err == nil {
2229+err = closeErr
2230+ }
2231+ }()
2232+testRes := struct {
2233+Instructions map[string][]string
2234+Completed bool
2235+ }{}
2236+if err := json.NewDecoder(resp.Body).Decode(&testRes); err != nil {
2237+return 0, fmt.Errorf("decoding response: %v", err)
2238+ }
2239+// Subtract one because the testbench is off by one (see storage-testbench/issues/707).
2240+return len(testRes.Instructions[method]) - 1, nil
2241+}
2242+21392243// createObject creates an object in the emulator with content randomBytesToWrite and
21402244// returns its name, generation, and metageneration.
21412245func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {