Skip to content

Commit 875e04a

Browse files
craig[bot]yuzefovichangles-n-daemons
committed
149800: distsql: fix recently introduced leak of CancelFunc r=yuzefovich a=yuzefovich In 27a65c9 we introduced the logic to make sure that remote flows respect the quiesce signal. However, we consciusly ignored the returned CancelFunc since the lifetime of the context is non-trivial. This introduced a leak of that CancelFunc since we keep the reference to it in the stopper even when the flow exits, so every time a remote flow runs on a node, the leak would slowly grow. This leak is now fixed by ensuring the cancellation function is always called. This is achieved by passing the function as another thing to do on the "flow cleanup" (we already always need to close the reserved memory account, so we have the necessary infrastructure set up). Fixes: #149658. Release note (bug fix): In 25.1.8, 25.2.1, 25.2.2, and 25.3 betas, a slow memory leak was introduced that would accumulate whenever a node executes a part of the distributed plan (the gateway node of the plan is not affected). The leak can only be mitigated by restarting the node and is now fixed. 149891: jobs: update hot ranges logger details r=angles-n-daemons a=angles-n-daemons Two issues with the hot ranges logging job is that it is not marked as internal, and its description is capitalized, which does not follow convention. This PR adds the job to the AutomaticJobTypes which ensures its hidden from the default view, in addition to updating the description to be more focused and appropriate. Fixes: none Epic: none Release note: none Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Brian Dillmann <[email protected]>
3 parents e074bcf + 1b23422 + cf520e9 commit 875e04a

File tree

4 files changed

+31
-14
lines changed

4 files changed

+31
-14
lines changed

pkg/jobs/jobspb/wrap.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ var AutomaticJobTypes = [...]Type{
179179
TypeMVCCStatisticsUpdate,
180180
TypeUpdateTableMetadataCache,
181181
TypeSQLActivityFlush,
182+
TypeHotRangesLogger,
182183
}
183184

184185
// DetailsType returns the type for a payload detail.

pkg/kv/kvclient/kvstreamer/streamer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,9 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
532532
},
533533
s.coordinator.mainLoop,
534534
); err != nil {
535+
// The server is shutting down. It's ok to not call
536+
// s.coordinatorCtxCancel in this case.
537+
//
535538
// The new goroutine wasn't spun up, so mainLoop won't get executed
536539
// and we have to decrement the wait group ourselves.
537540
s.waitGroup.Done()

pkg/sql/distsql/server.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -167,14 +167,22 @@ func (ds *ServerImpl) setDraining(drain bool) error {
167167
return nil
168168
}
169169

170+
type onFlowCleanupFn func()
171+
172+
func (f onFlowCleanupFn) Do() {
173+
if f != nil {
174+
f()
175+
}
176+
}
177+
170178
// setupFlow creates a Flow.
171-
//
172-
// - reserved: specifies the upfront memory reservation that the flow takes
173-
// ownership of. This account is already closed if an error is returned or
174-
// will be closed through Flow.Cleanup.
175-
//
176-
// - localState: specifies if the flow runs entirely on this node and, if it
177-
// does, specifies the txn and other attributes.
179+
// - reserved: specifies the upfront memory reservation that the flow takes
180+
// ownership of. This account is already closed if an error is returned or will
181+
// be closed through Flow.Cleanup.
182+
// - localState: specifies if the flow runs entirely on this node and, if it
183+
// does, specifies the txn and other attributes.
184+
// - onFlowCleanup, if non-nil, will be called at the end of Flow.Cleanup. It'll
185+
// also be called if this method returns an error.
178186
//
179187
// Note: unless an error is returned, the returned context contains a span that
180188
// must be finished through Flow.Cleanup.
@@ -187,6 +195,7 @@ func (ds *ServerImpl) setupFlow(
187195
rowSyncFlowConsumer execinfra.RowReceiver,
188196
batchSyncFlowConsumer execinfra.BatchReceiver,
189197
localState LocalState,
198+
onFlowCleanup onFlowCleanupFn,
190199
) (retCtx context.Context, _ flowinfra.Flow, _ execopnode.OpChains, retErr error) {
191200
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
192201
var monitor, diskMonitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
@@ -205,6 +214,7 @@ func (ds *ServerImpl) setupFlow(
205214
onFlowCleanupEnd(ctx)
206215
} else {
207216
reserved.Close(ctx)
217+
onFlowCleanup.Do()
208218
}
209219
// We finish the span after performing other cleanup in case that
210220
// cleanup accesses the context with the span.
@@ -301,18 +311,21 @@ func (ds *ServerImpl) setupFlow(
301311
onFlowCleanupEnd = func(ctx context.Context) {
302312
localEvalCtx.Txn = origTxn
303313
reserved.Close(ctx)
314+
onFlowCleanup.Do()
304315
}
305316
// Update the Txn field early (before f.SetTxn() below) since some
306317
// processors capture the field in their constructor (see #41992).
307318
localEvalCtx.Txn = leafTxn
308319
} else {
309320
onFlowCleanupEnd = func(ctx context.Context) {
310321
reserved.Close(ctx)
322+
onFlowCleanup.Do()
311323
}
312324
}
313325
} else {
314326
onFlowCleanupEnd = func(ctx context.Context) {
315327
reserved.Close(ctx)
328+
onFlowCleanup.Do()
316329
}
317330
if localState.IsLocal {
318331
return nil, nil, nil, errors.AssertionFailedf(
@@ -580,7 +593,7 @@ func (ds *ServerImpl) SetupLocalSyncFlow(
580593
) (context.Context, flowinfra.Flow, execopnode.OpChains, error) {
581594
return ds.setupFlow(
582595
ctx, tracing.SpanFromContext(ctx), parentMonitor, &mon.BoundAccount{}, /* reserved */
583-
req, output, batchOutput, localState,
596+
req, output, batchOutput, localState, nil, /* onFlowCleanup */
584597
)
585598
}
586599

@@ -640,10 +653,10 @@ func (ds *ServerImpl) SetupFlow(
640653
// Note: the passed context will be canceled when this RPC completes, so we
641654
// can't associate it with the flow since it outlives the RPC.
642655
ctx = ds.AnnotateCtx(context.Background())
643-
// Ensure that the flow respects the node being shut down. Note that since
644-
// the flow outlives the RPC, we cannot defer the cancel function, so we
645-
// simply ignore it.
646-
ctx, _ = ds.Stopper.WithCancelOnQuiesce(ctx)
656+
// Ensure that the flow respects the node being shut down. We can only call
657+
// the cancellation function once the flow exits.
658+
var cancel context.CancelFunc
659+
ctx, cancel = ds.Stopper.WithCancelOnQuiesce(ctx)
647660
if err := func() error {
648661
// Reserve some memory for this remote flow which is a poor man's
649662
// admission control based on the RAM usage.
@@ -655,7 +668,7 @@ func (ds *ServerImpl) SetupFlow(
655668
var f flowinfra.Flow
656669
ctx, f, _, err = ds.setupFlow(
657670
ctx, rpcSpan, ds.memMonitor, &reserved, req, nil, /* rowSyncFlowConsumer */
658-
nil /* batchSyncFlowConsumer */, LocalState{},
671+
nil /* batchSyncFlowConsumer */, LocalState{}, onFlowCleanupFn(cancel),
659672
)
660673
// Check whether the RPC context has been canceled indicating that we
661674
// actually don't need to run this flow. This can happen when the

pkg/upgrade/upgrades/v25_3_add_hot_range_logger_job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func createHotRangesLoggerJob(
3333
return d.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
3434
jr := jobs.Record{
3535
JobID: jobs.HotRangesLoggerJobID,
36-
Description: jobspb.TypeHotRangesLogger.String(),
36+
Description: "the background hot ranges logging job that runs on sql nodes",
3737
Details: jobspb.HotRangesLoggerDetails{},
3838
Progress: jobspb.HotRangesLoggerProgress{},
3939
CreatedBy: &jobs.CreatedByInfo{Name: username.NodeUser, ID: username.NodeUserID},

0 commit comments

Comments
 (0)