fix(storage): free buffers in Bidi Reader (#12839) · googleapis/google-cloud-go@bc247fd
@@ -29,6 +29,7 @@ import (
2929"slices"
3030"strconv"
3131"strings"
32+"sync"
3233"testing"
3334"time"
3435@@ -43,6 +44,7 @@ import (
4344"google.golang.org/api/option"
4445"google.golang.org/grpc"
4546"google.golang.org/grpc/codes"
47+ expgrpc "google.golang.org/grpc/experimental"
4648"google.golang.org/grpc/mem"
4749"google.golang.org/grpc/status"
4850"google.golang.org/protobuf/proto"
@@ -3004,6 +3006,165 @@ func TestWriterChunkRetryDeadlineEmulated(t *testing.T) {
30043006 })
30053007}
300630083009+// Used to test gRPC buffer pool allocs and frees.
3010+// See https://pkg.go.dev/google.golang.org/grpc/mem
3011+type testBufferPool struct {
3012+allocs int64
3013+frees int64
3014+ sync.Mutex // mutex needed becuase Get/Put can be called in parallel.
3015+}
3016+3017+func (bp *testBufferPool) Get(length int) *[]byte {
3018+bp.Lock()
3019+bp.allocs += int64(length)
3020+bp.Unlock()
3021+return mem.DefaultBufferPool().Get(length)
3022+}
3023+3024+func (bp *testBufferPool) Put(b *[]byte) {
3025+if b != nil {
3026+bp.Lock()
3027+bp.frees += int64(len(*b))
3028+bp.Unlock()
3029+ }
3030+mem.DefaultBufferPool().Put(b)
3031+}
3032+3033+func (bp *testBufferPool) getAllocsAndFrees() (int64, int64) {
3034+bp.Lock()
3035+defer bp.Unlock()
3036+return bp.allocs, bp.frees
3037+}
3038+3039+// Test that successful downloads using Reader and MultiRangeDownloader free
3040+// all of their allocated buffers.
3041+func TestReadCodecLeaksEmulated(t *testing.T) {
3042+checkEmulatorEnvironment(t)
3043+ctx := context.Background()
3044+var bp testBufferPool
3045+client, err := NewGRPCClient(ctx, option.WithGRPCDialOption(expgrpc.WithBufferPool(&bp)), experimental.WithZonalBucketAPIs())
3046+if err != nil {
3047+t.Fatalf("NewGRPCClient: %v", err)
3048+ }
3049+var (
3050+contents = randomBytes9MiB
3051+prefix = time.Now().Nanosecond()
3052+bucketName = fmt.Sprintf("bucket-%d", prefix)
3053+objName = fmt.Sprintf("%d-object", prefix)
3054+bkt = client.Bucket(bucketName)
3055+obj = bkt.Object(objName)
3056+ )
3057+3058+// Upload object.
3059+if err := bkt.Create(ctx, "project", nil); err != nil {
3060+t.Fatalf("creating bucket: %v", err)
3061+ }
3062+w := obj.NewWriter(ctx)
3063+if _, err := io.Copy(w, bytes.NewReader(contents)); err != nil {
3064+t.Fatalf("uploading object: %v", err)
3065+ }
3066+if err := w.Close(); err != nil {
3067+t.Fatalf("closing writer: %v", err)
3068+ }
3069+if bp.allocs != bp.frees {
3070+t.Errorf("upload: alloc'd bytes %v not equal to freed bytes %v", bp.allocs, bp.frees)
3071+ }
3072+3073+// Test multiple download paths.
3074+testCases := []struct {
3075+name string
3076+downloadFunc func(obj *ObjectHandle) ([]byte, error)
3077+ }{
3078+ {
3079+name: "Reader.Read",
3080+downloadFunc: func(obj *ObjectHandle) ([]byte, error) {
3081+r, err := obj.NewReader(ctx)
3082+defer r.Close()
3083+if err != nil {
3084+return nil, err
3085+ }
3086+gotContents, err := io.ReadAll(r)
3087+return gotContents, err
3088+ },
3089+ },
3090+ {
3091+name: "Reader.WriteTo",
3092+downloadFunc: func(obj *ObjectHandle) ([]byte, error) {
3093+r, err := obj.NewReader(ctx)
3094+defer r.Close()
3095+if err != nil {
3096+return nil, err
3097+ }
3098+buf := bytes.NewBuffer([]byte{})
3099+_, err = r.WriteTo(buf)
3100+return buf.Bytes(), err
3101+ },
3102+ },
3103+ {
3104+name: "MultiRangeDownloader 3MiB ranges",
3105+downloadFunc: func(obj *ObjectHandle) ([]byte, error) {
3106+mrd, err := obj.NewMultiRangeDownloader(ctx)
3107+var bufs []*bytes.Buffer
3108+var currOff int64
3109+var increment int64 = 3 * MiB
3110+for range 3 {
3111+buf := bytes.NewBuffer([]byte{})
3112+mrd.Add(buf, currOff, increment, func(int64, int64, error) {})
3113+bufs = append(bufs, buf)
3114+currOff += increment
3115+ }
3116+mrd.Wait()
3117+if err := mrd.Close(); err != nil {
3118+return nil, err
3119+ }
3120+var b []byte
3121+for _, buf := range bufs {
3122+b = append(b, buf.Bytes()...)
3123+ }
3124+return b, err
3125+ }},
3126+ {
3127+name: "MultiRangeDownloader 256k ranges",
3128+downloadFunc: func(obj *ObjectHandle) ([]byte, error) {
3129+mrd, err := obj.NewMultiRangeDownloader(ctx)
3130+var bufs []*bytes.Buffer
3131+var currOff int64
3132+var increment int64 = 256 * 1024
3133+for range 36 {
3134+buf := bytes.NewBuffer([]byte{})
3135+mrd.Add(buf, currOff, increment, func(int64, int64, error) {})
3136+bufs = append(bufs, buf)
3137+currOff += increment
3138+ }
3139+mrd.Wait()
3140+if err := mrd.Close(); err != nil {
3141+return nil, err
3142+ }
3143+var b []byte
3144+for _, buf := range bufs {
3145+b = append(b, buf.Bytes()...)
3146+ }
3147+return b, err
3148+ }},
3149+ }
3150+3151+for _, tc := range testCases {
3152+t.Run(tc.name, func(t *testing.T) {
3153+gotContents, err := tc.downloadFunc(obj)
3154+if err != nil {
3155+t.Fatalf("downloading content: %v", err)
3156+ }
3157+if !bytes.Equal(gotContents, contents) {
3158+t.Errorf("downloaded bytes did not match; got %v bytes, want %v", len(gotContents), len(contents))
3159+ }
3160+allocs, frees := bp.getAllocsAndFrees()
3161+if allocs != frees {
3162+t.Errorf("download: alloc'd bytes %v not equal to freed bytes %v", allocs, frees)
3163+ }
3164+ })
3165+ }
3166+}
3167+30073168// createRetryTest creates a bucket in the emulator and sets up a test using the
30083169// Retry Test API for the given instructions. This is intended for emulator tests
30093170// of retry behavior that are not covered by conformance tests.