fix(storage): free buffers in Bidi Reader (#12839) · googleapis/google-cloud-go@bc247fd

@@ -29,6 +29,7 @@ import (

2929

"slices"

3030

"strconv"

3131

"strings"

32+

"sync"

3233

"testing"

3334

"time"

3435

@@ -43,6 +44,7 @@ import (

4344

"google.golang.org/api/option"

4445

"google.golang.org/grpc"

4546

"google.golang.org/grpc/codes"

47+

expgrpc "google.golang.org/grpc/experimental"

4648

"google.golang.org/grpc/mem"

4749

"google.golang.org/grpc/status"

4850

"google.golang.org/protobuf/proto"

@@ -3004,6 +3006,165 @@ func TestWriterChunkRetryDeadlineEmulated(t *testing.T) {

30043006

})

30053007

}

300630083009+

// Used to test gRPC buffer pool allocs and frees.

3010+

// See https://pkg.go.dev/google.golang.org/grpc/mem

3011+

type testBufferPool struct {

3012+

allocs int64

3013+

frees int64

3014+

sync.Mutex // mutex needed becuase Get/Put can be called in parallel.

3015+

}

3016+3017+

func (bp *testBufferPool) Get(length int) *[]byte {

3018+

bp.Lock()

3019+

bp.allocs += int64(length)

3020+

bp.Unlock()

3021+

return mem.DefaultBufferPool().Get(length)

3022+

}

3023+3024+

func (bp *testBufferPool) Put(b *[]byte) {

3025+

if b != nil {

3026+

bp.Lock()

3027+

bp.frees += int64(len(*b))

3028+

bp.Unlock()

3029+

}

3030+

mem.DefaultBufferPool().Put(b)

3031+

}

3032+3033+

func (bp *testBufferPool) getAllocsAndFrees() (int64, int64) {

3034+

bp.Lock()

3035+

defer bp.Unlock()

3036+

return bp.allocs, bp.frees

3037+

}

3038+3039+

// Test that successful downloads using Reader and MultiRangeDownloader free

3040+

// all of their allocated buffers.

3041+

func TestReadCodecLeaksEmulated(t *testing.T) {

3042+

checkEmulatorEnvironment(t)

3043+

ctx := context.Background()

3044+

var bp testBufferPool

3045+

client, err := NewGRPCClient(ctx, option.WithGRPCDialOption(expgrpc.WithBufferPool(&bp)), experimental.WithZonalBucketAPIs())

3046+

if err != nil {

3047+

t.Fatalf("NewGRPCClient: %v", err)

3048+

}

3049+

var (

3050+

contents = randomBytes9MiB

3051+

prefix = time.Now().Nanosecond()

3052+

bucketName = fmt.Sprintf("bucket-%d", prefix)

3053+

objName = fmt.Sprintf("%d-object", prefix)

3054+

bkt = client.Bucket(bucketName)

3055+

obj = bkt.Object(objName)

3056+

)

3057+3058+

// Upload object.

3059+

if err := bkt.Create(ctx, "project", nil); err != nil {

3060+

t.Fatalf("creating bucket: %v", err)

3061+

}

3062+

w := obj.NewWriter(ctx)

3063+

if _, err := io.Copy(w, bytes.NewReader(contents)); err != nil {

3064+

t.Fatalf("uploading object: %v", err)

3065+

}

3066+

if err := w.Close(); err != nil {

3067+

t.Fatalf("closing writer: %v", err)

3068+

}

3069+

if bp.allocs != bp.frees {

3070+

t.Errorf("upload: alloc'd bytes %v not equal to freed bytes %v", bp.allocs, bp.frees)

3071+

}

3072+3073+

// Test multiple download paths.

3074+

testCases := []struct {

3075+

name string

3076+

downloadFunc func(obj *ObjectHandle) ([]byte, error)

3077+

}{

3078+

{

3079+

name: "Reader.Read",

3080+

downloadFunc: func(obj *ObjectHandle) ([]byte, error) {

3081+

r, err := obj.NewReader(ctx)

3082+

defer r.Close()

3083+

if err != nil {

3084+

return nil, err

3085+

}

3086+

gotContents, err := io.ReadAll(r)

3087+

return gotContents, err

3088+

},

3089+

},

3090+

{

3091+

name: "Reader.WriteTo",

3092+

downloadFunc: func(obj *ObjectHandle) ([]byte, error) {

3093+

r, err := obj.NewReader(ctx)

3094+

defer r.Close()

3095+

if err != nil {

3096+

return nil, err

3097+

}

3098+

buf := bytes.NewBuffer([]byte{})

3099+

_, err = r.WriteTo(buf)

3100+

return buf.Bytes(), err

3101+

},

3102+

},

3103+

{

3104+

name: "MultiRangeDownloader 3MiB ranges",

3105+

downloadFunc: func(obj *ObjectHandle) ([]byte, error) {

3106+

mrd, err := obj.NewMultiRangeDownloader(ctx)

3107+

var bufs []*bytes.Buffer

3108+

var currOff int64

3109+

var increment int64 = 3 * MiB

3110+

for range 3 {

3111+

buf := bytes.NewBuffer([]byte{})

3112+

mrd.Add(buf, currOff, increment, func(int64, int64, error) {})

3113+

bufs = append(bufs, buf)

3114+

currOff += increment

3115+

}

3116+

mrd.Wait()

3117+

if err := mrd.Close(); err != nil {

3118+

return nil, err

3119+

}

3120+

var b []byte

3121+

for _, buf := range bufs {

3122+

b = append(b, buf.Bytes()...)

3123+

}

3124+

return b, err

3125+

}},

3126+

{

3127+

name: "MultiRangeDownloader 256k ranges",

3128+

downloadFunc: func(obj *ObjectHandle) ([]byte, error) {

3129+

mrd, err := obj.NewMultiRangeDownloader(ctx)

3130+

var bufs []*bytes.Buffer

3131+

var currOff int64

3132+

var increment int64 = 256 * 1024

3133+

for range 36 {

3134+

buf := bytes.NewBuffer([]byte{})

3135+

mrd.Add(buf, currOff, increment, func(int64, int64, error) {})

3136+

bufs = append(bufs, buf)

3137+

currOff += increment

3138+

}

3139+

mrd.Wait()

3140+

if err := mrd.Close(); err != nil {

3141+

return nil, err

3142+

}

3143+

var b []byte

3144+

for _, buf := range bufs {

3145+

b = append(b, buf.Bytes()...)

3146+

}

3147+

return b, err

3148+

}},

3149+

}

3150+3151+

for _, tc := range testCases {

3152+

t.Run(tc.name, func(t *testing.T) {

3153+

gotContents, err := tc.downloadFunc(obj)

3154+

if err != nil {

3155+

t.Fatalf("downloading content: %v", err)

3156+

}

3157+

if !bytes.Equal(gotContents, contents) {

3158+

t.Errorf("downloaded bytes did not match; got %v bytes, want %v", len(gotContents), len(contents))

3159+

}

3160+

allocs, frees := bp.getAllocsAndFrees()

3161+

if allocs != frees {

3162+

t.Errorf("download: alloc'd bytes %v not equal to freed bytes %v", allocs, frees)

3163+

}

3164+

})

3165+

}

3166+

}

3167+30073168

// createRetryTest creates a bucket in the emulator and sets up a test using the

30083169

// Retry Test API for the given instructions. This is intended for emulator tests

30093170

// of retry behavior that are not covered by conformance tests.