Skip to content

Commit 2af343b

Browse files
craig[bot]jeffswensonfqazi
committed
160627: blobs: fix resource leak in GetStream reader close r=jeffswenson a=jeffswenson Previously, closing a GetStream reader before reading all data was a no-op, leaving gRPC streams and goroutines active. This caused resource leaks and OOM errors in distributed merge operations. Now, ReadFile creates a cancelable context and passes the cancel function to the stream reader, which calls it on Close() to properly terminate the stream. Release note: none Epic: CRDB-48845 160659: workload/schemachanger: fix dependency detection for DROP COLUMN r=fqazi a=fqazi Previously, the DROP COLUMN operation in the schema changer could fail, since it detected self referential foreign key dependencies as external ones. To address this, this patch excludes them from the detection. Fixes: #160318 Fixes: #159952 Release note: None Co-authored-by: Jeff Swenson <jeffswenson@betterthannull.com> Co-authored-by: Faizan Qazi <faizan@cockroachlabs.com>
3 parents 5c0dabc + 0e5489d + 9a19eae commit 2af343b

File tree

5 files changed

+49
-12
lines changed

5 files changed

+49
-12
lines changed

pkg/blobs/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,16 @@ func (c *remoteClient) ReadFile(
6363
if err != nil {
6464
return nil, 0, err
6565
}
66+
ctx, cancel := context.WithCancel(ctx)
6667
stream, err := c.blobClient.GetStream(ctx, &blobspb.GetRequest{
6768
Filename: file,
6869
Offset: offset,
6970
})
70-
return newGetStreamReader(stream), st.Filesize, errors.Wrap(err, "fetching file")
71+
if err != nil {
72+
cancel()
73+
return nil, 0, errors.Wrap(err, "fetching file")
74+
}
75+
return newGetStreamReader(stream, cancel), st.Filesize, nil
7176
}
7277

7378
type streamWriter struct {

pkg/blobs/client_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,3 +543,32 @@ func TestBlobClientStat(t *testing.T) {
543543
})
544544
}
545545
}
546+
547+
type mockClient struct {
548+
blobspb.RPCBlobClient
549+
ctx context.Context
550+
}
551+
552+
func (m *mockClient) GetStream(
553+
ctx context.Context, in *blobspb.GetRequest,
554+
) (blobspb.RPCBlob_GetStreamClient, error) {
555+
m.ctx = ctx
556+
return nil, nil
557+
}
558+
559+
func (m *mockClient) Stat(ctx context.Context, in *blobspb.StatRequest) (*blobspb.BlobStat, error) {
560+
return &blobspb.BlobStat{}, nil
561+
}
562+
563+
// TestBlobClientReadFileClose tests that closing a reader cancels the context
564+
// passed to the rpc stream.
565+
func TestBlobClientReadFileClose(t *testing.T) {
566+
mock := &mockClient{}
567+
r := remoteClient{blobClient: mock}
568+
569+
reader, _, err := r.ReadFile(context.Background(), "doesn't matter", 0)
570+
require.NoError(t, err)
571+
require.NoError(t, reader.Close(context.Background()))
572+
573+
<-mock.ctx.Done()
574+
}

pkg/blobs/stream.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,24 @@ type streamReceiver interface {
3939
Recv() (*blobspb.StreamChunk, error)
4040
}
4141

42-
// nopSendAndClose creates a GetStreamClient that has a nop SendAndClose function.
43-
// This is needed as Blob_GetStreamClient does not have a Close() function, whereas
44-
// the other sender, Blob_PutStreamServer, does.
45-
type nopSendAndClose struct {
42+
// closeFuncSendAndClose creates a GetStreamClient that calls a close function
43+
// when SendAndClose is called. The close function is expected to cancel the context
44+
// and cause the stream to clean itself up.
45+
type closeFuncSendAndClose struct {
4646
blobspb.RPCBlob_GetStreamClient
47+
close func()
4748
}
4849

49-
func (*nopSendAndClose) SendAndClose(*blobspb.StreamResponse) error {
50+
func (c *closeFuncSendAndClose) SendAndClose(*blobspb.StreamResponse) error {
51+
c.close()
5052
return nil
5153
}
5254

5355
// newGetStreamReader creates an io.ReadCloser that uses gRPC's streaming API
5456
// to read chunks of data.
55-
func newGetStreamReader(client blobspb.RPCBlob_GetStreamClient) ioctx.ReadCloserCtx {
57+
func newGetStreamReader(client blobspb.RPCBlob_GetStreamClient, close func()) ioctx.ReadCloserCtx {
5658
return &blobStreamReader{
57-
stream: &nopSendAndClose{client},
59+
stream: &closeFuncSendAndClose{client, close},
5860
}
5961
}
6062

pkg/workload/schemachange/error_screening.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (og *operationGenerator) tableHasDependencies(
150150
}
151151

152152
func (og *operationGenerator) columnIsDependedOn(
153-
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columnName tree.Name,
153+
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columnName tree.Name, includeFKs bool,
154154
) (bool, error) {
155155
// To see if a column is depended on, the ordinal_position of the column is looked up in
156156
// information_schema.columns. Then, this position is used to see if that column has view dependencies
@@ -187,6 +187,7 @@ func (og *operationGenerator) columnIsDependedOn(
187187
WHERE fd.descriptor_id
188188
= $1::REGCLASS
189189
AND fd.dependedonby_type != 'sequence'
190+
AND ($5::BOOL || fd.dependedonby_type != 'fk')
190191
)
191192
UNION (
192193
SELECT unnest(confkey) AS column_id
@@ -208,7 +209,7 @@ func (og *operationGenerator) columnIsDependedOn(
208209
AND column_name = $4
209210
) AS source ON source.column_id = cons.column_id
210211
)
211-
`, tableName.String(), tableName.Schema(), tableName.Object(), columnName)
212+
`, tableName.String(), tableName.Schema(), tableName.Object(), columnName, includeFKs)
212213
}
213214

214215
// colIsRefByComputed determines if a column is referenced by a computed column.

pkg/workload/schemachange/operation_generator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1713,7 +1713,7 @@ func (og *operationGenerator) dropColumn(ctx context.Context, tx pgx.Tx) (*opStm
17131713
if err != nil {
17141714
return nil, err
17151715
}
1716-
columnIsDependedOn, err := og.columnIsDependedOn(ctx, tx, tableName, columnName)
1716+
columnIsDependedOn, err := og.columnIsDependedOn(ctx, tx, tableName, columnName, false /* includeFKs */)
17171717
if err != nil {
17181718
return nil, err
17191719
}
@@ -2675,7 +2675,7 @@ func (og *operationGenerator) setColumnType(ctx context.Context, tx pgx.Tx) (*op
26752675
return nil, err
26762676
}
26772677

2678-
columnHasDependencies, err := og.columnIsDependedOn(ctx, tx, tableName, columnForTypeChange.name)
2678+
columnHasDependencies, err := og.columnIsDependedOn(ctx, tx, tableName, columnForTypeChange.name, true /* includeFKs */)
26792679
if err != nil {
26802680
return nil, err
26812681
}

0 commit comments

Comments
 (0)