@@ -166,14 +166,22 @@ func (ds *ServerImpl) setDraining(drain bool) error {
166
166
return nil
167
167
}
168
168
169
+ type onFlowCleanupFn func ()
170
+
171
+ func (f onFlowCleanupFn ) Do () {
172
+ if f != nil {
173
+ f ()
174
+ }
175
+ }
176
+
169
177
// setupFlow creates a Flow.
170
- //
171
- // - reserved: specifies the upfront memory reservation that the flow takes
172
- // ownership of. This account is already closed if an error is returned or
173
- // will be closed through Flow.Cleanup.
174
- //
175
- // - localState: specifies if the flow runs entirely on this node and, if it
176
- // does, specifies the txn and other attributes .
178
+ // - reserved: specifies the upfront memory reservation that the flow takes
179
+ // ownership of. This account is already closed if an error is returned or will
180
+ // be closed through Flow.Cleanup.
181
+ // - localState: specifies if the flow runs entirely on this node and, if it
182
+ // does, specifies the txn and other attributes.
183
+ // - onFlowCleanup, if non-nil, will be called at the end of Flow.Cleanup. It'll
184
+ // also be called if this method returns an error .
177
185
//
178
186
// Note: unless an error is returned, the returned context contains a span that
179
187
// must be finished through Flow.Cleanup.
@@ -186,6 +194,7 @@ func (ds *ServerImpl) setupFlow(
186
194
rowSyncFlowConsumer execinfra.RowReceiver ,
187
195
batchSyncFlowConsumer execinfra.BatchReceiver ,
188
196
localState LocalState ,
197
+ onFlowCleanup onFlowCleanupFn ,
189
198
) (retCtx context.Context , _ flowinfra.Flow , _ execopnode.OpChains , retErr error ) {
190
199
var sp * tracing.Span // will be Finish()ed by Flow.Cleanup()
191
200
var monitor , diskMonitor * mon.BytesMonitor // will be closed in Flow.Cleanup()
@@ -204,6 +213,7 @@ func (ds *ServerImpl) setupFlow(
204
213
onFlowCleanupEnd (ctx )
205
214
} else {
206
215
reserved .Close (ctx )
216
+ onFlowCleanup .Do ()
207
217
}
208
218
// We finish the span after performing other cleanup in case that
209
219
// cleanup accesses the context with the span.
@@ -300,18 +310,21 @@ func (ds *ServerImpl) setupFlow(
300
310
onFlowCleanupEnd = func (ctx context.Context ) {
301
311
localEvalCtx .Txn = origTxn
302
312
reserved .Close (ctx )
313
+ onFlowCleanup .Do ()
303
314
}
304
315
// Update the Txn field early (before f.SetTxn() below) since some
305
316
// processors capture the field in their constructor (see #41992).
306
317
localEvalCtx .Txn = leafTxn
307
318
} else {
308
319
onFlowCleanupEnd = func (ctx context.Context ) {
309
320
reserved .Close (ctx )
321
+ onFlowCleanup .Do ()
310
322
}
311
323
}
312
324
} else {
313
325
onFlowCleanupEnd = func (ctx context.Context ) {
314
326
reserved .Close (ctx )
327
+ onFlowCleanup .Do ()
315
328
}
316
329
if localState .IsLocal {
317
330
return nil , nil , nil , errors .AssertionFailedf (
@@ -579,7 +592,7 @@ func (ds *ServerImpl) SetupLocalSyncFlow(
579
592
) (context.Context , flowinfra.Flow , execopnode.OpChains , error ) {
580
593
return ds .setupFlow (
581
594
ctx , tracing .SpanFromContext (ctx ), parentMonitor , & mon.BoundAccount {}, /* reserved */
582
- req , output , batchOutput , localState ,
595
+ req , output , batchOutput , localState , nil , /* onFlowCleanup */
583
596
)
584
597
}
585
598
@@ -639,10 +652,6 @@ func (ds *ServerImpl) SetupFlow(
639
652
// Note: the passed context will be canceled when this RPC completes, so we
640
653
// can't associate it with the flow since it outlives the RPC.
641
654
ctx = ds .AnnotateCtx (context .Background ())
642
- // Ensure that the flow respects the node being shut down. Note that since
643
- // the flow outlives the RPC, we cannot defer the cancel function, so we
644
- // simply ignore it.
645
- ctx , _ = ds .Stopper .WithCancelOnQuiesce (ctx )
646
655
if err := func () error {
647
656
// Reserve some memory for this remote flow which is a poor man's
648
657
// admission control based on the RAM usage.
@@ -651,10 +660,18 @@ func (ds *ServerImpl) SetupFlow(
651
660
if err != nil {
652
661
return err
653
662
}
663
+ // Ensure that the flow respects the node being shut down. We can only
664
+ // call the cancellation function once the flow exits.
665
+ //
666
+ // setupFlow will either call 'cancel' if an error is returned, or the
667
+ // cancellation function is taken over by the flow, and it'll be called
668
+ // in Flow.Cleanup.
669
+ var cancel context.CancelFunc
670
+ ctx , cancel = ds .Stopper .WithCancelOnQuiesce (ctx )
654
671
var f flowinfra.Flow
655
672
ctx , f , _ , err = ds .setupFlow (
656
673
ctx , rpcSpan , ds .memMonitor , & reserved , req , nil , /* rowSyncFlowConsumer */
657
- nil /* batchSyncFlowConsumer */ , LocalState {},
674
+ nil /* batchSyncFlowConsumer */ , LocalState {}, onFlowCleanupFn ( cancel ),
658
675
)
659
676
// Check whether the RPC context has been canceled indicating that we
660
677
// actually don't need to run this flow. This can happen when the
0 commit comments