Skip to content

Commit caa221d

Browse files
craig[bot]ZhouXing19mgartneryuzefovich
committed
151153: sql: disallow pausable portal for procedure call statement r=ZhouXing19 a=ZhouXing19 Fixes #147568 Statement that is a transaction control statement is no longer allowed with pausable portals. Specifically, the following won't be allowed: ``` CALL PAUSE JOB PAUSE SCHEDULE PAUSE JOBS PAUSE JOBS COMMIT PREPARED DEALLOCATE DISCARD EXECUTE ROLLBACK PREPARED SET TRANSACTION UNLISTEN ``` Ideally, we should only disallow the CALL statements for procedures that contains mutations. However, the `CanMutate` is not fully propagated from the function body yet. (Issue #147568) For now, we just disallow any TCL statements with pausable portal. Release note (sql change): Previously, using pausable portal with procedure call caused panic, depending on the function body. Now statements that are transaction control statement, such as procedure call(e.g. `CALL myfunc()`) are not allowed with pausable portal. 151338: opt: return RelExpr from (*memo).RootExpr r=mgartner a=mgartner `(*memo).RootExpr` now returns a `RelExpr` instead of an `Expr`, helping eliminate some type assertions. Release note: None 151491: colexecerror: further optimize BenchmarkSQLCatchVectorizedRuntimeError r=yuzefovich a=yuzefovich Rather than creating and closing connections for each test case, we can just reuse the same connections in the benchmark, which speeds it up significantly (from about 4 min to 1 min). Epic: None Release note: None Co-authored-by: ZhouXing19 <[email protected]> Co-authored-by: Marcus Gartner <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]>
4 parents 85bc357 + b8ce362 + a211fba + b42a0ce commit caa221d

File tree

13 files changed

+251
-49
lines changed

13 files changed

+251
-49
lines changed

pkg/sql/colexecerror/error_test.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -184,22 +184,32 @@ func BenchmarkSQLCatchVectorizedRuntimeError(b *testing.B) {
184184
// crdb_test-build behavior.
185185
defer colexecerror.ProductionBehaviorForTests()()
186186

187-
for _, parallelism := range []int{1, 8, 32} {
187+
const maxParallelism = 32
188+
maxNumConns := runtime.GOMAXPROCS(0) * maxParallelism
189+
// Create as many warm connections as we will need for the benchmark. These
190+
// will be reused across all test cases to avoid the connection churn.
191+
connsPool := make([]*gosql.DB, maxNumConns)
192+
for i := range connsPool {
193+
conn := s.ApplicationLayer().SQLConn(b, serverutils.DBName(""))
194+
// Make sure we're using local, vectorized execution.
195+
sqlDB := sqlutils.MakeSQLRunner(conn)
196+
sqlDB.Exec(b, "SET distsql = off")
197+
sqlDB.Exec(b, "SET vectorize = on")
198+
connsPool[i] = conn
199+
}
200+
201+
for _, parallelism := range []int{1, 8, maxParallelism} {
188202
numConns := runtime.GOMAXPROCS(0) * parallelism
189203
b.Run(fmt.Sprintf("conns=%d", numConns), func(b *testing.B) {
190204
for _, tc := range cases {
191205
stmt := fmt.Sprintf(sqlFmt, tc.builtin)
192206
b.Run(tc.name, func(b *testing.B) {
193-
// Create as many warm connections as we will need for the benchmark.
194207
conns := make(chan *gosql.DB, numConns)
195208
for i := 0; i < numConns; i++ {
196-
conn := s.ApplicationLayer().SQLConn(b, serverutils.DBName(""))
197-
// Make sure we're using local, vectorized execution.
198-
sqlDB := sqlutils.MakeSQLRunner(conn)
199-
sqlDB.Exec(b, "SET distsql = off")
200-
sqlDB.Exec(b, "SET vectorize = on")
201-
// Warm up the connection by executing the statement once. We should
202-
// always go through the query plan cache after this.
209+
conn := connsPool[i]
210+
// Warm up the connection by executing the statement
211+
// once. We should always go through the query plan
212+
// cache after this.
203213
_, _ = conn.Exec(stmt)
204214
conns <- conn
205215
}
@@ -209,7 +219,6 @@ func BenchmarkSQLCatchVectorizedRuntimeError(b *testing.B) {
209219
var conn *gosql.DB
210220
select {
211221
case conn = <-conns:
212-
defer conn.Close()
213222
default:
214223
b.Fatal("not enough warm connections")
215224
}

pkg/sql/conn_executor_exec.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ func (ex *connExecutor) execPortal(
322322
return nil, nil, nil
323323
}
324324
ev, payload, retErr = ex.execStmt(ctx, portal.Stmt.Statement, &portal, pinfo, stmtRes, canAutoCommit)
325+
325326
// For a non-pausable portal, it is considered exhausted regardless of the
326327
// fact whether an error occurred or not - if it did, we still don't want
327328
// to re-execute the portal from scratch.
@@ -2936,12 +2937,25 @@ func (ex *connExecutor) dispatchToExecutionEngine(
29362937
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
29372938
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
29382939
ctx, err = ex.makeExecPlan(ctx, planner)
2939-
if flags := planner.curPlan.flags; err == nil && (flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) {
2940+
// TODO(janexing): This is a temporary solution to disallow procedure
2941+
// call statements that contain mutations for pausable portals. Since
2942+
// relational.CanMutate is not yet propagated from the function body
2943+
// via builder.BuildCall(), we must temporarily disallow all
2944+
// TCL statements, which includes the CALL statements.
2945+
// This should be removed once CanMutate is fully propagated.
2946+
// (pending https://github.com/cockroachdb/cockroach/issues/147568)
2947+
isTCL := planner.curPlan.stmt.AST.StatementType() == tree.TypeTCL
2948+
if flags := planner.curPlan.flags; err == nil && (isTCL || flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) {
29402949
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
29412950
// We don't allow mutations in a pausable portal. Set it back to
29422951
// an un-pausable (normal) portal.
29432952
planner.pausablePortal.pauseInfo = nil
29442953
err = res.RevokePortalPausability()
2954+
// If this plan is a transaction control statement, we don't
2955+
// even execute it but just early exit.
2956+
if isTCL {
2957+
err = errors.CombineErrors(err, ErrStmtNotSupportedForPausablePortal)
2958+
}
29452959
defer planner.curPlan.close(ctx)
29462960
} else {
29472961
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan

pkg/sql/opt/exec/execbuilder/scalar.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ func (b *Builder) buildExistsSubquery(
675675
stmtProps := []*physical.Required{{Presentation: physical.Presentation{aliasedCol}}}
676676

677677
// Create an wrapRootExprFn that wraps input in a Limit and a Project.
678-
wrapRootExpr := func(f *norm.Factory, e memo.RelExpr) opt.Expr {
678+
wrapRootExpr := func(f *norm.Factory, e memo.RelExpr) memo.RelExpr {
679679
return f.ConstructProject(
680680
f.ConstructLimit(
681681
e,
@@ -1115,7 +1115,7 @@ func (b *Builder) initRoutineExceptionHandler(
11151115
blockState.ExceptionHandler = exceptionHandler
11161116
}
11171117

1118-
type wrapRootExprFn func(f *norm.Factory, e memo.RelExpr) opt.Expr
1118+
type wrapRootExprFn func(f *norm.Factory, e memo.RelExpr) memo.RelExpr
11191119

11201120
// buildRoutinePlanGenerator returns a tree.RoutinePlanFn that can plan the
11211121
// statements in a routine that has one or more arguments.
@@ -1249,7 +1249,7 @@ func (b *Builder) buildRoutinePlanGenerator(
12491249
f.CopyAndReplace(originalMemo, stmt, props, replaceFn)
12501250

12511251
if wrapRootExpr != nil {
1252-
wrapped := wrapRootExpr(f, f.Memo().RootExpr().(memo.RelExpr)).(memo.RelExpr)
1252+
wrapped := wrapRootExpr(f, f.Memo().RootExpr())
12531253
f.Memo().SetRoot(wrapped, props)
12541254
}
12551255

pkg/sql/opt/memo/memo.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ type Memo struct {
132132
// rootExpr is the root expression of the memo expression forest. It is set
133133
// via a call to SetRoot. After optimization, it is set to be the root of the
134134
// lowest cost tree in the forest.
135-
rootExpr opt.Expr
135+
rootExpr RelExpr
136136

137137
// rootProps are the physical properties required of the root memo expression.
138138
// It is set via a call to SetRoot.
@@ -366,7 +366,7 @@ func (m *Memo) Metadata() *opt.Metadata {
366366

367367
// RootExpr returns the root memo expression previously set via a call to
368368
// SetRoot.
369-
func (m *Memo) RootExpr() opt.Expr {
369+
func (m *Memo) RootExpr() RelExpr {
370370
return m.rootExpr
371371
}
372372

@@ -395,12 +395,7 @@ func (m *Memo) SetRoot(e RelExpr, phys *physical.Required) {
395395
// HasPlaceholders returns true if the memo contains at least one placeholder
396396
// operator.
397397
func (m *Memo) HasPlaceholders() bool {
398-
rel, ok := m.rootExpr.(RelExpr)
399-
if !ok {
400-
panic(errors.AssertionFailedf("placeholders only supported when memo root is relational"))
401-
}
402-
403-
return rel.Relational().HasPlaceholder
398+
return m.rootExpr.Relational().HasPlaceholder
404399
}
405400

406401
// IsStale returns true if the memo has been invalidated by changes to any of
@@ -555,8 +550,7 @@ func (m *Memo) ResetCost(e RelExpr, cost Cost) {
555550
func (m *Memo) IsOptimized() bool {
556551
// The memo is optimized once the root expression has its physical properties
557552
// assigned.
558-
rel, ok := m.rootExpr.(RelExpr)
559-
return ok && rel.RequiredPhysical() != nil
553+
return m.rootExpr != nil && m.rootExpr.RequiredPhysical() != nil
560554
}
561555

562556
// OptimizationCost returns a rough estimate of the cost of optimization of the

pkg/sql/opt/memo/memo_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -691,15 +691,15 @@ func TestStatsAvailable(t *testing.T) {
691691

692692
// Stats should not be available for any expression.
693693
opttestutils.BuildQuery(t, &o, catalog, &evalCtx, "SELECT * FROM t WHERE a=1")
694-
testNotAvailable(o.Memo().RootExpr().(memo.RelExpr))
694+
testNotAvailable(o.Memo().RootExpr())
695695

696696
opttestutils.BuildQuery(t, &o, catalog, &evalCtx, "SELECT sum(a), b FROM t GROUP BY b")
697-
testNotAvailable(o.Memo().RootExpr().(memo.RelExpr))
697+
testNotAvailable(o.Memo().RootExpr())
698698

699699
opttestutils.BuildQuery(t, &o, catalog, &evalCtx,
700700
"SELECT * FROM t AS t1, t AS t2 WHERE t1.a = t2.a AND t1.b = 5",
701701
)
702-
testNotAvailable(o.Memo().RootExpr().(memo.RelExpr))
702+
testNotAvailable(o.Memo().RootExpr())
703703

704704
if _, err := catalog.ExecuteDDL(
705705
`ALTER TABLE t INJECT STATISTICS '[
@@ -729,15 +729,15 @@ func TestStatsAvailable(t *testing.T) {
729729

730730
// Stats should be available for all expressions.
731731
opttestutils.BuildQuery(t, &o, catalog, &evalCtx, "SELECT * FROM t WHERE a=1")
732-
testAvailable(o.Memo().RootExpr().(memo.RelExpr))
732+
testAvailable(o.Memo().RootExpr())
733733

734734
opttestutils.BuildQuery(t, &o, catalog, &evalCtx, "SELECT sum(a), b FROM t GROUP BY b")
735-
testAvailable(o.Memo().RootExpr().(memo.RelExpr))
735+
testAvailable(o.Memo().RootExpr())
736736

737737
opttestutils.BuildQuery(t, &o, catalog, &evalCtx,
738738
"SELECT * FROM t AS t1, t AS t2 WHERE t1.a = t2.a AND t1.b = 5",
739739
)
740-
testAvailable(o.Memo().RootExpr().(memo.RelExpr))
740+
testAvailable(o.Memo().RootExpr())
741741
}
742742

743743
// traverseExpr is a helper function to recursively traverse a relational

pkg/sql/opt/norm/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ func (f *Factory) AssignPlaceholders(from *memo.Memo) (err error) {
420420
}
421421
return f.CopyAndReplaceDefault(e, replaceFn)
422422
}
423-
f.CopyAndReplace(from, from.RootExpr().(memo.RelExpr), from.RootProps(), replaceFn)
423+
f.CopyAndReplace(from, from.RootExpr(), from.RootProps(), replaceFn)
424424

425425
return nil
426426
}

pkg/sql/opt/norm/factory_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func TestCopyAndReplace(t *testing.T) {
101101
}
102102
return o.Factory().CopyAndReplaceDefault(e, replaceFn)
103103
}
104-
o.Factory().CopyAndReplace(m, m.RootExpr().(memo.RelExpr), m.RootProps(), replaceFn)
104+
o.Factory().CopyAndReplace(m, m.RootExpr(), m.RootProps(), replaceFn)
105105

106106
if e, err := o.Optimize(); err != nil {
107107
t.Fatal(err)
@@ -146,7 +146,7 @@ func TestCopyAndReplaceWithScan(t *testing.T) {
146146
replaceFn = func(e opt.Expr) opt.Expr {
147147
return o.Factory().CopyAndReplaceDefault(e, replaceFn)
148148
}
149-
o.Factory().CopyAndReplace(m, m.RootExpr().(memo.RelExpr), m.RootProps(), replaceFn)
149+
o.Factory().CopyAndReplace(m, m.RootExpr(), m.RootProps(), replaceFn)
150150

151151
if _, err := o.Optimize(); err != nil {
152152
t.Fatal(err)

pkg/sql/opt/xform/optimizer.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func (o *Optimizer) Optimize() (_ opt.Expr, err error) {
261261
o.optimizeRootWithProps()
262262

263263
// Now optimize the entire expression tree.
264-
root := o.mem.RootExpr().(memo.RelExpr)
264+
root := o.mem.RootExpr()
265265
rootProps := o.mem.RootProps()
266266
o.optimizeGroup(root, rootProps)
267267

@@ -922,10 +922,7 @@ func (o *Optimizer) ensureOptState(grp memo.RelExpr, required *physical.Required
922922
// properties required of it. This may trigger the creation of a new root and
923923
// new properties.
924924
func (o *Optimizer) optimizeRootWithProps() {
925-
root, ok := o.mem.RootExpr().(memo.RelExpr)
926-
if !ok {
927-
panic(errors.AssertionFailedf("Optimize can only be called on relational root expressions"))
928-
}
925+
root := o.mem.RootExpr()
929926
rootProps := o.mem.RootProps()
930927

931928
// [SimplifyRootOrdering]

pkg/sql/opt/xform/optimizer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func TestDetachMemoRace(t *testing.T) {
113113
// Rewrite the filter to use a different column, which will trigger creation
114114
// of new table statistics. If the statistics object is aliased, this will
115115
// be racy.
116-
f.CopyAndReplace(mem, mem.RootExpr().(memo.RelExpr), mem.RootProps(), replaceFn)
116+
f.CopyAndReplace(mem, mem.RootExpr(), mem.RootProps(), replaceFn)
117117
wg.Done()
118118
}()
119119
}

pkg/sql/opt/xform/placeholder_fast_path.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (o *Optimizer) TryPlaceholderFastPath() (ok bool, err error) {
4545
}
4646
}()
4747

48-
root := o.mem.RootExpr().(memo.RelExpr)
48+
root := o.mem.RootExpr()
4949

5050
rootRelProps := root.Relational()
5151
// We are dealing with a memo that still contains placeholders. The statistics

0 commit comments

Comments
 (0)