Skip to content

Commit efcbc07

Browse files
craig[bot]DrewKimball
andcommitted
Merge #147923
147923: sql: do not attempt to re-execute portal after PL/pgSQL txn control r=yuzefovich,michae2 a=DrewKimball Previously, the PL/pgSQL transaction control statements (COMMIT/ROLLBACK) did not work with the extended wire protocol. If the `CALL` statement was executed via portal, we would attempt to re-execute the portal after the original transaction ended. This would result in an error like `unknown portal ""`. This commit fixes the bug by replacing the original command with a dummy `ExecStmt` command when resuming stored proc execution in a new transaction. This allows the portal to be cleaned up with the first transaction without attempting to resolve it after the fact. The fix is controled by the session var `use_proc_txn_control_extended_protocol_fix`, which is on by default. Informs #147701 Release note (bug fix): Fixed a bug that would cause a CALL statement executed via a portal in the extended wire protocol to result in an error like `unknown portal ""` if the stored procedure contained `COMMIT` or `ROLLBACK` statements. The bug has existed since PL/pgSQL transaction control statements were introduced in v24.1. Co-authored-by: Drew Kimball <[email protected]>
2 parents 77e13d8 + a94de9c commit efcbc07

File tree

10 files changed

+137
-3
lines changed

10 files changed

+137
-3
lines changed

pkg/sql/conn_executor.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1651,6 +1651,12 @@ type connExecutor struct {
16511651
// checks for resumeProc in buildExecMemo, and executes it as the next
16521652
// statement if it is non-nil.
16531653
resumeProc *memo.Memo
1654+
1655+
// resumeStmt is set if resumeProc is set. If the stored procedure was
1656+
// executed via a portal in the extended wire protocol, this statement
1657+
// will be used to synthesize an ExecStmt command to avoid attempting to
1658+
// execute the portal twice.
1659+
resumeStmt statements.Statement[tree.Statement]
16541660
}
16551661

16561662
// shouldExecuteOnTxnRestart indicates that ex.onTxnRestart will be
@@ -2300,6 +2306,15 @@ func (ex *connExecutor) execCmd() (retErr error) {
23002306
return err // err could be io.EOF
23012307
}
23022308

2309+
// Special handling for COMMIT/ROLLBACK in PL/pgSQL stored procedures. See the
2310+
// makeCmdForStoredProcResume comment for details.
2311+
if ex.extraTxnState.storedProcTxnState.resumeProc != nil {
2312+
cmd, err = ex.makeCmdForStoredProcResume(cmd)
2313+
if err != nil {
2314+
return err
2315+
}
2316+
}
2317+
23032318
if log.ExpensiveLogEnabled(ctx, 2) {
23042319
ex.sessionEventf(ctx, "[%s pos:%d] executing %s",
23052320
ex.machine.CurState(), pos, cmd)
@@ -2716,6 +2731,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
27162731
// and transaction modes. The stored procedure has finished execution,
27172732
// either successfully or with an error.
27182733
ex.extraTxnState.storedProcTxnState.resumeProc = nil
2734+
ex.extraTxnState.storedProcTxnState.resumeStmt = statements.Statement[tree.Statement]{}
27192735
ex.extraTxnState.storedProcTxnState.txnModes = nil
27202736
}
27212737

@@ -2892,6 +2908,35 @@ func stateToTxnStatusIndicator(s fsm.State) TransactionStatusIndicator {
28922908
}
28932909
}
28942910

2911+
// makeCmdForStoredProcResume creates a Command that can be used to resume
2912+
// execution of a stored procedure that has ended the previous transaction via
2913+
// COMMIT or ROLLBACK. It is not enough to just process the original command
2914+
// again, because it may have been an ExecPortal statement, and the portal will
2915+
// already have been closed after the first phase of execution.
2916+
func (ex *connExecutor) makeCmdForStoredProcResume(curCmd Command) (Command, error) {
2917+
if !ex.sessionData().UseProcTxnControlExtendedProtocolFix {
2918+
// The fix is not enabled, so return the original command.
2919+
return curCmd, nil
2920+
}
2921+
var timeReceived crtime.Mono
2922+
switch t := curCmd.(type) {
2923+
case ExecStmt:
2924+
// NOTE: it is not strictly necessary to replace ExecStmt. However, it seems
2925+
// best to handle the commands in a consistent way.
2926+
timeReceived = t.TimeReceived
2927+
case ExecPortal:
2928+
timeReceived = t.TimeReceived
2929+
default:
2930+
return nil, errors.AssertionFailedf(
2931+
"unexpected command type %T for stored procedure resume", t,
2932+
)
2933+
}
2934+
return ExecStmt{
2935+
Statement: ex.extraTxnState.storedProcTxnState.resumeStmt,
2936+
TimeReceived: timeReceived,
2937+
}, nil
2938+
}
2939+
28952940
// isCopyToExternalStorage returns true if the CopyIn command is writing to an
28962941
// ExternalStorage such as nodelocal or userfile. It does so by checking the
28972942
// target table/schema names against the sentinel, internal table/schema names

pkg/sql/conn_io.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ type ExecStmt struct {
143143

144144
// LastInBatch indicates if this command contains the last query in a
145145
// simple protocol Query message that contains a batch of 1 or more queries.
146+
// This is used to determine whether autocommit can be applied to the
147+
// transaction, and need not be set for correctness.
146148
LastInBatch bool
147149
// LastInBatchBeforeShowCommitTimestamp indicates that this command contains
148150
// the second-to-last query in a simple protocol Query message that contains
@@ -151,7 +153,9 @@ type ExecStmt struct {
151153
// such that the SHOW COMMIT TIMESTAMP statement can return the timestamp of
152154
// the transaction which applied to all the other statements in the batch.
153155
// Note that SHOW COMMIT TIMESTAMP is not permitted in any other position in
154-
// such a multi-statement implicit transaction.
156+
// such a multi-statement implicit transaction. This is used to determine
157+
// whether autocommit can be applied to the transaction, and need not be set
158+
// for correctness.
155159
LastInBatchBeforeShowCommitTimestamp bool
156160
}
157161

pkg/sql/exec_util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4221,6 +4221,10 @@ func (m *sessionDataMutator) SetDistSQLUseReducedLeafWriteSets(val bool) {
42214221
m.data.DistSQLUseReducedLeafWriteSets = val
42224222
}
42234223

4224+
func (m *sessionDataMutator) SetUseProcTxnControlExtendedProtocolFix(val bool) {
4225+
m.data.UseProcTxnControlExtendedProtocolFix = val
4226+
}
4227+
42244228
// Utility functions related to scrubbing sensitive information on SQL Stats.
42254229

42264230
// quantizeCounts ensures that the Count field in the

pkg/sql/logictest/testdata/logic_test/information_schema

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4137,6 +4137,7 @@ unsafe_allow_triggers_modifying_cascades off
41374137
use_cputs_on_non_unique_indexes off
41384138
use_improved_routine_dependency_tracking on
41394139
use_pre_25_2_variadic_builtins off
4140+
use_proc_txn_control_extended_protocol_fix on
41404141
variable_inequality_lookup_join_enabled on
41414142
vector_search_beam_size 32
41424143
xmloption content

pkg/sql/logictest/testdata/logic_test/pg_catalog

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3132,6 +3132,7 @@ use_cputs_on_non_unique_indexes off
31323132
use_declarative_schema_changer on NULL NULL NULL string
31333133
use_improved_routine_dependency_tracking on NULL NULL NULL string
31343134
use_pre_25_2_variadic_builtins off NULL NULL NULL string
3135+
use_proc_txn_control_extended_protocol_fix on NULL NULL NULL string
31353136
variable_inequality_lookup_join_enabled on NULL NULL NULL string
31363137
vector_search_beam_size 32 NULL NULL NULL string
31373138
vectorize on NULL NULL NULL string
@@ -3366,6 +3367,7 @@ use_cputs_on_non_unique_indexes off
33663367
use_declarative_schema_changer on NULL user NULL on on
33673368
use_improved_routine_dependency_tracking on NULL user NULL on on
33683369
use_pre_25_2_variadic_builtins off NULL user NULL off off
3370+
use_proc_txn_control_extended_protocol_fix on NULL user NULL on on
33693371
variable_inequality_lookup_join_enabled on NULL user NULL on on
33703372
vector_search_beam_size 32 NULL user NULL 32 32
33713373
vectorize on NULL user NULL on on
@@ -3592,6 +3594,7 @@ use_cputs_on_non_unique_indexes NULL NULL
35923594
use_declarative_schema_changer NULL NULL NULL NULL NULL
35933595
use_improved_routine_dependency_tracking NULL NULL NULL NULL NULL
35943596
use_pre_25_2_variadic_builtins NULL NULL NULL NULL NULL
3597+
use_proc_txn_control_extended_protocol_fix NULL NULL NULL NULL NULL
35953598
variable_inequality_lookup_join_enabled NULL NULL NULL NULL NULL
35963599
vector_search_beam_size NULL NULL NULL NULL NULL
35973600
vectorize NULL NULL NULL NULL NULL

pkg/sql/logictest/testdata/logic_test/show_source

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ use_cputs_on_non_unique_indexes off
246246
use_declarative_schema_changer on
247247
use_improved_routine_dependency_tracking on
248248
use_pre_25_2_variadic_builtins off
249+
use_proc_txn_control_extended_protocol_fix on
249250
variable_inequality_lookup_join_enabled on
250251
vector_search_beam_size 32
251252
vectorize on

pkg/sql/pgwire/testdata/pgtest/procedure

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,51 @@ ReadyForQuery
4646
{"Type":"ReadyForQuery","TxStatus":"I"}
4747
{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
4848
{"Type":"ReadyForQuery","TxStatus":"I"}
49+
50+
# Regression test for #147701: correctly handle a PL/pgSQL procedure that
51+
# commits or rolls back the transaction.
52+
send
53+
Query {"String": "CREATE OR REPLACE PROCEDURE p() LANGUAGE PLpgSQL AS $$ BEGIN RAISE NOTICE 'foo'; COMMIT; RAISE NOTICE 'bar'; ROLLBACK; RAISE NOTICE 'baz'; END $$;"}
54+
----
55+
56+
until
57+
ReadyForQuery
58+
----
59+
{"Type":"CommandComplete","CommandTag":"CREATE PROCEDURE"}
60+
{"Type":"ReadyForQuery","TxStatus":"I"}
61+
62+
send
63+
Query {"String": "CALL p()"}
64+
----
65+
66+
until
67+
ReadyForQuery
68+
----
69+
{"Type":"RowDescription","Fields":null}
70+
{"Severity":"NOTICE","SeverityUnlocalized":"NOTICE","Code":"00000","Message":"foo","Detail":"","Hint":"","Position":0,"InternalPosition":0,"InternalQuery":"","Where":"","SchemaName":"","TableName":"","ColumnName":"","DataTypeName":"","ConstraintName":"","File":"builtins.go","Line":0,"Routine":"func378","UnknownFields":null}
71+
{"Type":"RowDescription","Fields":null}
72+
{"Severity":"NOTICE","SeverityUnlocalized":"NOTICE","Code":"00000","Message":"bar","Detail":"","Hint":"","Position":0,"InternalPosition":0,"InternalQuery":"","Where":"","SchemaName":"","TableName":"","ColumnName":"","DataTypeName":"","ConstraintName":"","File":"builtins.go","Line":0,"Routine":"func378","UnknownFields":null}
73+
{"Type":"RowDescription","Fields":null}
74+
{"Severity":"NOTICE","SeverityUnlocalized":"NOTICE","Code":"00000","Message":"baz","Detail":"","Hint":"","Position":0,"InternalPosition":0,"InternalQuery":"","Where":"","SchemaName":"","TableName":"","ColumnName":"","DataTypeName":"","ConstraintName":"","File":"builtins.go","Line":0,"Routine":"func378","UnknownFields":null}
75+
{"Type":"CommandComplete","CommandTag":"CALL"}
76+
{"Type":"ReadyForQuery","TxStatus":"I"}
77+
78+
send
79+
Parse {"Name": "foo", "Query": "CALL p()"}
80+
Bind {"DestinationPortal": "foo", "PreparedStatement": "foo"}
81+
Execute {"Portal": "foo"}
82+
Sync
83+
----
84+
85+
until
86+
ReadyForQuery
87+
----
88+
{"Type":"ParseComplete"}
89+
{"Type":"BindComplete"}
90+
{"Severity":"NOTICE","SeverityUnlocalized":"NOTICE","Code":"00000","Message":"foo","Detail":"","Hint":"","Position":0,"InternalPosition":0,"InternalQuery":"","Where":"","SchemaName":"","TableName":"","ColumnName":"","DataTypeName":"","ConstraintName":"","File":"builtins.go","Line":0,"Routine":"func378","UnknownFields":null}
91+
{"Type":"RowDescription","Fields":null}
92+
{"Severity":"NOTICE","SeverityUnlocalized":"NOTICE","Code":"00000","Message":"bar","Detail":"","Hint":"","Position":0,"InternalPosition":0,"InternalQuery":"","Where":"","SchemaName":"","TableName":"","ColumnName":"","DataTypeName":"","ConstraintName":"","File":"builtins.go","Line":0,"Routine":"func378","UnknownFields":null}
93+
{"Type":"RowDescription","Fields":null}
94+
{"Severity":"NOTICE","SeverityUnlocalized":"NOTICE","Code":"00000","Message":"baz","Detail":"","Hint":"","Position":0,"InternalPosition":0,"InternalQuery":"","Where":"","SchemaName":"","TableName":"","ColumnName":"","DataTypeName":"","ConstraintName":"","File":"builtins.go","Line":0,"Routine":"func378","UnknownFields":null}
95+
{"Type":"CommandComplete","CommandTag":"CALL"}
96+
{"Type":"ReadyForQuery","TxStatus":"I"}

pkg/sql/routine.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
1515
"github.com/cockroachdb/cockroach/pkg/sql/isql"
1616
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
17+
"github.com/cockroachdb/cockroach/pkg/sql/parser/statements"
1718
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
1819
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
1920
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
@@ -660,14 +661,18 @@ type storedProcTxnStateAccessor struct {
660661
}
661662

662663
func (a *storedProcTxnStateAccessor) setStoredProcTxnState(
663-
txnOp tree.StoredProcTxnOp, txnModes *tree.TransactionModes, resumeProc *memo.Memo,
664+
txnOp tree.StoredProcTxnOp,
665+
txnModes *tree.TransactionModes,
666+
resumeProc *memo.Memo,
667+
resumeStmt statements.Statement[tree.Statement],
664668
) {
665669
if a.ex == nil {
666670
panic(errors.AssertionFailedf("setStoredProcTxnState is not supported without connExecutor"))
667671
}
668672
a.ex.extraTxnState.storedProcTxnState.txnOp = txnOp
669673
a.ex.extraTxnState.storedProcTxnState.txnModes = txnModes
670674
a.ex.extraTxnState.storedProcTxnState.resumeProc = resumeProc
675+
a.ex.extraTxnState.storedProcTxnState.resumeStmt = resumeStmt
671676
}
672677

673678
func (a *storedProcTxnStateAccessor) getTxnOp() tree.StoredProcTxnOp {
@@ -713,6 +718,8 @@ func (p *planner) EvalTxnControlExpr(
713718
if err != nil {
714719
return nil, err
715720
}
716-
p.storedProcTxnState.setStoredProcTxnState(expr.Op, &expr.Modes, resumeProc.(*memo.Memo))
721+
p.storedProcTxnState.setStoredProcTxnState(
722+
expr.Op, &expr.Modes, resumeProc.(*memo.Memo), p.stmt.Statement,
723+
)
717724
return tree.DNull, nil
718725
}

pkg/sql/sessiondatapb/local_only_session_data.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,10 @@ message LocalOnlySessionData {
685685
// DistSQLUseReducedLeafWriteSets, when true, indicates that the DistSQL
686686
// runner should use the reduced write sets when constructing LeafTxns.
687687
bool distsql_use_reduced_leaf_write_sets = 174 [(gogoproto.customname) = "DistSQLUseReducedLeafWriteSets"];
688+
// UseProcTxnControlExtendedProtocolFix, when true, enables the fix for
689+
// PL/pgSQL transaction control statements (COMMIT, ROLLBACK) when the stored
690+
// procedure is executed via a portal in the extended wire protocol.
691+
bool use_proc_txn_control_extended_protocol_fix = 175;
688692

689693
///////////////////////////////////////////////////////////////////////////
690694
// WARNING: consider whether a session parameter you're adding needs to //

pkg/sql/vars.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4143,6 +4143,23 @@ var varGen = map[string]sessionVar{
41434143
},
41444144
GlobalDefault: globalTrue,
41454145
},
4146+
4147+
// CockroachDB extension.
4148+
`use_proc_txn_control_extended_protocol_fix`: {
4149+
GetStringVal: makePostgresBoolGetStringValFn(`use_proc_txn_control_extended_protocol_fix`),
4150+
Set: func(_ context.Context, m sessionDataMutator, s string) error {
4151+
b, err := paramparse.ParseBoolVar("use_proc_txn_control_extended_protocol_fix", s)
4152+
if err != nil {
4153+
return err
4154+
}
4155+
m.SetUseProcTxnControlExtendedProtocolFix(b)
4156+
return nil
4157+
},
4158+
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
4159+
return formatBoolAsPostgresSetting(evalCtx.SessionData().UseProcTxnControlExtendedProtocolFix), nil
4160+
},
4161+
GlobalDefault: globalTrue,
4162+
},
41464163
}
41474164

41484165
func ReplicationModeFromString(s string) (sessiondatapb.ReplicationMode, error) {

0 commit comments

Comments
 (0)