fix(storage): remove object length limit for unfinalized reads (#12489) · googleapis/google-cloud-go@5566d7d
@@ -34,13 +34,18 @@ import (
34343535"cloud.google.com/go/iam/apiv1/iampb"
3636"cloud.google.com/go/storage/experimental"
37+"cloud.google.com/go/storage/internal/apiv2/storagepb"
3738"github.com/google/go-cmp/cmp"
3839"github.com/googleapis/gax-go/v2"
3940"github.com/googleapis/gax-go/v2/apierror"
4041"github.com/googleapis/gax-go/v2/callctx"
4142"google.golang.org/api/iterator"
43+"google.golang.org/api/option"
44+"google.golang.org/grpc"
4245"google.golang.org/grpc/codes"
46+"google.golang.org/grpc/mem"
4347"google.golang.org/grpc/status"
48+"google.golang.org/protobuf/proto"
4449)
45504651var emulatorClients map[string]storageClient
@@ -1443,6 +1448,184 @@ func TestWriterSmallFlushEmulated(t *testing.T) {
14431448 })
14441449}
144514501451+// customObjSizeReadStream intercepts BidiReadObjectResponse messages and
1452+// changes the object size in the BidiReadObjectResponse.Metadata to
1453+// customRecvSize.
1454+type customObjSizeReadStream struct {
1455+ grpc.ClientStream
1456+hasReceivedMsg bool
1457+customRecvSize int64
1458+}
1459+1460+func (s *customObjSizeReadStream) RecvMsg(m any) error {
1461+// The first message should contain the object info. After that, we can
1462+// receive and parse messages as usual.
1463+if s.hasReceivedMsg {
1464+return s.ClientStream.RecvMsg(m)
1465+ }
1466+s.hasReceivedMsg = true
1467+1468+// Unmarshal the message to get easy access to the metadata to modify.
1469+// We then modify the msg, marshal it again, and convert back to mem.BufferSlice.
1470+// This is an easy way for us to intercept and change the message.
1471+err := s.ClientStream.RecvMsg(m)
1472+1473+databufs, ok := m.(*mem.BufferSlice)
1474+if !ok {
1475+return errors.New("unable to cast received message to mem.BufferSlice")
1476+ }
1477+1478+var bidiResp storagepb.BidiReadObjectResponse
1479+if uErr := proto.Unmarshal(databufs.Materialize(), &bidiResp); uErr != nil {
1480+return fmt.Errorf("failed to unmarshal BidiReadObjectResponse: %w", uErr)
1481+ }
1482+1483+if bidiResp.Metadata == nil {
1484+return fmt.Errorf("nil metadata in the first message")
1485+ }
1486+1487+// Modify the message.
1488+bidiResp.Metadata.Size = s.customRecvSize
1489+1490+// Marshal the modified message.
1491+marshalled, mErr := proto.Marshal(&bidiResp)
1492+if mErr != nil {
1493+return fmt.Errorf("failed to marshal modified BidiReadObjectResponse: %w", mErr)
1494+ }
1495+1496+// Set the message on the pointer to be decoded by the normal read flow.
1497+updatedMsg := mem.BufferSlice{mem.SliceBuffer(marshalled)}
1498+if ptr, ok := m.(*mem.BufferSlice); ok {
1499+*ptr = updatedMsg
1500+ }
1501+1502+return err
1503+}
1504+1505+func TestNewRangeReaderUnfinalizedEmulated(t *testing.T) {
1506+checkEmulatorEnvironment(t)
1507+1508+ctx := context.Background()
1509+var receivedObjectSize int64 = 10
1510+1511+streamInterceptor := grpc.WithStreamInterceptor(
1512+func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1513+clientStream, err := streamer(ctx, desc, cc, method, opts...)
1514+1515+if method == "/google.storage.v2.Storage/BidiReadObject" {
1516+// Intercept the message and set the object size.
1517+clientStream = &customObjSizeReadStream{ClientStream: clientStream, customRecvSize: receivedObjectSize}
1518+ }
1519+return clientStream, err
1520+ })
1521+1522+client, err := NewGRPCClient(ctx, option.WithGRPCDialOption(streamInterceptor), experimental.WithGRPCBidiReads())
1523+if err != nil {
1524+t.Fatalf("NewGRPCClient: %v", err)
1525+ }
1526+1527+var (
1528+contents = randomBytes9MiB
1529+prefix = time.Now().Nanosecond()
1530+bucket = fmt.Sprintf("bucket-%d", prefix)
1531+objName = fmt.Sprintf("%d-object", prefix)
1532+o = client.Bucket(bucket).Object(objName)
1533+ )
1534+1535+// Upload contents to unfinalized object.
1536+if err := client.Bucket(bucket).Create(ctx, "project", nil); err != nil {
1537+t.Fatalf("creating test bucket: %v", err)
1538+ }
1539+w := o.NewWriter(ctx)
1540+w.Append = true
1541+if _, err = w.Write(contents); err != nil {
1542+t.Fatalf("writing test data: got %v; want ok", err)
1543+ }
1544+if err := w.Close(); err != nil {
1545+t.Fatalf("closing test data writer: got %v; want ok", err)
1546+ }
1547+1548+// Download content and validate.
1549+buf := bytes.NewBuffer([]byte{})
1550+r, err := o.NewRangeReader(ctx, 0, -1)
1551+if err != nil {
1552+t.Errorf("NewRangeReader: %v", err)
1553+ }
1554+1555+// Copy exactly the same amount of bytes as object size. This would
1556+// trigger a seen == size check for the next copy.
1557+if _, err := io.CopyN(buf, r, receivedObjectSize); err != nil {
1558+t.Fatalf("io.CopyN: %v", err)
1559+ }
1560+1561+if _, err := io.Copy(buf, r); err != nil {
1562+t.Fatalf("io.Copy: %v", err)
1563+ }
1564+1565+if err := r.Close(); err != nil {
1566+t.Errorf("r.Close: %v", err)
1567+ }
1568+1569+if gotBytes := buf.Bytes(); !bytes.Equal(gotBytes, contents) {
1570+t.Errorf("content mismatch: got %v bytes, want %v bytes", len(gotBytes), len(contents))
1571+ }
1572+1573+// Download with an offset and length.
1574+buf.Truncate(0)
1575+off := receivedObjectSize
1576+var length int64 = 1024 * 1024 * 3
1577+r, err = o.NewRangeReader(ctx, off, length)
1578+if err != nil {
1579+t.Errorf("NewRangeReader: %v", err)
1580+ }
1581+if _, err := io.CopyN(buf, r, receivedObjectSize); err != nil {
1582+t.Fatalf("io.CopyN: %v", err)
1583+ }
1584+1585+if _, err := io.Copy(buf, r); err != nil {
1586+t.Fatalf("io.Copy: %v", err)
1587+ }
1588+if err := r.Close(); err != nil {
1589+t.Errorf("r.Close: %v", err)
1590+ }
1591+1592+if got, want := buf.Bytes(), contents[off:off+length]; !bytes.Equal(got, want) {
1593+t.Errorf("content mismatch: got %v bytes, want %v bytes", len(got), len(want))
1594+ }
1595+1596+// Download with a negative offset.
1597+buf.Truncate(0)
1598+off = receivedObjectSize
1599+r, err = o.NewRangeReader(ctx, -off, -1)
1600+if err != nil {
1601+t.Errorf("NewRangeReader: %v", err)
1602+ }
1603+if _, err := io.CopyN(buf, r, receivedObjectSize); err != nil {
1604+t.Fatalf("io.CopyN: %v", err)
1605+ }
1606+if _, err := io.Copy(buf, r); err != nil {
1607+t.Fatalf("io.Copy: %v", err)
1608+ }
1609+if err := r.Close(); err != nil {
1610+t.Errorf("r.Close: %v", err)
1611+ }
1612+1613+// We get a smaller object size on the first call, so the offset and
1614+// length are based on that. Ensure data integrity - we shouldn't get
1615+// more bytes than expected, and we shouldn't get the wrong bytes.
1616+wantOff := receivedObjectSize - off
1617+wantEnd := receivedObjectSize
1618+1619+// The bytes we get from the emulator are incorrect since the intercept only
1620+// changes the obj length post receive; the request will have already requested the incorrect bytes.
1621+if got, want := buf.Bytes(), contents[wantOff:wantEnd]; len(got) != len(want) {
1622+t.Errorf("content mismatch: got %v bytes, want %v bytes", len(got), len(want))
1623+ }
1624+1625+// Retries.
1626+// metadata.remain is likely inaccurate
1627+}
1628+14461629func TestListNotificationsEmulated(t *testing.T) {
14471630transportClientTest(skipGRPC("notifications not implemented"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
14481631// Populate test object.