Commit 7029127

sql: fix rare race around concurrent remote flows setup
A few years ago in 0c1095e we changed the way we set up distributed query plans. Namely, we now start by setting up the gateway (i.e. local) flow first, and then we issue SetupFlowRequest RPCs to set up the remote flows concurrently, without blocking the gateway flow until that setup is complete.

We have seen about 5 occurrences where the protobuf marshaling code crashed while handling those concurrent RPCs. My hypothesis is that this is due to the main goroutine of the gateway flow not waiting until the RPCs are done. In particular, we put `PhysicalInfrastructure` objects into a `sync.Pool`, and they are released by executing the `PlanningCtx.getCleanupFunc` function. That function is executed in a defer after `Run`ning the local flow completes. However, it's possible for it to be executed _before_ the concurrent SetupFlowRequest RPCs (evaluated via the distsql worker goroutines) have been performed, and I'm guessing the flow specs might get corrupted because of that.

In order to prevent this race, we now block execution of the `Flow.Cleanup` function of the gateway flow until all concurrent RPCs are done.

I tried injecting a sleep right before executing the concurrent RPCs but was still unable to reproduce the problem on the gceworker. Given that we've only seen this a handful of times, I decided to omit the release note.

Release note: None
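The core of the fix is a channel that the RPC runner closes when it is done, which the gateway flow's cleanup hook then waits on. Below is a minimal, self-contained sketch of that pattern under stated assumptions: the names `infraPool`, `pooledInfra`, and `cleanup` are hypothetical stand-ins, not the actual CockroachDB APIs.

package main

import (
	"fmt"
	"sync"
	"time"
)

// infraPool stands in for the sync.Pool that holds PhysicalInfrastructure
// objects in the real code; all names in this sketch are illustrative only.
var infraPool = sync.Pool{New: func() any { return &pooledInfra{} }}

type pooledInfra struct {
	spec string // imagine this references pooled planning memory
}

func main() {
	infra := infraPool.Get().(*pooledInfra)
	infra.spec = "remote flow spec"

	// runnerDone is closed once all concurrent "SetupFlowRequest RPCs" have
	// been performed, mirroring the runnerDone channel added by this commit.
	runnerDone := make(chan struct{})
	go func() {
		defer close(runnerDone)
		// Simulate the distsql runner goroutines that still read infra while
		// marshaling the remote flow specs.
		time.Sleep(10 * time.Millisecond)
		fmt.Println("marshaled:", infra.spec)
	}()

	// Cleanup hook of the gateway flow: without the <-runnerDone below we
	// could return infra to the pool while the goroutine above still reads it.
	cleanup := func() {
		<-runnerDone // block until the concurrent work is finished
		infra.spec = ""
		infraPool.Put(infra)
	}
	cleanup()
	fmt.Println("cleanup done only after all RPCs completed")
}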
1 parent 8e5af1e commit 7029127

File tree

1 file changed: +27 -31 lines changed

pkg/sql/distsql_running.go

Lines changed: 27 additions & 31 deletions
@@ -568,10 +568,14 @@ func (dsp *DistSQLPlanner) setupFlows(
 	var runnerSpan *tracing.Span
 	// This span is necessary because it can outlive its parent.
 	runnerCtx, runnerSpan = tracing.ChildSpan(runnerCtx, "setup-flow-async" /* opName */)
+	// runnerDone will be closed whenever all concurrent SetupFlowRequest RPCs
+	// are complete.
+	runnerDone := make(chan struct{})
 	// runnerCleanup can only be executed _after_ all issued RPCs are complete.
 	runnerCleanup := func() {
 		cancelRunnerCtx()
 		runnerSpan.Finish()
+		close(runnerDone)
 	}
 	// Make sure that we call runnerCleanup unless a new goroutine takes that
 	// responsibility.
@@ -642,24 +646,20 @@ func (dsp *DistSQLPlanner) setupFlows(
 	// setup of a remote flow fails, we want to eagerly cancel all the flows,
 	// and we do so in a separate goroutine.
 	//
-	// We need to synchronize the new goroutine with flow.Cleanup() being called
-	// since flow.Cleanup() is the last thing before DistSQLPlanner.Run returns
-	// at which point the rowResultWriter is no longer protected by the mutex of
-	// the DistSQLReceiver.
-	// TODO(yuzefovich): think through this comment - we no longer have mutex
-	// protection in place, so perhaps this can be simplified.
-	cleanupCalledMu := struct {
-		syncutil.Mutex
-		called bool
-	}{}
+	// We need to ensure that the local flow Cleanup() is blocked until all RPCs
+	// are performed. This is needed since we release the PhysicalInfrastructure
+	// back into the sync.Pool, and FlowSpecs for concurrent RPCs might
+	// reference that infra.
+	var cleanupCalled atomic.Bool
 	flow.AddOnCleanupStart(func() {
-		cleanupCalledMu.Lock()
-		defer cleanupCalledMu.Unlock()
-		cleanupCalledMu.called = true
-		// Cancel any outstanding RPCs while holding the lock to protect from
-		// the context canceled error (the result of the RPC) being set on the
-		// DistSQLReceiver by the listener goroutine below.
+		// Indicate that the local flow Cleanup has started - at this point we
+		// can ignore any errors communicated by the listener goroutine below.
+		cleanupCalled.Store(true)
+		// Cancel any outstanding RPCs.
 		cancelRunnerCtx()
+		// Now block the cleanup of the local flow until all concurrent RPCs are
+		// complete.
+		<-runnerDone
 	})
 	err = dsp.stopper.RunAsyncTask(origCtx, "distsql-remote-flows-setup-listener", func(ctx context.Context) {
 		// Note that in the loop below we always receive from the result channel
@@ -673,21 +673,17 @@ func (dsp *DistSQLPlanner) setupFlows(
 			if res.err != nil && !seenError {
 				// The setup of at least one remote flow failed.
 				seenError = true
-				func() {
-					cleanupCalledMu.Lock()
-					defer cleanupCalledMu.Unlock()
-					if cleanupCalledMu.called {
-						// Cleanup of the local flow has already been performed,
-						// so there is nothing to do.
-						return
-					}
-					// First, we send the error to the DistSQL receiver to be
-					// returned to the client eventually (which will happen on
-					// the next Push or PushBatch call).
-					recv.concurrentErrorCh <- res.err
-					// Now explicitly cancel the local flow.
-					flow.Cancel()
-				}()
+				if cleanupCalled.Load() {
+					// Cleanup of the local flow has already started, so there
+					// is nothing to communicate to the local flow.
+					continue
+				}
+				// First, we send the error to the DistSQL receiver to be
+				// returned to the client eventually (which will happen on the
+				// next Push or PushBatch call).
+				recv.concurrentErrorCh <- res.err
+				// Now explicitly cancel the local flow.
+				flow.Cancel()
 			}
 		}
 	})
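To make the failure mode concrete, here is a small standalone program (hypothetical types, not CockroachDB code) showing the kind of hazard the commit message describes: an object is returned to a sync.Pool and reused while a concurrent goroutine is still reading it, which the Go race detector typically flags.

package main

import (
	"fmt"
	"sync"
)

// spec stands in for a FlowSpec that references pooled infrastructure.
// All names here are hypothetical and exist only to demonstrate the hazard.
type spec struct {
	payload []byte
}

var specPool = sync.Pool{New: func() any { return &spec{} }}

func main() {
	s := specPool.Get().(*spec)
	s.payload = []byte("flow spec for a remote node")

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Simulates marshaling the spec for a SetupFlowRequest RPC.
		_ = append([]byte(nil), s.payload...)
	}()

	// BUG (intentional): the object is returned to the pool without waiting
	// for the reader above. The write below and the read in the goroutine
	// are unsynchronized, so `go run -race` typically reports a data race.
	s.payload = nil
	specPool.Put(s)

	// Simulate immediate reuse by a different query.
	reused := specPool.Get().(*spec)
	reused.payload = []byte("unrelated query's spec")

	wg.Wait()
	fmt.Println("done (run with -race to see the report)")
}

Blocking the gateway flow's cleanup on runnerDone, as in the diff above, removes exactly this window.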
