Skip to content

Commit b801752

Browse files
craig[bot]mgartner
andcommitted
Merge #138692
138692: sql: refactor `checkScanParallelizationIfLocal` r=mgartner a=mgartner #### sql: replace `walkPlan` in `checkScanParallelizationIfLocal` `checkScanParallelizationIfLocal` now traverses a `planNode` tree using the `Input` and `InputCount` methods instead of `walkPlan`. Epic: None Release note: None #### sql: fix checkScanParallelizationIfLocal for groupNode This commit fixes a bug in `checkScanParallelizationIfLocal` that caused it to ignore all but the last aggregate function in a `groupNode`. This has been fixed and a test has been added. In addition, an early return has been added to the `renderNode` case. Release note: None Co-authored-by: Marcus Gartner <[email protected]>
2 parents 8203d6b + af08334 commit b801752

File tree

3 files changed

+88
-77
lines changed

3 files changed

+88
-77
lines changed

pkg/sql/distsql_physical_planner.go

Lines changed: 66 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5289,7 +5289,7 @@ func logAndSanitizeExportDestination(ctx context.Context, dest string) error {
52895289
// processors eagerly move into the draining state which will cancel the context
52905290
// of parallel TableReaders which might "poison" the transaction.
52915291
func checkScanParallelizationIfLocal(
5292-
ctx context.Context, plan *planComponents, c *localScanParallelizationChecker,
5292+
ctx context.Context, plan *planComponents,
52935293
) (prohibitParallelization, hasScanNodeToParallelize bool) {
52945294
if plan.main.planNode == nil || len(plan.cascades) != 0 ||
52955295
len(plan.checkPlans) != 0 || len(plan.triggers) != 0 {
@@ -5298,82 +5298,76 @@ func checkScanParallelizationIfLocal(
52985298
// the scan parallelization.
52995299
return true, false
53005300
}
5301-
*c = localScanParallelizationChecker{}
5302-
o := planObserver{enterNode: c.enterNode}
5303-
_ = walkPlan(ctx, plan.main.planNode, o)
5304-
for _, s := range plan.subqueryPlans {
5305-
_ = walkPlan(ctx, s.plan.planNode, o)
5306-
}
5307-
return c.prohibitParallelization, c.hasScanNodeToParallelize
5308-
}
5309-
5310-
type localScanParallelizationChecker struct {
5311-
prohibitParallelization bool
5312-
hasScanNodeToParallelize bool
5313-
}
53145301

5315-
func (c *localScanParallelizationChecker) enterNode(
5316-
ctx context.Context, _ string, plan planNode,
5317-
) (bool, error) {
5318-
if c.prohibitParallelization {
5319-
return false, nil
5320-
}
5321-
switch n := plan.(type) {
5322-
case *distinctNode:
5323-
return true, nil
5324-
case *explainPlanNode:
5325-
// walkPlan doesn't recurse into explainPlanNode, so we have to manually
5326-
// walk over the wrapped plan.
5327-
plan := n.plan.WrappedPlan.(*planComponents)
5328-
prohibit, has := checkScanParallelizationIfLocal(ctx, plan, c)
5329-
c.prohibitParallelization = c.prohibitParallelization || prohibit
5330-
c.hasScanNodeToParallelize = c.hasScanNodeToParallelize || has
5331-
return false, nil
5332-
case *explainVecNode:
5333-
return true, nil
5334-
case *filterNode:
5335-
// Some filter expressions might be handled by falling back to the
5336-
// wrapped processors, so we choose to be safe.
5337-
c.prohibitParallelization = true
5338-
return false, nil
5339-
case *groupNode:
5340-
for _, f := range n.funcs {
5341-
c.prohibitParallelization = f.hasFilter()
5302+
var checkRec func(p planNode)
5303+
checkRec = func(p planNode) {
5304+
if prohibitParallelization {
5305+
return
53425306
}
5343-
return true, nil
5344-
case *indexJoinNode:
5345-
return true, nil
5346-
case *joinNode:
5347-
c.prohibitParallelization = n.pred.onCond != nil
5348-
return true, nil
5349-
case *limitNode:
5350-
return true, nil
5351-
case *ordinalityNode:
5352-
return true, nil
5353-
case *renderNode:
5354-
// Only support projections since render expressions might be handled
5355-
// via a wrapped row-by-row processor.
5356-
for _, e := range n.render {
5357-
if _, isIVar := e.(*tree.IndexedVar); !isIVar {
5358-
c.prohibitParallelization = true
5307+
switch n := p.(type) {
5308+
case *explainPlanNode:
5309+
// explainPlanNode is a zeroInputPlanNode, so we have to manually
5310+
// recurse.
5311+
plan := n.plan.WrappedPlan.(*planComponents)
5312+
prohibit, has := checkScanParallelizationIfLocal(ctx, plan)
5313+
prohibitParallelization = prohibitParallelization || prohibit
5314+
hasScanNodeToParallelize = hasScanNodeToParallelize || has
5315+
// Do not recurse.
5316+
return
5317+
case *filterNode:
5318+
// Some filter expressions might be handled by falling back to the
5319+
// wrapped processors, so we choose to be safe.
5320+
prohibitParallelization = true
5321+
// Do not recurse.
5322+
return
5323+
case *groupNode:
5324+
for _, f := range n.funcs {
5325+
if f.hasFilter() {
5326+
prohibitParallelization = true
5327+
// Do not recurse.
5328+
return
5329+
}
5330+
}
5331+
case *joinNode:
5332+
prohibitParallelization = n.pred.onCond != nil
5333+
case *renderNode:
5334+
// Only support projections since render expressions might be
5335+
// handled via a wrapped row-by-row processor.
5336+
for _, e := range n.render {
5337+
if _, isIVar := e.(*tree.IndexedVar); !isIVar {
5338+
prohibitParallelization = true
5339+
// Do not recurse.
5340+
return
5341+
}
53595342
}
5343+
case *scanNode:
5344+
if len(n.reqOrdering) == 0 && n.parallelize {
5345+
hasScanNodeToParallelize = true
5346+
}
5347+
case *distinctNode, *explainVecNode, *indexJoinNode, *limitNode,
5348+
*ordinalityNode, *sortNode, *unionNode, *valuesNode:
5349+
default:
5350+
prohibitParallelization = true
5351+
// Do not recurse into any unmatched nodes.
5352+
return
53605353
}
5361-
return true, nil
5362-
case *scanNode:
5363-
if len(n.reqOrdering) == 0 && n.parallelize {
5364-
c.hasScanNodeToParallelize = true
5354+
// Recurse into input nodes.
5355+
for i, n := 0, p.InputCount(); i < n; i++ {
5356+
input, err := p.Input(i)
5357+
if err != nil {
5358+
// This error should never occur with correct usage of Input
5359+
// and InputCount. If it does, ignore it and prohibit
5360+
// parallelization.
5361+
prohibitParallelization = true
5362+
}
5363+
checkRec(input)
53655364
}
5366-
return true, nil
5367-
case *sortNode:
5368-
return true, nil
5369-
case *unionNode:
5370-
return true, nil
5371-
case *valuesNode:
5372-
return true, nil
5373-
default:
5374-
c.prohibitParallelization = true
5375-
return false, nil
53765365
}
5366+
checkRec(plan.main.planNode)
5367+
for _, s := range plan.subqueryPlans {
5368+
checkRec(s.plan.planNode)
5369+
}
5370+
return prohibitParallelization, hasScanNodeToParallelize
53775371
}
53785372

53795373
// NewPlanningCtx returns a new PlanningCtx. When distribute is false, a
@@ -5435,7 +5429,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle(
54355429
return planCtx
54365430
}
54375431
prohibitParallelization, hasScanNodeToParallelize := checkScanParallelizationIfLocal(
5438-
ctx, &planner.curPlan.planComponents, &planner.parallelizationChecker,
5432+
ctx, &planner.curPlan.planComponents,
54395433
)
54405434
if prohibitParallelization || !hasScanNodeToParallelize {
54415435
return planCtx

pkg/sql/distsql_physical_planner_test.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1994,6 +1994,17 @@ func TestCheckScanParallelizationIfLocal(t *testing.T) {
19941994
// Filtering aggregation is not natively supported.
19951995
prohibitParallelization: true,
19961996
},
1997+
{
1998+
plan: planComponents{main: planMaybePhysical{planNode: &groupNode{
1999+
singleInputPlanNode: singleInputPlanNode{scanToParallelize},
2000+
funcs: []*aggregateFuncHolder{
2001+
{filterRenderIdx: 0},
2002+
{filterRenderIdx: tree.NoColumnIdx},
2003+
}},
2004+
}},
2005+
// Filtering aggregation is not natively supported.
2006+
prohibitParallelization: true,
2007+
},
19972008
{
19982009
plan: planComponents{main: planMaybePhysical{planNode: &indexJoinNode{
19992010
singleInputPlanNode: singleInputPlanNode{scanToParallelize},
@@ -2028,6 +2039,16 @@ func TestCheckScanParallelizationIfLocal(t *testing.T) {
20282039
// prohibit the parallelization for all non-IndexedVar expressions).
20292040
prohibitParallelization: true,
20302041
},
2042+
{
2043+
plan: planComponents{main: planMaybePhysical{planNode: &renderNode{
2044+
singleInputPlanNode: singleInputPlanNode{scanToParallelize},
2045+
render: []tree.TypedExpr{&tree.IndexedVar{Idx: 0}, &tree.IsNullExpr{}},
2046+
}}},
2047+
// Not a simple projection (some expressions might be handled by
2048+
// wrapping a row-execution processor, so we choose to be safe and
2049+
// prohibit the parallelization for all non-IndexedVar expressions).
2050+
prohibitParallelization: true,
2051+
},
20312052
{
20322053
plan: planComponents{main: planMaybePhysical{planNode: &sortNode{singleInputPlanNode: singleInputPlanNode{scanToParallelize}}}},
20332054
hasScanNodeToParallelize: true,
@@ -2064,8 +2085,7 @@ func TestCheckScanParallelizationIfLocal(t *testing.T) {
20642085
prohibitParallelization: true,
20652086
},
20662087
} {
2067-
var c localScanParallelizationChecker
2068-
prohibitParallelization, hasScanNodeToParallize := checkScanParallelizationIfLocal(context.Background(), &tc.plan, &c)
2088+
prohibitParallelization, hasScanNodeToParallize := checkScanParallelizationIfLocal(context.Background(), &tc.plan)
20692089
require.Equal(t, tc.prohibitParallelization, prohibitParallelization)
20702090
require.Equal(t, tc.hasScanNodeToParallelize, hasScanNodeToParallize)
20712091
}

pkg/sql/planner.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,6 @@ type planner struct {
285285
// This field is embedded into the planner to avoid an allocation in
286286
// checkExprForDistSQL.
287287
distSQLVisitor distSQLExprCheckVisitor
288-
// This field is embedded into the planner to avoid an allocation in
289-
// checkScanParallelizationIfLocal.
290-
parallelizationChecker localScanParallelizationChecker
291288

292289
// datumAlloc is used when decoding datums and running subqueries.
293290
datumAlloc *tree.DatumAlloc

0 commit comments

Comments
 (0)