fix(storage): remove object length limit for unfinalized reads (#12489) · googleapis/google-cloud-go@5566d7d

@@ -34,13 +34,18 @@ import (

34343535

"cloud.google.com/go/iam/apiv1/iampb"

3636

"cloud.google.com/go/storage/experimental"

37+

"cloud.google.com/go/storage/internal/apiv2/storagepb"

3738

"github.com/google/go-cmp/cmp"

3839

"github.com/googleapis/gax-go/v2"

3940

"github.com/googleapis/gax-go/v2/apierror"

4041

"github.com/googleapis/gax-go/v2/callctx"

4142

"google.golang.org/api/iterator"

43+

"google.golang.org/api/option"

44+

"google.golang.org/grpc"

4245

"google.golang.org/grpc/codes"

46+

"google.golang.org/grpc/mem"

4347

"google.golang.org/grpc/status"

48+

"google.golang.org/protobuf/proto"

4449

)

45504651

var emulatorClients map[string]storageClient

@@ -1443,6 +1448,184 @@ func TestWriterSmallFlushEmulated(t *testing.T) {

14431448

})

14441449

}

144514501451+

// customObjSizeReadStream intercepts BidiReadObjectResponse messages and

1452+

// changes the object size in the BidiReadObjectResponse.Metadata to

1453+

// customRecvSize.

1454+

type customObjSizeReadStream struct {

1455+

grpc.ClientStream

1456+

hasReceivedMsg bool

1457+

customRecvSize int64

1458+

}

1459+1460+

func (s *customObjSizeReadStream) RecvMsg(m any) error {

1461+

// The first message should contain the object info. After that, we can

1462+

// receive and parse messages as usual.

1463+

if s.hasReceivedMsg {

1464+

return s.ClientStream.RecvMsg(m)

1465+

}

1466+

s.hasReceivedMsg = true

1467+1468+

// Unmarshal the message to get easy access to the metadata to modify.

1469+

// We then modify the msg, marshal it again, and convert back to mem.BufferSlice.

1470+

// This is an easy way for us to intercept and change the message.

1471+

err := s.ClientStream.RecvMsg(m)

1472+1473+

databufs, ok := m.(*mem.BufferSlice)

1474+

if !ok {

1475+

return errors.New("unable to cast received message to mem.BufferSlice")

1476+

}

1477+1478+

var bidiResp storagepb.BidiReadObjectResponse

1479+

if uErr := proto.Unmarshal(databufs.Materialize(), &bidiResp); uErr != nil {

1480+

return fmt.Errorf("failed to unmarshal BidiReadObjectResponse: %w", uErr)

1481+

}

1482+1483+

if bidiResp.Metadata == nil {

1484+

return fmt.Errorf("nil metadata in the first message")

1485+

}

1486+1487+

// Modify the message.

1488+

bidiResp.Metadata.Size = s.customRecvSize

1489+1490+

// Marshal the modified message.

1491+

marshalled, mErr := proto.Marshal(&bidiResp)

1492+

if mErr != nil {

1493+

return fmt.Errorf("failed to marshal modified BidiReadObjectResponse: %w", mErr)

1494+

}

1495+1496+

// Set the message on the pointer to be decoded by the normal read flow.

1497+

updatedMsg := mem.BufferSlice{mem.SliceBuffer(marshalled)}

1498+

if ptr, ok := m.(*mem.BufferSlice); ok {

1499+

*ptr = updatedMsg

1500+

}

1501+1502+

return err

1503+

}

1504+1505+

func TestNewRangeReaderUnfinalizedEmulated(t *testing.T) {

1506+

checkEmulatorEnvironment(t)

1507+1508+

ctx := context.Background()

1509+

var receivedObjectSize int64 = 10

1510+1511+

streamInterceptor := grpc.WithStreamInterceptor(

1512+

func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {

1513+

clientStream, err := streamer(ctx, desc, cc, method, opts...)

1514+1515+

if method == "/google.storage.v2.Storage/BidiReadObject" {

1516+

// Intercept the message and set the object size.

1517+

clientStream = &customObjSizeReadStream{ClientStream: clientStream, customRecvSize: receivedObjectSize}

1518+

}

1519+

return clientStream, err

1520+

})

1521+1522+

client, err := NewGRPCClient(ctx, option.WithGRPCDialOption(streamInterceptor), experimental.WithGRPCBidiReads())

1523+

if err != nil {

1524+

t.Fatalf("NewGRPCClient: %v", err)

1525+

}

1526+1527+

var (

1528+

contents = randomBytes9MiB

1529+

prefix = time.Now().Nanosecond()

1530+

bucket = fmt.Sprintf("bucket-%d", prefix)

1531+

objName = fmt.Sprintf("%d-object", prefix)

1532+

o = client.Bucket(bucket).Object(objName)

1533+

)

1534+1535+

// Upload contents to unfinalized object.

1536+

if err := client.Bucket(bucket).Create(ctx, "project", nil); err != nil {

1537+

t.Fatalf("creating test bucket: %v", err)

1538+

}

1539+

w := o.NewWriter(ctx)

1540+

w.Append = true

1541+

if _, err = w.Write(contents); err != nil {

1542+

t.Fatalf("writing test data: got %v; want ok", err)

1543+

}

1544+

if err := w.Close(); err != nil {

1545+

t.Fatalf("closing test data writer: got %v; want ok", err)

1546+

}

1547+1548+

// Download content and validate.

1549+

buf := bytes.NewBuffer([]byte{})

1550+

r, err := o.NewRangeReader(ctx, 0, -1)

1551+

if err != nil {

1552+

t.Errorf("NewRangeReader: %v", err)

1553+

}

1554+1555+

// Copy exactly the same amount of bytes as object size. This would

1556+

// trigger a seen == size check for the next copy.

1557+

if _, err := io.CopyN(buf, r, receivedObjectSize); err != nil {

1558+

t.Fatalf("io.CopyN: %v", err)

1559+

}

1560+1561+

if _, err := io.Copy(buf, r); err != nil {

1562+

t.Fatalf("io.Copy: %v", err)

1563+

}

1564+1565+

if err := r.Close(); err != nil {

1566+

t.Errorf("r.Close: %v", err)

1567+

}

1568+1569+

if gotBytes := buf.Bytes(); !bytes.Equal(gotBytes, contents) {

1570+

t.Errorf("content mismatch: got %v bytes, want %v bytes", len(gotBytes), len(contents))

1571+

}

1572+1573+

// Download with an offset and length.

1574+

buf.Truncate(0)

1575+

off := receivedObjectSize

1576+

var length int64 = 1024 * 1024 * 3

1577+

r, err = o.NewRangeReader(ctx, off, length)

1578+

if err != nil {

1579+

t.Errorf("NewRangeReader: %v", err)

1580+

}

1581+

if _, err := io.CopyN(buf, r, receivedObjectSize); err != nil {

1582+

t.Fatalf("io.CopyN: %v", err)

1583+

}

1584+1585+

if _, err := io.Copy(buf, r); err != nil {

1586+

t.Fatalf("io.Copy: %v", err)

1587+

}

1588+

if err := r.Close(); err != nil {

1589+

t.Errorf("r.Close: %v", err)

1590+

}

1591+1592+

if got, want := buf.Bytes(), contents[off:off+length]; !bytes.Equal(got, want) {

1593+

t.Errorf("content mismatch: got %v bytes, want %v bytes", len(got), len(want))

1594+

}

1595+1596+

// Download with a negative offset.

1597+

buf.Truncate(0)

1598+

off = receivedObjectSize

1599+

r, err = o.NewRangeReader(ctx, -off, -1)

1600+

if err != nil {

1601+

t.Errorf("NewRangeReader: %v", err)

1602+

}

1603+

if _, err := io.CopyN(buf, r, receivedObjectSize); err != nil {

1604+

t.Fatalf("io.CopyN: %v", err)

1605+

}

1606+

if _, err := io.Copy(buf, r); err != nil {

1607+

t.Fatalf("io.Copy: %v", err)

1608+

}

1609+

if err := r.Close(); err != nil {

1610+

t.Errorf("r.Close: %v", err)

1611+

}

1612+1613+

// We get a smaller object size on the first call, so the offset and

1614+

// length are based on that. Ensure data integrity - we shouldn't get

1615+

// more bytes than expected, and we shouldn't get the wrong bytes.

1616+

wantOff := receivedObjectSize - off

1617+

wantEnd := receivedObjectSize

1618+1619+

// The bytes we get from the emulator are incorrect since the intercept only

1620+

// changes the obj length post receive; the request will have already requested the incorrect bytes.

1621+

if got, want := buf.Bytes(), contents[wantOff:wantEnd]; len(got) != len(want) {

1622+

t.Errorf("content mismatch: got %v bytes, want %v bytes", len(got), len(want))

1623+

}

1624+1625+

// Retries.

1626+

// metadata.remain is likely inaccurate

1627+

}

1628+14461629

func TestListNotificationsEmulated(t *testing.T) {

14471630

transportClientTest(skipGRPC("notifications not implemented"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {

14481631

// Populate test object.