Skip to content

Commit 0e5489d

Browse files
committed
blobs: fix resource leak in GetStream reader close
Previously, closing a GetStream reader before reading all data was a no-op, leaving gRPC streams and goroutines active. This caused resource leaks and OOM errors in distributed merge operations. Now, ReadFile creates a cancelable context and passes the cancel function to the stream reader, which calls it on Close() to properly terminate the stream. Release note: none Epic: CRDB-48845
1 parent 288bc25 commit 0e5489d

File tree

3 files changed

+44
-8
lines changed

3 files changed

+44
-8
lines changed

pkg/blobs/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,16 @@ func (c *remoteClient) ReadFile(
6363
if err != nil {
6464
return nil, 0, err
6565
}
66+
ctx, cancel := context.WithCancel(ctx)
6667
stream, err := c.blobClient.GetStream(ctx, &blobspb.GetRequest{
6768
Filename: file,
6869
Offset: offset,
6970
})
70-
return newGetStreamReader(stream), st.Filesize, errors.Wrap(err, "fetching file")
71+
if err != nil {
72+
cancel()
73+
return nil, 0, errors.Wrap(err, "fetching file")
74+
}
75+
return newGetStreamReader(stream, cancel), st.Filesize, nil
7176
}
7277

7378
type streamWriter struct {

pkg/blobs/client_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,3 +543,32 @@ func TestBlobClientStat(t *testing.T) {
543543
})
544544
}
545545
}
546+
547+
type mockClient struct {
548+
blobspb.RPCBlobClient
549+
ctx context.Context
550+
}
551+
552+
func (m *mockClient) GetStream(
553+
ctx context.Context, in *blobspb.GetRequest,
554+
) (blobspb.RPCBlob_GetStreamClient, error) {
555+
m.ctx = ctx
556+
return nil, nil
557+
}
558+
559+
func (m *mockClient) Stat(ctx context.Context, in *blobspb.StatRequest) (*blobspb.BlobStat, error) {
560+
return &blobspb.BlobStat{}, nil
561+
}
562+
563+
// TestBlobClientReadFileClose tests that closing a reader cancels the context
564+
// passed to the rpc stream.
565+
func TestBlobClientReadFileClose(t *testing.T) {
566+
mock := &mockClient{}
567+
r := remoteClient{blobClient: mock}
568+
569+
reader, _, err := r.ReadFile(context.Background(), "doesn't matter", 0)
570+
require.NoError(t, err)
571+
require.NoError(t, reader.Close(context.Background()))
572+
573+
<-mock.ctx.Done()
574+
}

pkg/blobs/stream.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,24 @@ type streamReceiver interface {
3939
Recv() (*blobspb.StreamChunk, error)
4040
}
4141

42-
// nopSendAndClose creates a GetStreamClient that has a nop SendAndClose function.
43-
// This is needed as Blob_GetStreamClient does not have a Close() function, whereas
44-
// the other sender, Blob_PutStreamServer, does.
45-
type nopSendAndClose struct {
42+
// closeFuncSendAndClose creates a GetStreamClient that calls a close function
43+
// when SendAndClose is called. The close function is expected to cancel the context
44+
// and cause the stream to clean itself up.
45+
type closeFuncSendAndClose struct {
4646
blobspb.RPCBlob_GetStreamClient
47+
close func()
4748
}
4849

49-
func (*nopSendAndClose) SendAndClose(*blobspb.StreamResponse) error {
50+
func (c *closeFuncSendAndClose) SendAndClose(*blobspb.StreamResponse) error {
51+
c.close()
5052
return nil
5153
}
5254

5355
// newGetStreamReader creates an io.ReadCloser that uses gRPC's streaming API
5456
// to read chunks of data.
55-
func newGetStreamReader(client blobspb.RPCBlob_GetStreamClient) ioctx.ReadCloserCtx {
57+
func newGetStreamReader(client blobspb.RPCBlob_GetStreamClient, close func()) ioctx.ReadCloserCtx {
5658
return &blobStreamReader{
57-
stream: &nopSendAndClose{client},
59+
stream: &closeFuncSendAndClose{client, close},
5860
}
5961
}
6062

0 commit comments

Comments
 (0)