Skip to content

Commit b8ce362

Browse files
committed
sql: disallow pausable portal for transaction control statement
Statement that is a transaction control statement is no longer allowed with pausable portals. Specifically, the following won't be allowed: ``` CALL PAUSE JOB PAUSE SCHEDULE PAUSE JOBS PAUSE JOBS COMMIT PREPARED DEALLOCATE DISCARD EXECUTE ROLLBACK PREPARED SET TRANSACTION UNLISTEN ``` Ideally, we should only disallow the CALL statements for procedures that contains mutations. However, the `CanMutate` is not fully propagated from the function body yet. (Issue #147568) For now, we just disallow any TCL statements with pausable portal. Release note (sql change): Previously, using pausable portal with procedure call caused panic, depending on the function body. Now statements that are transaction control statement, such as procedure call(e.g. `CALL myfunc()`) are not allowed with pausable portal.
1 parent e59d88e commit b8ce362

File tree

2 files changed

+203
-1
lines changed

2 files changed

+203
-1
lines changed

pkg/sql/conn_executor_exec.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ func (ex *connExecutor) execPortal(
322322
return nil, nil, nil
323323
}
324324
ev, payload, retErr = ex.execStmt(ctx, portal.Stmt.Statement, &portal, pinfo, stmtRes, canAutoCommit)
325+
325326
// For a non-pausable portal, it is considered exhausted regardless of the
326327
// fact whether an error occurred or not - if it did, we still don't want
327328
// to re-execute the portal from scratch.
@@ -2934,12 +2935,25 @@ func (ex *connExecutor) dispatchToExecutionEngine(
29342935
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
29352936
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
29362937
ctx, err = ex.makeExecPlan(ctx, planner)
2937-
if flags := planner.curPlan.flags; err == nil && (flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) {
2938+
// TODO(janexing): This is a temporary solution to disallow procedure
2939+
// call statements that contain mutations for pausable portals. Since
2940+
// relational.CanMutate is not yet propagated from the function body
2941+
// via builder.BuildCall(), we must temporarily disallow all
2942+
// TCL statements, which includes the CALL statements.
2943+
// This should be removed once CanMutate is fully propagated.
2944+
// (pending https://github.com/cockroachdb/cockroach/issues/147568)
2945+
isTCL := planner.curPlan.stmt.AST.StatementType() == tree.TypeTCL
2946+
if flags := planner.curPlan.flags; err == nil && (isTCL || flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) {
29382947
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
29392948
// We don't allow mutations in a pausable portal. Set it back to
29402949
// an un-pausable (normal) portal.
29412950
planner.pausablePortal.pauseInfo = nil
29422951
err = res.RevokePortalPausability()
2952+
// If this plan is a transaction control statement, we don't
2953+
// even execute it but just early exit.
2954+
if isTCL {
2955+
err = errors.CombineErrors(err, ErrStmtNotSupportedForPausablePortal)
2956+
}
29432957
defer planner.curPlan.close(ctx)
29442958
} else {
29452959
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan

pkg/sql/pgwire/testdata/pgtest/multiple_active_portals

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,3 +1441,191 @@ ReadyForQuery
14411441
{"Type":"ReadyForQuery","TxStatus":"I"}
14421442

14431443
subtest end
1444+
1445+
subtest disallow_call_procedure_with_pausable_portal
1446+
1447+
send crdb_only
1448+
Query {"String": "SET multiple_active_portals_enabled = 'true';"}
1449+
----
1450+
1451+
until crdb_only
1452+
ReadyForQuery
1453+
----
1454+
{"Type":"CommandComplete","CommandTag":"SET"}
1455+
{"Type":"ReadyForQuery","TxStatus":"I"}
1456+
1457+
send
1458+
Query {"String": "CREATE TABLE IF NOT EXISTS tbl (x timestamp, y int);"}
1459+
Query {"String": "TRUNCATE tbl;"}
1460+
Query {"String": "CREATE OR REPLACE PROCEDURE insert_one() LANGUAGE PLpgSQL AS $$ BEGIN INSERT INTO tbl (x, y) VALUES (now(), 1); END; $$;"}
1461+
----
1462+
1463+
until ignore=NoticeResponse
1464+
ReadyForQuery
1465+
ReadyForQuery
1466+
ReadyForQuery
1467+
----
1468+
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
1469+
{"Type":"ReadyForQuery","TxStatus":"I"}
1470+
{"Type":"CommandComplete","CommandTag":"TRUNCATE"}
1471+
{"Type":"ReadyForQuery","TxStatus":"I"}
1472+
{"Type":"CommandComplete","CommandTag":"CREATE PROCEDURE"}
1473+
{"Type":"ReadyForQuery","TxStatus":"I"}
1474+
1475+
send
1476+
Parse {"Name": "foo1", "Query": "CALL insert_one()"}
1477+
Bind {"DestinationPortal": "foo1", "PreparedStatement": "foo1"}
1478+
Execute {"Portal": "foo1", "MaxRows": 100}
1479+
Sync
1480+
----
1481+
1482+
# PG allows procedure with pausable portal, but CRDB does not.
1483+
until noncrdb_only
1484+
ReadyForQuery
1485+
----
1486+
{"Type":"ParseComplete"}
1487+
{"Type":"BindComplete"}
1488+
{"Type":"CommandComplete","CommandTag":"CALL"}
1489+
{"Type":"ReadyForQuery","TxStatus":"I"}
1490+
1491+
until crdb_only keepErrMessage
1492+
ErrorResponse
1493+
ReadyForQuery
1494+
----
1495+
{"Type":"ParseComplete"}
1496+
{"Type":"BindComplete"}
1497+
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/dev"}
1498+
{"Type":"ReadyForQuery","TxStatus":"I"}
1499+
1500+
# If calling this procedure without `"MaxRows": 100` (i.e. make it a non-pausable portal),
1501+
# it will work.
1502+
send
1503+
Query {"String": "TRUNCATE tbl;"}
1504+
Bind {"DestinationPortal": "foo1", "PreparedStatement": "foo1"}
1505+
Execute {"Portal": "foo1"}
1506+
Query {"String": "SELECT y FROM tbl"}
1507+
Sync
1508+
----
1509+
1510+
until ignore=RowDescription
1511+
ReadyForQuery
1512+
ReadyForQuery
1513+
ReadyForQuery
1514+
----
1515+
{"Type":"CommandComplete","CommandTag":"TRUNCATE"}
1516+
{"Type":"ReadyForQuery","TxStatus":"I"}
1517+
{"Type":"BindComplete"}
1518+
{"Type":"CommandComplete","CommandTag":"CALL"}
1519+
{"Type":"DataRow","Values":[{"text":"1"}]}
1520+
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
1521+
{"Type":"ReadyForQuery","TxStatus":"I"}
1522+
{"Type":"ReadyForQuery","TxStatus":"I"}
1523+
1524+
send crdb_only
1525+
Query {"String": "SET multiple_active_portals_enabled = 'true';"}
1526+
----
1527+
1528+
until crdb_only
1529+
ReadyForQuery
1530+
----
1531+
{"Type":"CommandComplete","CommandTag":"SET"}
1532+
{"Type":"ReadyForQuery","TxStatus":"I"}
1533+
1534+
send
1535+
Query {"String": "CREATE TABLE IF NOT EXISTS sensors (id TEXT NOT NULL, ts TIMESTAMPTZ NOT NULL, val FLOAT8 NULL, CONSTRAINT s_pk PRIMARY KEY (id, ts));"}
1536+
Query {"String": "TRUNCATE sensors;"}
1537+
Query {"String": "INSERT INTO sensors values ('a', now(), 0.1);"}
1538+
Query {"String": "CREATE TABLE IF NOT EXISTS sensors_summary (id TEXT NOT NULL, val FLOAT8 NULL, CONSTRAINT ss_pk PRIMARY KEY (id));"}
1539+
Query {"String": "TRUNCATE sensors_summary;"}
1540+
Query {"String": "CREATE TYPE IF NOT EXISTS k AS (x TEXT, y FLOAT8);"}
1541+
----
1542+
1543+
until ignore=NoticeResponse
1544+
ReadyForQuery
1545+
ReadyForQuery
1546+
ReadyForQuery
1547+
ReadyForQuery
1548+
ReadyForQuery
1549+
ReadyForQuery
1550+
----
1551+
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
1552+
{"Type":"ReadyForQuery","TxStatus":"I"}
1553+
{"Type":"CommandComplete","CommandTag":"TRUNCATE"}
1554+
{"Type":"ReadyForQuery","TxStatus":"I"}
1555+
{"Type":"CommandComplete","CommandTag":"INSERT 0 1"}
1556+
{"Type":"ReadyForQuery","TxStatus":"I"}
1557+
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
1558+
{"Type":"ReadyForQuery","TxStatus":"I"}
1559+
{"Type":"CommandComplete","CommandTag":"TRUNCATE"}
1560+
{"Type":"ReadyForQuery","TxStatus":"I"}
1561+
{"Type":"CommandComplete","CommandTag":"CREATE TYPE"}
1562+
{"Type":"ReadyForQuery","TxStatus":"I"}
1563+
1564+
send noncrdb_only
1565+
Query {"String": "CREATE OR REPLACE PROCEDURE fab5() LANGUAGE plpgsql AS $$ DECLARE id_array k[]; BEGIN SELECT array_agg(row(id, avg_val)::k) INTO id_array FROM (SELECT id, avg(val) AS avg_val FROM sensors WHERE now() - INTERVAL '3 days' < ts GROUP BY id) AS sub; INSERT INTO sensors_summary (id, val) SELECT (arr).x, (arr).y FROM unnest(id_array) AS arr; RAISE NOTICE 'Inserted % rows', array_length(id_array, 1); END; $$;"}
1566+
----
1567+
1568+
send crdb_only
1569+
Query {"String": "CREATE OR REPLACE PROCEDURE fab5() LANGUAGE PLpgSQL AS $$ DECLARE i k := ('', 0.0 ); id_array k[]; BEGIN begin commit; SELECT array_agg((id, avg_val)) FROM (SELECT id, avg(val) as avg_val FROM sensors where now() - INTERVAL '3 days' < ts group by id) INTO id_array; end; commit; INSERT INTO sensors_summary (id, val) select (arr).x, (arr).y from (select unnest(id_array) as arr); RAISE NOTICE 'Inserted % rows', array_length(id_array, 1); END $$;"}
1570+
----
1571+
1572+
until
1573+
ReadyForQuery
1574+
----
1575+
{"Type":"CommandComplete","CommandTag":"CREATE PROCEDURE"}
1576+
{"Type":"ReadyForQuery","TxStatus":"I"}
1577+
1578+
send
1579+
Parse {"Name": "foo", "Query": "CALL fab5()"}
1580+
Bind {"DestinationPortal": "foo", "PreparedStatement": "foo"}
1581+
Bind {"DestinationPortal": "foo2", "PreparedStatement": "foo"}
1582+
Execute {"Portal": "foo", "MaxRows": 100}
1583+
Sync
1584+
----
1585+
1586+
# PG allows procedure with pausable portal, but CRDB does not.
1587+
until noncrdb_only ignore=RowDescription ignore=NoticeResponse
1588+
ReadyForQuery
1589+
----
1590+
{"Type":"ParseComplete"}
1591+
{"Type":"BindComplete"}
1592+
{"Type":"BindComplete"}
1593+
{"Type":"CommandComplete","CommandTag":"CALL"}
1594+
{"Type":"ReadyForQuery","TxStatus":"I"}
1595+
1596+
until crdb_only keepErrMessage
1597+
ErrorResponse
1598+
ReadyForQuery
1599+
----
1600+
{"Type":"ParseComplete"}
1601+
{"Type":"BindComplete"}
1602+
{"Type":"BindComplete"}
1603+
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/dev"}
1604+
{"Type":"ReadyForQuery","TxStatus":"I"}
1605+
1606+
# Both PG and CRDB allows procedure with non-pausable portal.
1607+
send
1608+
Query {"String": "TRUNCATE sensors_summary;"}
1609+
Bind {"DestinationPortal": "foo", "PreparedStatement": "foo"}
1610+
Bind {"DestinationPortal": "foo2", "PreparedStatement": "foo"}
1611+
Execute {"Portal": "foo"}
1612+
Query {"String": "SELECT id FROM sensors_summary ORDER BY id"}
1613+
Sync
1614+
----
1615+
1616+
until ignore=RowDescription ignore=NoticeResponse
1617+
ReadyForQuery
1618+
ReadyForQuery
1619+
ReadyForQuery
1620+
----
1621+
{"Type":"CommandComplete","CommandTag":"TRUNCATE"}
1622+
{"Type":"ReadyForQuery","TxStatus":"I"}
1623+
{"Type":"BindComplete"}
1624+
{"Type":"BindComplete"}
1625+
{"Type":"CommandComplete","CommandTag":"CALL"}
1626+
{"Type":"DataRow","Values":[{"text":"a"}]}
1627+
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
1628+
{"Type":"ReadyForQuery","TxStatus":"I"}
1629+
{"Type":"ReadyForQuery","TxStatus":"I"}
1630+
1631+
subtest end

0 commit comments

Comments
 (0)