Skip to content

Commit 9f369f9

Browse files
authored
fix(storage): fix redirect logic in MRD (googleapis#12733)
Allows out-of-region redirects in MultiRangeDownloader. Also switches MRD integration test to use a zonal bucket.
1 parent e876d62 commit 9f369f9

File tree

2 files changed

+43
-21
lines changed

2 files changed

+43
-21
lines changed

storage/grpc_client.go

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,8 +1100,6 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
11001100
ReadObjectSpec: bidiObject,
11011101
}
11021102

1103-
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, contextMetadataFromBidiReadObject(req)...)
1104-
11051103
openStream := func(readHandle ReadHandle) (*bidiReadStreamResponse, context.CancelFunc, error) {
11061104
if err := applyCondsProto("grpcStorageClient.BidiReadObject", params.gen, params.conds, bidiObject); err != nil {
11071105
return nil, nil, err
@@ -1111,36 +1109,53 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
11111109
Handle: readHandle,
11121110
}
11131111
}
1112+
databufs := mem.BufferSlice{}
1113+
11141114
var stream storagepb.Storage_BidiReadObjectClient
11151115
var decoder *readResponseDecoder
11161116
cc, cancel := context.WithCancel(ctx)
11171117
err = run(cc, func(ctx context.Context) error {
1118-
stream, err = c.raw.BidiReadObject(ctx, s.gax...)
1119-
if err != nil {
1118+
openAndSendReq := func() error {
1119+
mdCtx := gax.InsertMetadataIntoOutgoingContext(ctx, contextMetadataFromBidiReadObject(req)...)
1120+
1121+
stream, err = c.raw.BidiReadObject(mdCtx, s.gax...)
1122+
if err != nil {
1123+
return err
1124+
}
1125+
// If stream opened succesfully, send first message on the stream.
1126+
// First message to stream should contain read_object_spec
1127+
err = stream.Send(req)
1128+
if err != nil {
1129+
return err
1130+
}
1131+
// Use RecvMsg to get the raw buffer slice instead of Recv().
1132+
err = stream.RecvMsg(&databufs)
1133+
if err != nil {
1134+
return err
1135+
}
1136+
return nil
1137+
}
1138+
1139+
err := openAndSendReq()
1140+
1141+
// We might get a redirect error here for an out-of-region request.
1142+
// Add the routing token and read handle to the request and do one
1143+
// retry.
1144+
if st, ok := status.FromError(err); ok && st.Code() == codes.Aborted {
11201145
// BidiReadObjectRedirectedError error is only returned on initial open in case of a redirect.
11211146
// The routing token that should be used when reopening the read stream. Needs to be exported.
1122-
rpcStatus := status.Convert(err)
1123-
details := rpcStatus.Details()
1124-
for _, detail := range details {
1147+
for _, detail := range st.Details() {
11251148
if bidiError, ok := detail.(*storagepb.BidiReadObjectRedirectedError); ok {
11261149
bidiObject.ReadHandle = bidiError.ReadHandle
11271150
bidiObject.RoutingToken = bidiError.RoutingToken
1128-
req.ReadObjectSpec = bidiObject
1129-
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, contextMetadataFromBidiReadObject(req)...)
1151+
databufs = mem.BufferSlice{}
1152+
err = openAndSendReq()
1153+
break
11301154
}
11311155
}
1132-
return err
11331156
}
1134-
// Incase stream opened succesfully, send first message on the stream.
1135-
// First message to stream should contain read_object_spec
1136-
err = stream.Send(req)
1137-
if err != nil {
1138-
return err
1139-
}
1140-
// Use RecvMsg to get the raw buffer slice instead of Recv().
1141-
databufs := mem.BufferSlice{}
1142-
err = stream.RecvMsg(&databufs)
11431157
if err != nil {
1158+
databufs.Free()
11441159
return err
11451160
}
11461161

storage/integration_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,11 +351,18 @@ var readCases = []readCase{
351351
}
352352

353353
func TestIntegration_MultiRangeDownloader(t *testing.T) {
354-
multiTransportTest(skipAllButBidi(context.Background(), "Bidi Read API test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
354+
multiTransportTest(skipAllButBidi(context.Background(), "Bidi Read API test"), t, func(t *testing.T, ctx context.Context, string, prefix string, client *Client) {
355355
content := make([]byte, 5<<20)
356356
rand.New(rand.NewSource(0)).Read(content)
357357
objName := "MultiRangeDownloader"
358358

359+
h := testHelper{t}
360+
bucket := prefix + uidSpace.New()
361+
bkt := client.Bucket(bucket)
362+
363+
h.mustCreateZonalBucket(bkt, testutil.ProjID())
364+
defer h.mustDeleteBucket(bkt)
365+
359366
// Upload test data.
360367
obj := client.Bucket(bucket).Object(objName)
361368
if err := writeObject(ctx, obj, "text/plain", content); err != nil {
@@ -412,7 +419,7 @@ func TestIntegration_MultiRangeDownloader(t *testing.T) {
412419
if err = reader.Close(); err != nil {
413420
t.Fatalf("Error while closing reader %v", err)
414421
}
415-
})
422+
}, experimental.WithZonalBucketAPIs())
416423
}
417424

418425
// Test many concurrent reads on the same MultiRangeDownloader to try to detect

0 commit comments

Comments
 (0)