Skip to content

Commit 971c63b

Browse files
committed
sql: audit DistSQL supportability for streamer
In c17591e we made so that if we have a plan that cannot be distributed for any reason, we would disable the streamer out of caution. This was known to be a super-set of problematic scenarios, and this commit addresses that oversight. In particular, it revamps `checkSupportForPlanNode` function so that we accumulate all possible causes for DistSQL being prohibited, and then we examine each one to decide whether it's safe to use the streamer given the causes found. The causes are accumulated into a bit-map, and the methods have been adjusted to do a full walk over the planNode tree (as well as over all expressions) to capture all causes - this should have negligible overhead. An additional improvement is that we now replace errors with different bits and we can log all causes for DistSQL not being supported. What prompted this work was a customer query where we didn't use the streamer because of "unsupported planNode" distsql error. This was attempted to be fixed in f6038a1, but the fix was incomplete, and now it is. Release note: None
1 parent 474dc62 commit 971c63b

17 files changed

+648
-258
lines changed

pkg/gen/stringer.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ STRINGER_SRCS = [
4646
"//pkg/sql/sem/tree:statementreturntype_string.go",
4747
"//pkg/sql/sem/tree:statementtype_string.go",
4848
"//pkg/sql:advancecode_string.go",
49+
"//pkg/sql:distsqlprohibitedcause_string.go",
4950
"//pkg/sql:nodestatus_string.go",
5051
"//pkg/sql:txneventtype_string.go",
5152
"//pkg/sql:txntype_string.go",

pkg/sql/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ go_library(
298298
"zone_config.go",
299299
"zone_config_helper.go",
300300
":gen-advancecode-stringer", # keep
301+
":gen-distsqlprohibitedcause-stringer", # keep
301302
":gen-nodestatus-stringer", # keep
302303
":gen-txneventtype-stringer", # keep
303304
":gen-txntype-stringer", # keep
@@ -680,6 +681,7 @@ go_test(
680681
"delete_preserving_index_test.go",
681682
"descriptor_mutation_test.go",
682683
"descriptor_test.go",
684+
"distsql_check_test.go",
683685
"distsql_leaf_txn_test.go",
684686
"distsql_physical_planner_test.go",
685687
"distsql_plan_backfill_test.go",
@@ -871,6 +873,7 @@ go_test(
871873
"//pkg/sql/execstats",
872874
"//pkg/sql/flowinfra",
873875
"//pkg/sql/gcjob",
876+
"//pkg/sql/inverted",
874877
"//pkg/sql/isql",
875878
"//pkg/sql/lexbase",
876879
"//pkg/sql/mutations",
@@ -1025,6 +1028,12 @@ stringer(
10251028
typ = "advanceCode",
10261029
)
10271030

1031+
stringer(
1032+
name = "gen-distsqlprohibitedcause-stringer",
1033+
src = "distsql_check.go",
1034+
typ = "distSQLProhibitedCause",
1035+
)
1036+
10281037
stringer(
10291038
name = "gen-nodestatus-stringer",
10301039
src = "distsql_physical_planner.go",

pkg/sql/apply_join.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,14 +333,14 @@ func runPlanInsidePlan(
333333
}
334334
}
335335

336-
distributePlan, distSQLProhibitedErr := plannerCopy.getPlanDistribution(ctx, plan.main, notPostquery)
336+
distributePlan, noDistSQLCauses := plannerCopy.getPlanDistribution(ctx, plan.main, notPostquery)
337337
distributeType := DistributionType(LocalDistribution)
338338
if distributePlan.WillDistribute() {
339339
distributeType = FullDistribution
340340
}
341341
evalCtx := evalCtxFactory()
342342
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, plannerCopy.txn, distributeType)
343-
planCtx.distSQLProhibitedErr = distSQLProhibitedErr
343+
planCtx.distSQLProhibitedCauses = noDistSQLCauses
344344
planCtx.stmtType = recv.stmtType
345345
if sqlStatsBuilder != nil && plannerCopy.instrumentation.ShouldSaveFlows() {
346346
planCtx.collectExecStats = true

pkg/sql/conn_executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3339,7 +3339,7 @@ func (ex *connExecutor) execCopyIn(
33393339
defer p.curPlan.close(ctx)
33403340
_, err := ex.execWithDistSQLEngine(
33413341
ctx, p, tree.RowsAffected, res, LocalDistribution,
3342-
nil /* progressAtomic */, nil, /* distSQLProhibitedErr */
3342+
nil /* progressAtomic */, 0, /* distSQLProhibitedCauses */
33433343
)
33443344
return err
33453345
},

pkg/sql/conn_executor_exec.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2879,7 +2879,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
28792879
ex.sessionData().DistSQLMode = origDistSQLMode
28802880
}
28812881
}
2882-
distributePlan, distSQLProhibitedErr := planner.getPlanDistribution(ctx, planner.curPlan.main, notPostquery)
2882+
distributePlan, noDistSQLCauses := planner.getPlanDistribution(ctx, planner.curPlan.main, notPostquery)
28832883
if afterGetPlanDistribution != nil {
28842884
afterGetPlanDistribution()
28852885
}
@@ -2927,7 +2927,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
29272927
}
29282928
ex.sessionTracing.TraceExecStart(ctx, "distributed")
29292929
stats, err := ex.execWithDistSQLEngine(
2930-
ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic, distSQLProhibitedErr,
2930+
ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic, noDistSQLCauses,
29312931
)
29322932
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
29332933
// For pausable portals, we log the stats when closing the portal, so we need
@@ -3379,7 +3379,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
33793379
res RestrictedCommandResult,
33803380
distribute DistributionType,
33813381
progressAtomic *uint64,
3382-
distSQLProhibitedErr error,
3382+
distSQLProhibitedCauses distSQLProhibitedCauses,
33833383
) (topLevelQueryStats, error) {
33843384
defer planner.curPlan.savePlanInfo()
33853385
recv := MakeDistSQLReceiver(
@@ -3404,7 +3404,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
34043404
evalCtx := planner.ExtendedEvalContext()
34053405
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute)
34063406
planCtx.setUpForMainQuery(ctx, planner, recv)
3407-
planCtx.distSQLProhibitedErr = distSQLProhibitedErr
3407+
planCtx.distSQLProhibitedCauses = distSQLProhibitedCauses
34083408

34093409
var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
34103410
if len(planner.curPlan.subqueryPlans) != 0 ||

pkg/sql/create_stats.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ func createStatsDefaultColumns(
661661
}
662662
var distSQLVisitor distSQLExprCheckVisitor
663663
for i, col := range desc.PublicColumns() {
664-
cannotDistribute[i] = col.IsVirtual() && checkExprForDistSQL(exprs[i], &distSQLVisitor) != nil
664+
cannotDistribute[i] = col.IsVirtual() && checkExprForDistSQL(exprs[i], &distSQLVisitor) != 0
665665
}
666666
}
667667

0 commit comments

Comments
 (0)