Skip to content

Commit 01de598

Browse files
authored
Revert "fix(storage): fix stream termination in MRD. (googleapis#11432)" (googleapis#11778)
This reverts commit 3d4e62f. We still need to do some debugging on an integration test failure for this. Fixes googleapis#11769
1 parent 4502528 commit 01de598

File tree

2 files changed

+19
-189
lines changed

2 files changed

+19
-189
lines changed

storage/grpc_client.go

Lines changed: 19 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,8 +1185,6 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
11851185
done: false,
11861186
activeTask: 0,
11871187
streamRecreation: false,
1188-
endReceiver: false,
1189-
endSender: false,
11901188
}
11911189

11921190
// streamManager goroutine runs in background where we send message to gcs and process response.
@@ -1197,21 +1195,18 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
11971195
case <-rr.ctx.Done():
11981196
rr.mu.Lock()
11991197
rr.done = true
1200-
rr.endSender = true
1201-
if rr.stream != nil {
1202-
rr.stream.CloseSend()
1203-
}
12041198
rr.mu.Unlock()
12051199
return
12061200
case <-rr.managerRetry:
1207-
// We are not closing stream here as it is already closed and we are retring it.
12081201
return
12091202
case <-rr.closeManager:
12101203
rr.mu.Lock()
1211-
if rr.stream != nil {
1212-
rr.stream.CloseSend()
1204+
if len(rr.mp) != 0 {
1205+
for key := range rr.mp {
1206+
rr.mp[key].callback(rr.mp[key].offset, rr.mp[key].totalBytesWritten, fmt.Errorf("stream closed early"))
1207+
delete(rr.mp, key)
1208+
}
12131209
}
1214-
rr.endSender = true
12151210
rr.activeTask = 0
12161211
rr.mu.Unlock()
12171212
return
@@ -1260,29 +1255,11 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
12601255
for {
12611256
select {
12621257
case <-rr.ctx.Done():
1263-
rr.mu.Lock()
1264-
rr.endReceiver = true
12651258
rr.done = true
1266-
if len(rr.mp) != 0 {
1267-
drainInboundReadStream(rr.stream)
1268-
}
1269-
for key := range rr.mp {
1270-
rr.mp[key].callback(rr.mp[key].offset, rr.mp[key].totalBytesWritten, rr.ctx.Err())
1271-
delete(rr.mp, key)
1272-
}
1273-
rr.activeTask = 0
1274-
rr.mu.Unlock()
12751259
return
12761260
case <-rr.receiverRetry:
1277-
// We are not draining from stream here as it is already closed and we are retring it.
12781261
return
12791262
case <-rr.closeReceiver:
1280-
rr.mu.Lock()
1281-
if len(rr.mp) != 0 {
1282-
drainInboundReadStream(rr.stream)
1283-
}
1284-
rr.endReceiver = true
1285-
rr.mu.Unlock()
12861263
return
12871264
default:
12881265
// This function reads the data sent for a particular range request and has a callback
@@ -1292,9 +1269,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
12921269
rr.readHandle = resp.GetReadHandle().GetHandle()
12931270
}
12941271
if err == io.EOF {
1295-
rr.mu.Lock()
1296-
rr.endReceiver = true
1297-
rr.mu.Unlock()
1272+
err = nil
12981273
}
12991274
if err != nil {
13001275
// cancel stream and reopen the stream again.
@@ -1366,8 +1341,6 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
13661341
err = rr.retryStream(err)
13671342
if err != nil {
13681343
rr.mu.Lock()
1369-
rr.endReceiver = true
1370-
rr.endSender = true
13711344
for key := range rr.mp {
13721345
rr.mp[key].callback(rr.mp[key].offset, rr.mp[key].totalBytesWritten, err)
13731346
delete(rr.mp, key)
@@ -1377,10 +1350,6 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
13771350
rr.mu.Unlock()
13781351
rr.close()
13791352
} else {
1380-
rr.mu.Lock()
1381-
rr.endReceiver = false
1382-
rr.endSender = false
1383-
rr.mu.Unlock()
13841353
// If stream recreation happened successfully lets again start
13851354
// both the goroutine making the whole flow asynchronous again.
13861355
if thread == "receiver" {
@@ -1514,39 +1483,18 @@ func (mr *gRPCBidiReader) wait() {
15141483

15151484
// Close will notify stream manager goroutine that the reader has been closed, if it's still running.
15161485
func (mr *gRPCBidiReader) close() error {
1517-
mr.closeManager <- true
1518-
mr.closeReceiver <- true
1519-
mr.mu.Lock()
1520-
for key := range mr.mp {
1521-
mr.mp[key].callback(mr.mp[key].offset, mr.mp[key].totalBytesWritten, fmt.Errorf("stream closed early"))
1522-
delete(mr.mp, key)
1486+
if mr.cancel != nil {
1487+
mr.cancel()
15231488
}
1489+
mr.mu.Lock()
15241490
mr.done = true
15251491
mr.activeTask = 0
15261492
mr.mu.Unlock()
1527-
mr.mu.Lock()
1528-
tryClosing := !(mr.endReceiver && mr.endSender)
1529-
mr.mu.Unlock()
1530-
1531-
for tryClosing {
1532-
mr.mu.Lock()
1533-
tryClosing = !(mr.endReceiver && mr.endSender)
1534-
mr.mu.Unlock()
1535-
}
1536-
defer mr.cancel()
1493+
mr.closeReceiver <- true
1494+
mr.closeManager <- true
15371495
return nil
15381496
}
15391497

1540-
// drainInboundReadStream calls stream.Recv() repeatedly until an error is returned.
1541-
// drainInboundReadStream always returns a non-nil error. io.EOF indicates all
1542-
// messages were successfully read.
1543-
func drainInboundReadStream(stream storagepb.Storage_BidiReadObjectClient) (err error) {
1544-
for err == nil {
1545-
_, err = stream.Recv()
1546-
}
1547-
return err
1548-
}
1549-
15501498
func (mrr *gRPCBidiReader) getHandle() []byte {
15511499
return mrr.readHandle
15521500
}
@@ -1977,8 +1925,6 @@ type gRPCBidiReader struct {
19771925
objectSize int64 // always use the mutex when accessing this variable
19781926
retrier func(error, string)
19791927
streamRecreation bool // This helps us identify if stream recreation is in progress or not. If stream recreation gets called from two goroutine then this will stop second one.
1980-
endReceiver bool
1981-
endSender bool
19821928
}
19831929

19841930
// gRPCReader is used by storage.Reader if the experimental option WithGRPCBidiReads is passed.
@@ -2707,11 +2653,11 @@ func bucketContext(ctx context.Context, bucket string) context.Context {
27072653
return gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
27082654
}
27092655

2710-
// drainInboundWriteStream calls stream.Recv() repeatedly until an error is returned.
2656+
// drainInboundStream calls stream.Recv() repeatedly until an error is returned.
27112657
// It returns the last Resource received on the stream, or nil if no Resource
2712-
// was returned. drainInboundWriteStream always returns a non-nil error. io.EOF
2658+
// was returned. drainInboundStream always returns a non-nil error. io.EOF
27132659
// indicates all messages were successfully read.
2714-
func drainInboundWriteStream(stream storagepb.Storage_BidiWriteObjectClient) (object *storagepb.Object, err error) {
2660+
func drainInboundStream(stream storagepb.Storage_BidiWriteObjectClient) (object *storagepb.Object, err error) {
27152661
for err == nil {
27162662
var resp *storagepb.BidiWriteObjectResponse
27172663
resp, err = stream.Recv()
@@ -2791,7 +2737,7 @@ func (s *gRPCOneshotBidiWriteBufferSender) sendBuffer(ctx context.Context, buf [
27912737

27922738
sendErr := s.stream.Send(req)
27932739
if sendErr != nil {
2794-
obj, err = drainInboundWriteStream(s.stream)
2740+
obj, err = drainInboundStream(s.stream)
27952741
s.stream = nil
27962742
if sendErr != io.EOF {
27972743
err = sendErr
@@ -2804,7 +2750,7 @@ func (s *gRPCOneshotBidiWriteBufferSender) sendBuffer(ctx context.Context, buf [
28042750
s.stream.CloseSend()
28052751
// Oneshot uploads only read from the response stream on completion or
28062752
// failure
2807-
obj, err = drainInboundWriteStream(s.stream)
2753+
obj, err = drainInboundStream(s.stream)
28082754
s.stream = nil
28092755
if err == io.EOF {
28102756
err = nil
@@ -2916,7 +2862,7 @@ func (s *gRPCResumableBidiWriteBufferSender) sendBuffer(ctx context.Context, buf
29162862

29172863
sendErr := s.stream.Send(req)
29182864
if sendErr != nil {
2919-
obj, err = drainInboundWriteStream(s.stream)
2865+
obj, err = drainInboundStream(s.stream)
29202866
s.stream = nil
29212867
if err == io.EOF {
29222868
// This is unexpected - we got an error on Send(), but not on Recv().
@@ -2928,7 +2874,7 @@ func (s *gRPCResumableBidiWriteBufferSender) sendBuffer(ctx context.Context, buf
29282874

29292875
if finishWrite {
29302876
s.stream.CloseSend()
2931-
obj, err = drainInboundWriteStream(s.stream)
2877+
obj, err = drainInboundStream(s.stream)
29322878
s.stream = nil
29332879
if err == io.EOF {
29342880
err = nil
@@ -3025,5 +2971,6 @@ func checkCanceled(err error) error {
30252971
if status.Code(err) == codes.Canceled {
30262972
return context.Canceled
30272973
}
2974+
30282975
return err
30292976
}

storage/integration_test.go

Lines changed: 0 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -597,123 +597,6 @@ func TestIntegration_MRDWithNonRetriableError(t *testing.T) {
597597
})
598598
}
599599

600-
// Test that context cancellation correctly stops a multi range download before completion.
601-
func TestIntegration_MultiRangeDownloaderContextCancel(t *testing.T) {
602-
multiTransportTest(skipHTTP("gRPC implementation specific test"), t, func(t *testing.T, ctx context.Context, bucket, _ string, client *Client) {
603-
ctx, close := context.WithDeadline(ctx, time.Now().Add(time.Second*30))
604-
defer close()
605-
content := make([]byte, 5<<20)
606-
rand.New(rand.NewSource(0)).Read(content)
607-
objName := "mrdnonretry"
608-
// Upload test data.
609-
obj := client.Bucket(bucket).Object(objName)
610-
if err := writeObject(ctx, obj, "text/plain", content); err != nil {
611-
t.Fatal(err)
612-
}
613-
defer func() {
614-
if err := obj.Delete(ctx); err != nil {
615-
log.Printf("failed to delete test object: %v", err)
616-
}
617-
}()
618-
// Create a multi-range-reader and then cancel the context before completing the reads.
619-
readerCtx, cancel := context.WithCancel(ctx)
620-
reader, err := obj.NewMultiRangeDownloader(readerCtx)
621-
if err != nil {
622-
t.Fatalf("NewMultiRangeDownloader: %v", err)
623-
}
624-
res := make([]multiRangeDownloaderOutput, 3)
625-
callback := func(x, y int64, err error) {
626-
res[0].offset = x
627-
res[0].limit = y
628-
res[0].err = err
629-
}
630-
callback1 := func(x, y int64, err error) {
631-
res[1].offset = x
632-
res[1].limit = y
633-
res[1].err = err
634-
}
635-
callback2 := func(x, y int64, err error) {
636-
res[2].offset = x
637-
res[2].limit = y
638-
res[2].err = err
639-
}
640-
// Add one range on the reader, and then cancel the context.
641-
reader.Add(&res[0].buf, 0, int64(len(content)), callback)
642-
// As context is cancelled remaining ranges would result in context cancelled error or stream is closed errors.
643-
cancel()
644-
reader.Add(&res[1].buf, -10, 0, callback1)
645-
reader.Add(&res[2].buf, 0, 10, callback2)
646-
reader.Wait()
647-
// we can get stream is closed, can't add range error in case process is over before we add the range.
648-
expErr := fmt.Errorf("stream is closed, can't add range")
649-
for i, k := range res {
650-
// if we get nil error for any callback other than first, that should be an error.
651-
if i == 0 && k.err == nil && !bytes.Equal(content, k.buf.Bytes()) {
652-
t.Errorf("Error in read range offset %v, limit %v, got: %v; want: %v",
653-
k.offset, k.limit, len(k.buf.Bytes()), len(content))
654-
}
655-
if k.err == nil && k.err.Error() != expErr.Error() && !errors.Is(err, context.Canceled) && !(status.Code(err) == codes.Canceled) {
656-
t.Fatalf("read range %v to %v: got error %v, want nil, context.Canceled or stream is closed error", k.offset, k.limit, k.err)
657-
}
658-
}
659-
if err = reader.Close(); err != nil {
660-
t.Fatalf("Error while closing reader %v", err)
661-
}
662-
})
663-
}
664-
665-
func TestIntegration_MultiRangeDownloaderSuddenClose(t *testing.T) {
666-
multiTransportTest(skipHTTP("gRPC implementation specific test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
667-
content := make([]byte, 5<<20)
668-
rand.New(rand.NewSource(0)).Read(content)
669-
objName := "MultiRangeDownloader"
670-
671-
// Upload test data.
672-
obj := client.Bucket(bucket).Object(objName)
673-
if err := writeObject(ctx, obj, "text/plain", content); err != nil {
674-
t.Fatal(err)
675-
}
676-
defer func() {
677-
if err := obj.Delete(ctx); err != nil {
678-
log.Printf("failed to delete test object: %v", err)
679-
}
680-
}()
681-
reader, err := obj.NewMultiRangeDownloader(ctx)
682-
if err != nil {
683-
t.Fatalf("NewMultiRangeDownloader: %v", err)
684-
}
685-
res := make([]multiRangeDownloaderOutput, 3)
686-
callback := func(x, y int64, err error) {
687-
res[0].offset = x
688-
res[0].limit = y
689-
res[0].err = err
690-
}
691-
callback1 := func(x, y int64, err error) {
692-
res[1].offset = x
693-
res[1].limit = y
694-
res[1].err = err
695-
}
696-
callback2 := func(x, y int64, err error) {
697-
res[2].offset = x
698-
res[2].limit = y
699-
res[2].err = err
700-
}
701-
// Add three ranges on the reader, and then do a sudden close.
702-
reader.Add(&res[0].buf, 0, int64(len(content)), callback)
703-
reader.Close()
704-
reader.Add(&res[1].buf, -10, 0, callback1)
705-
reader.Add(&res[2].buf, 0, 10, callback2)
706-
// we can get stream is closed, can't add range error in case process is over before we add the range.
707-
expErr := fmt.Errorf("stream is closed, can't add range")
708-
expErr2 := fmt.Errorf("stream closed early")
709-
for _, k := range res {
710-
if k.err.Error() != expErr.Error() && k.err.Error() != expErr2.Error() {
711-
t.Fatalf("read range %v to %v: got error %v, want stream closed error", k.offset, k.limit, k.err)
712-
}
713-
}
714-
})
715-
}
716-
717600
// Test in a GCE environment expected to be located in one of:
718601
// - us-west1-a, us-west1-b, us-west-c
719602
//

0 commit comments

Comments
 (0)