Skip to content

Commit 6676559

Browse files
committed
sql: audit DistSQL supportability for streamer
In c17591e we made so that if we have a plan that cannot be distributed for any reason, we would disable the streamer out of caution. This was known to be a super-set of problematic scenarios, and this commit addresses that oversight. In particular, it revamps `checkSupportForPlanNode` function so that we accumulate all possible blockers for DistSQL, and then we examine each one to decide whether it's safe to use the streamer given the blockers found. The blockers are accumulated into a bit-map, and the methods have been adjusted to do a full walk over the planNode tree (as well as over all expressions) to capture all blockers - this should have negligible overhead. An additional improvement is that we now replace errors with different bits and we can log all blockers for DistSQL not being supported. What prompted this work was a customer query where we didn't use the streamer because of "unsupported planNode" distsql error. This was attempted to be fixed in f6038a1, but the fix was incomplete, and now it is. Release note: None
1 parent 5c6b9af commit 6676559

18 files changed

+650
-259
lines changed

pkg/gen/stringer.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ STRINGER_SRCS = [
4646
"//pkg/sql/sem/tree:statementreturntype_string.go",
4747
"//pkg/sql/sem/tree:statementtype_string.go",
4848
"//pkg/sql:advancecode_string.go",
49+
"//pkg/sql:distsqlblocker_string.go",
4950
"//pkg/sql:nodestatus_string.go",
5051
"//pkg/sql:txneventtype_string.go",
5152
"//pkg/sql:txntype_string.go",

pkg/sql/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ go_library(
298298
"zone_config.go",
299299
"zone_config_helper.go",
300300
":gen-advancecode-stringer", # keep
301+
":gen-distsqlblocker-stringer", # keep
301302
":gen-nodestatus-stringer", # keep
302303
":gen-txneventtype-stringer", # keep
303304
":gen-txntype-stringer", # keep
@@ -680,6 +681,7 @@ go_test(
680681
"delete_preserving_index_test.go",
681682
"descriptor_mutation_test.go",
682683
"descriptor_test.go",
684+
"distsql_check_test.go",
683685
"distsql_leaf_txn_test.go",
684686
"distsql_physical_planner_test.go",
685687
"distsql_plan_backfill_test.go",
@@ -871,6 +873,7 @@ go_test(
871873
"//pkg/sql/execstats",
872874
"//pkg/sql/flowinfra",
873875
"//pkg/sql/gcjob",
876+
"//pkg/sql/inverted",
874877
"//pkg/sql/isql",
875878
"//pkg/sql/lexbase",
876879
"//pkg/sql/mutations",
@@ -1025,6 +1028,12 @@ stringer(
10251028
typ = "advanceCode",
10261029
)
10271030

1031+
stringer(
1032+
name = "gen-distsqlblocker-stringer",
1033+
src = "distsql_check.go",
1034+
typ = "distSQLBlocker",
1035+
)
1036+
10281037
stringer(
10291038
name = "gen-nodestatus-stringer",
10301039
src = "distsql_physical_planner.go",

pkg/sql/apply_join.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,14 +333,14 @@ func runPlanInsidePlan(
333333
}
334334
}
335335

336-
distributePlan, distSQLProhibitedErr := plannerCopy.getPlanDistribution(ctx, plan.main, notPostquery)
336+
distributePlan, blockers := plannerCopy.getPlanDistribution(ctx, plan.main, notPostquery)
337337
distributeType := DistributionType(LocalDistribution)
338338
if distributePlan.WillDistribute() {
339339
distributeType = FullDistribution
340340
}
341341
evalCtx := evalCtxFactory()
342342
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, plannerCopy.txn, distributeType)
343-
planCtx.distSQLProhibitedErr = distSQLProhibitedErr
343+
planCtx.distSQLBlockers = blockers
344344
planCtx.stmtType = recv.stmtType
345345
if sqlStatsBuilder != nil && plannerCopy.instrumentation.ShouldSaveFlows() {
346346
planCtx.collectExecStats = true

pkg/sql/conn_executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3339,7 +3339,7 @@ func (ex *connExecutor) execCopyIn(
33393339
defer p.curPlan.close(ctx)
33403340
_, err := ex.execWithDistSQLEngine(
33413341
ctx, p, tree.RowsAffected, res, LocalDistribution,
3342-
nil /* progressAtomic */, nil, /* distSQLProhibitedErr */
3342+
nil /* progressAtomic */, 0, /* distSQLBlockers */
33433343
)
33443344
return err
33453345
},

pkg/sql/conn_executor_exec.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2893,7 +2893,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
28932893
ex.sessionData().DistSQLMode = origDistSQLMode
28942894
}
28952895
}
2896-
distributePlan, distSQLProhibitedErr := planner.getPlanDistribution(ctx, planner.curPlan.main, notPostquery)
2896+
distributePlan, blockers := planner.getPlanDistribution(ctx, planner.curPlan.main, notPostquery)
28972897
if afterGetPlanDistribution != nil {
28982898
afterGetPlanDistribution()
28992899
}
@@ -2941,7 +2941,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
29412941
}
29422942
ex.sessionTracing.TraceExecStart(ctx, "distributed")
29432943
stats, err := ex.execWithDistSQLEngine(
2944-
ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic, distSQLProhibitedErr,
2944+
ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic, blockers,
29452945
)
29462946
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
29472947
// For pausable portals, we log the stats when closing the portal, so we need
@@ -3393,7 +3393,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
33933393
res RestrictedCommandResult,
33943394
distribute DistributionType,
33953395
progressAtomic *uint64,
3396-
distSQLProhibitedErr error,
3396+
distSQLBlockers distSQLBlockers,
33973397
) (topLevelQueryStats, error) {
33983398
defer planner.curPlan.savePlanInfo()
33993399
recv := MakeDistSQLReceiver(
@@ -3418,7 +3418,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
34183418
evalCtx := planner.ExtendedEvalContext()
34193419
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute)
34203420
planCtx.setUpForMainQuery(ctx, planner, recv)
3421-
planCtx.distSQLProhibitedErr = distSQLProhibitedErr
3421+
planCtx.distSQLBlockers = distSQLBlockers
34223422

34233423
var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
34243424
if len(planner.curPlan.subqueryPlans) != 0 ||

pkg/sql/create_stats.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ func createStatsDefaultColumns(
661661
}
662662
var distSQLVisitor distSQLExprCheckVisitor
663663
for i, col := range desc.PublicColumns() {
664-
cannotDistribute[i] = col.IsVirtual() && checkExprForDistSQL(exprs[i], &distSQLVisitor) != nil
664+
cannotDistribute[i] = col.IsVirtual() && checkExprForDistSQL(exprs[i], &distSQLVisitor) != 0
665665
}
666666
}
667667

0 commit comments

Comments
 (0)