@@ -754,7 +754,7 @@ func (dsp *DistSQLPlanner) Run(
754754 // the line.
755755 localState .EvalContext = evalCtx
756756 localState .IsLocal = planCtx .isLocal
757- localState .MustUseLeaf = planCtx .mustUseLeafTxn
757+ localState .AddConcurrency ( planCtx .flowConcurrency )
758758 localState .Txn = txn
759759 localState .LocalProcs = plan .LocalProcessors
760760 localState .LocalVectorSources = plan .LocalVectorSources
@@ -777,15 +777,19 @@ func (dsp *DistSQLPlanner) Run(
777777 // cannot create a LeafTxn, so we cannot parallelize scans.
778778 planCtx .parallelizeScansIfLocal = false
779779 for _ , flow := range flows {
780- localState .HasConcurrency = localState .HasConcurrency || execinfra .HasParallelProcessors (flow )
780+ if execinfra .HasParallelProcessors (flow ) {
781+ localState .AddConcurrency (distsql .ConcurrencyHasParallelProcessors )
782+ }
781783 }
782784 } else {
783785 if planCtx .isLocal && noMutations && planCtx .parallelizeScansIfLocal {
784786 // Even though we have a single flow on the gateway node, we might
785787 // have decided to parallelize the scans. If that's the case, we
786788 // will need to use the Leaf txn.
787789 for _ , flow := range flows {
788- localState .HasConcurrency = localState .HasConcurrency || execinfra .HasParallelProcessors (flow )
790+ if execinfra .HasParallelProcessors (flow ) {
791+ localState .AddConcurrency (distsql .ConcurrencyHasParallelProcessors )
792+ }
789793 }
790794 }
791795 if noMutations {
@@ -886,7 +890,7 @@ func (dsp *DistSQLPlanner) Run(
886890 // Both index and lookup joins, with and without
887891 // ordering, are executed via the Streamer API that has
888892 // concurrency.
889- localState .HasConcurrency = true
893+ localState .AddConcurrency ( distsql . ConcurrencyStreamer )
890894 break
891895 }
892896 }
@@ -1009,11 +1013,37 @@ func (dsp *DistSQLPlanner) Run(
10091013 return
10101014 }
10111015
1012- if len (flows ) == 1 && evalCtx .Txn != nil && evalCtx .Txn .Type () == kv .RootTxn {
1013- // If we have a fully local plan and a RootTxn, we don't expect any
1014- // concurrency, so it's safe to use the DistSQLReceiver to push the
1015- // metadata into directly from routines.
1016- if planCtx .planner != nil {
1016+ if len (flows ) == 1 && planCtx .planner != nil {
1017+ // We have a fully local plan, so check whether it'll be safe to use the
1018+ // DistSQLReceiver to push the metadata into directly from routines
1019+ // (which is the case when we don't have any concurrency between
1020+ // routines themselves as well as a routine and the "head" processor -
1021+ // the one pushing into the DistSQLReceiver).
1022+ var safe bool
1023+ if evalCtx .Txn != nil && evalCtx .Txn .Type () == kv .RootTxn {
1024+ // We have a RootTxn, so we don't expect any concurrency whatsoever.
1025+ safe = true
1026+ } else {
1027+ // We have a LeafTxn, so we need to examine what kind of concurrency
1028+ // is present in the flow.
1029+ var safeConcurrency distsql.ConcurrencyKind
1030+ // We don't care whether we use the Streamer API - it has
1031+ // concurrency only at the KV client level and below.
1032+ safeConcurrency |= distsql .ConcurrencyStreamer
1033+ // If we have "outer plan" concurrency, the "inner" and the
1034+ // "outer" plans have their own DistSQLReceivers.
1035+ //
1036+ // Note that the same is the case with parallel CHECKs concurrency,
1037+ // but then planCtx.planner is shared between goroutines, so we'll
1038+ // avoid mutating it. (We can't have routines in post-query CHECKs
1039+ // since only FK and UNIQUE checks are run in parallel.)
1040+ safeConcurrency |= distsql .ConcurrencyWithOuterPlan
1041+ unsafeConcurrency := ^ safeConcurrency
1042+ if localState .GetConcurrency ()& unsafeConcurrency == 0 {
1043+ safe = true
1044+ }
1045+ }
1046+ if safe {
10171047 planCtx .planner .routineMetadataForwarder = recv
10181048 }
10191049 }
@@ -1989,7 +2019,7 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
19892019 // Skip the diagram generation since on this "main" query path we
19902020 // can get it via the statement bundle.
19912021 true , /* skipDistSQLDiagramGeneration */
1992- false , /* mustUseLeafTxn */
2022+ false , /* innerPlansMustUseLeafTxn */
19932023 ) {
19942024 return recv .commErr
19952025 }
@@ -2056,7 +2086,7 @@ func (dsp *DistSQLPlanner) PlanAndRunSubqueries(
20562086 recv * DistSQLReceiver ,
20572087 subqueryResultMemAcc * mon.BoundAccount ,
20582088 skipDistSQLDiagramGeneration bool ,
2059- mustUseLeafTxn bool ,
2089+ innerPlansMustUseLeafTxn bool ,
20602090) bool {
20612091 for planIdx , subqueryPlan := range subqueryPlans {
20622092 if err := dsp .planAndRunSubquery (
@@ -2069,7 +2099,7 @@ func (dsp *DistSQLPlanner) PlanAndRunSubqueries(
20692099 recv ,
20702100 subqueryResultMemAcc ,
20712101 skipDistSQLDiagramGeneration ,
2072- mustUseLeafTxn ,
2102+ innerPlansMustUseLeafTxn ,
20732103 ); err != nil {
20742104 recv .SetError (err )
20752105 return false
@@ -2093,7 +2123,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
20932123 recv * DistSQLReceiver ,
20942124 subqueryResultMemAcc * mon.BoundAccount ,
20952125 skipDistSQLDiagramGeneration bool ,
2096- mustUseLeafTxn bool ,
2126+ innerPlansMustUseLeafTxn bool ,
20972127) error {
20982128 subqueryDistribution , distSQLProhibitedErr := planner .getPlanDistribution (ctx , subqueryPlan .plan )
20992129 distribute := DistributionType (LocalDistribution )
@@ -2105,7 +2135,9 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
21052135 subqueryPlanCtx .stmtType = tree .Rows
21062136 subqueryPlanCtx .skipDistSQLDiagramGeneration = skipDistSQLDiagramGeneration
21072137 subqueryPlanCtx .subOrPostQuery = true
2108- subqueryPlanCtx .mustUseLeafTxn = mustUseLeafTxn
2138+ if innerPlansMustUseLeafTxn {
2139+ subqueryPlanCtx .flowConcurrency = distsql .ConcurrencyWithOuterPlan
2140+ }
21092141 if planner .instrumentation .ShouldSaveFlows () {
21102142 subqueryPlanCtx .saveFlows = getDefaultSaveFlowsFunc (ctx , planner , planComponentTypeSubquery )
21112143 }
@@ -2724,7 +2756,9 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
27242756 }
27252757 postqueryPlanCtx .associateNodeWithComponents = associateNodeWithComponents
27262758 postqueryPlanCtx .collectExecStats = planner .instrumentation .ShouldCollectExecStats ()
2727- postqueryPlanCtx .mustUseLeafTxn = parallelCheck
2759+ if parallelCheck {
2760+ postqueryPlanCtx .flowConcurrency = distsql .ConcurrencyParallelChecks
2761+ }
27282762
27292763 postqueryPhysPlan , physPlanCleanup , err := dsp .createPhysPlan (ctx , postqueryPlanCtx , postqueryPlan )
27302764 defer physPlanCleanup ()
0 commit comments