Skip to content

Commit c482610

Browse files
craig[bot]yuzefovich
andcommitted
Merge #156959
156959: copy: step txn unconditionally r=yuzefovich a=yuzefovich Currently, we have an issue with COPY FROM command if it runs within the txn which had just been rolled back to a savepoint - we'd hit an assertion error in the KV layer. This commit fixes the oversight by stepping the txn, similar to what we do on the main query path. Additionally, this commit explicitly prohibits running COPY FROM via the internal executor (previously it could lead to undefined behavior or internal errors) and adds a test with savepoints for COPY TO (which works because it uses different execution machinery than COPY FROM). Fixes: #144383. Release note (bug fix): Previously, CockroachDB could encounter an internal error when evaluating COPY FROM command in a transaction after it's been rolled back to a savepoint. The bug has been present since before 23.2 version and is now fixed. Co-authored-by: Yahor Yuzefovich <[email protected]>
2 parents a053744 + b320b51 commit c482610

File tree

5 files changed

+67
-42
lines changed

5 files changed

+67
-42
lines changed

pkg/sql/conn_executor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3227,6 +3227,12 @@ func (ex *connExecutor) execCopyIn(
32273227
// Disable the buffered writes for COPY since there is no benefit in this
32283228
// ability here.
32293229
ex.state.mu.txn.SetBufferedWritesEnabled(false /* enabled */)
3230+
// Step the txn in case it had just been rolled back to a savepoint (if it
3231+
// wasn't, this is harmless). This also matches what we do unconditionally
3232+
// on the main query path.
3233+
if err := ex.state.mu.txn.Step(ctx, false /* allowReadTimestampStep */); err != nil {
3234+
return ex.makeErrEvent(err, cmd.ParsedStmt.AST)
3235+
}
32303236
txnOpt := copyTxnOpt{
32313237
txn: ex.state.mu.txn,
32323238
txnTimestamp: ex.state.sqlTimestamp,

pkg/sql/copy/copy_in_test.go

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -646,49 +646,51 @@ func TestCopyTransaction(t *testing.T) {
646646
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
647647
defer s.Stopper().Stop(context.Background())
648648

649-
if _, err := db.Exec(`
649+
_, err := db.Exec(`
650650
CREATE DATABASE d;
651651
SET DATABASE = d;
652652
CREATE TABLE t (
653653
i INT PRIMARY KEY
654654
);
655-
`); err != nil {
656-
t.Fatal(err)
657-
}
655+
`)
656+
require.NoError(t, err)
658657

659658
txn, err := db.Begin()
660-
if err != nil {
661-
t.Fatal(err)
662-
}
663-
664-
// Note that, at least with lib/pq, this doesn't actually send a Parse msg
665-
// (which we wouldn't support, as we don't support Copy-in in extended
666-
// protocol mode). lib/pq has magic for recognizing a Copy.
667-
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
668-
if err != nil {
669-
t.Fatal(err)
670-
}
659+
require.NoError(t, err)
671660

672-
const val = 2
661+
// Run COPY twice with the first one being rolled back via savepoints.
662+
for val, doSavepoint := range []bool{true, false} {
663+
func() {
664+
if doSavepoint {
665+
_, err = txn.Exec("SAVEPOINT s")
666+
require.NoError(t, err)
667+
defer func() {
668+
_, err = txn.Exec("ROLLBACK TO SAVEPOINT s")
669+
require.NoError(t, err)
670+
}()
671+
}
672+
// Note that, at least with lib/pq, this doesn't actually send a
673+
// Parse msg (which we wouldn't support, as we don't support Copy-in
674+
// in extended protocol mode). lib/pq has magic for recognizing a
675+
// Copy.
676+
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
677+
require.NoError(t, err)
673678

674-
_, err = stmt.Exec(val)
675-
if err != nil {
676-
t.Fatal(err)
677-
}
679+
_, err = stmt.Exec(val)
680+
require.NoError(t, err)
678681

679-
if err = stmt.Close(); err != nil {
680-
t.Fatal(err)
681-
}
682+
err = stmt.Close()
683+
require.NoError(t, err)
682684

683-
var i int
684-
if err := txn.QueryRow("SELECT i FROM d.t").Scan(&i); err != nil {
685-
t.Fatal(err)
686-
} else if i != val {
687-
t.Fatalf("expected 1, got %d", i)
688-
}
689-
if err := txn.Commit(); err != nil {
690-
t.Fatal(err)
685+
var i int
686+
err = txn.QueryRow("SELECT i FROM d.t").Scan(&i)
687+
require.NoError(t, err)
688+
if i != val {
689+
t.Fatalf("expected %d, got %d", val, i)
690+
}
691+
}()
691692
}
693+
require.NoError(t, txn.Commit())
692694
}
693695

694696
// TestCopyFromFKCheck verifies that foreign keys are checked during COPY.

pkg/sql/copy/copy_out_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,26 @@ func TestCopyOutTransaction(t *testing.T) {
6161
tx, err := conn.Begin(ctx)
6262
require.NoError(t, err)
6363

64-
_, err = tx.Exec(ctx, "INSERT INTO t VALUES (1)")
65-
require.NoError(t, err)
64+
for val, doSavepoint := range []bool{true, false} {
65+
func() {
66+
if doSavepoint {
67+
_, err = tx.Exec(ctx, "SAVEPOINT s")
68+
require.NoError(t, err)
69+
defer func() {
70+
_, err = tx.Exec(ctx, "ROLLBACK TO SAVEPOINT s")
71+
require.NoError(t, err)
72+
}()
73+
}
6674

67-
var buf bytes.Buffer
68-
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
69-
require.NoError(t, err)
70-
require.Equal(t, "1\n", buf.String())
75+
_, err = tx.Exec(ctx, "INSERT INTO t VALUES ($1)", val)
76+
require.NoError(t, err)
7177

78+
var buf bytes.Buffer
79+
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
80+
require.NoError(t, err)
81+
require.Equal(t, fmt.Sprintf("%d\n", val), buf.String())
82+
}()
83+
}
7284
require.NoError(t, tx.Rollback(ctx))
7385
}
7486

pkg/sql/internal.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,15 +1464,17 @@ func (ie *InternalExecutor) commitTxn(ctx context.Context) error {
14641464
return ex.commitSQLTransactionInternal(ctx)
14651465
}
14661466

1467-
// checkIfStmtIsAllowed returns an error if the internal executor is not bound
1468-
// with the outer-txn-related info but is used to run DDL statements within an
1469-
// outer txn.
1470-
// TODO (janexing): this will be deprecate soon since it's not a good idea
1471-
// to have `extraTxnState` to store the info from a outer txn.
1467+
// checkIfStmtIsAllowed returns an error if the internal executor cannot execute
1468+
// the given stmt.
14721469
func (ie *InternalExecutor) checkIfStmtIsAllowed(stmt tree.Statement, txn *kv.Txn) error {
14731470
if stmt == nil {
14741471
return nil
14751472
}
1473+
if _, ok := stmt.(*tree.CopyFrom); ok {
1474+
// COPY FROM has special handling in the connExecutor, so we can't run
1475+
// it via the internal executor.
1476+
return errors.New("COPY cannot be run via the internal executor")
1477+
}
14761478
if tree.CanModifySchema(stmt) && txn != nil && ie.extraTxnState == nil {
14771479
return errors.New("DDL statement is disallowed if internal " +
14781480
"executor is not bound with txn metadata")

pkg/sql/opt/exec/execbuilder/testdata/execute_internally_builtin

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,3 +298,6 @@ subtest end
298298
# Regression test for crashing with SHOW COMMIT TIMESTAMP.
299299
statement error this statement is disallowed
300300
SELECT crdb_internal.execute_internally('SHOW COMMIT TIMESTAMP;', true);
301+
302+
statement error COPY cannot be run via the internal executor
303+
SELECT crdb_internal.execute_internally('COPY t FROM STDIN');

0 commit comments

Comments
 (0)