Skip to content

Commit 48b940d

Browse files
committed
logical: wrap recoverable errors in savepoints
Internal executor statements failures are not atomic. They may fail and leave behind live KV operations. Usually this is okay because the error is returned up the stack and the caller aborts the transaction. There is a bug LDR because it recovers condition failed errors to detect LWW losses. The insert fast path writes indexes before it writes to the primary key, so these fast path failures could corrupt index entries. Now, all LDR code paths that commit a transaction containing a recoverable internal executor error wrap the statement in a savepoint. Informs: #144645 Epic: CRDB-48647 Release note: None
1 parent 4d33777 commit 48b940d

File tree

6 files changed

+188
-3
lines changed

6 files changed

+188
-3
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ go_library(
1717
"purgatory.go",
1818
"range_stats.go",
1919
"replication_statements.go",
20+
"savepoint.go",
2021
"sql_crud_writer.go",
2122
"sql_row_reader.go",
2223
"sql_row_writer.go",
@@ -127,6 +128,7 @@ go_test(
127128
"purgatory_test.go",
128129
"range_stats_test.go",
129130
"replication_statements_test.go",
131+
"savepoint_test.go",
130132
"sql_row_reader_test.go",
131133
"sql_row_writer_test.go",
132134
"table_batch_handler_test.go",

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,61 @@ func TestLogicalStreamIngestionJobNameResolution(t *testing.T) {
181181
}
182182
}
183183

184+
func TestOptimsitcInsertCorruption(t *testing.T) {
185+
defer leaktest.AfterTest(t)()
186+
skip.UnderDeadlock(t)
187+
defer log.Scope(t).Close(t)
188+
189+
// This is a regression test for #144645. Running the optimistic insert code
190+
// path could corrupt indexes if the insert is not wrapped in a savepoint.
191+
192+
ctx := context.Background()
193+
194+
server, s, dbSource, dbDest := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
195+
defer server.Stopper().Stop(ctx)
196+
197+
dbSource.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.try_optimistic_insert.enabled = true")
198+
199+
createTableStmt := `CREATE TABLE computed_cols (
200+
a INT,
201+
b INT,
202+
c INT,
203+
PRIMARY KEY (a, b)
204+
)`
205+
dbSource.Exec(t, createTableStmt)
206+
dbDest.Exec(t, createTableStmt)
207+
208+
createIdxStmt := `CREATE INDEX c ON computed_cols (c)`
209+
dbSource.Exec(t, createIdxStmt)
210+
dbDest.Exec(t, createIdxStmt)
211+
212+
// Insert initial data into destination that should be overwritten. This
213+
// gives more opportunities for corruption.
214+
dbDest.Exec(t, "INSERT INTO computed_cols (a, b, c) VALUES (1, 2, 3), (3, 4, 5)")
215+
dbSource.Exec(t, "INSERT INTO computed_cols (a, b, c) VALUES (1, 2, 6), (3, 4, 7)")
216+
217+
// Create logical replication stream from source to destination
218+
sourceURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("a"))
219+
var jobID jobspb.JobID
220+
dbDest.QueryRow(t,
221+
"CREATE LOGICAL REPLICATION STREAM FROM TABLE computed_cols ON $1 INTO TABLE computed_cols",
222+
sourceURL.String(),
223+
).Scan(&jobID)
224+
225+
WaitUntilReplicatedTime(t, s.Clock().Now(), dbDest, jobID)
226+
227+
dbSource.CheckQueryResults(t, "SELECT * FROM computed_cols", [][]string{
228+
{"1", "2", "6"},
229+
{"3", "4", "7"},
230+
})
231+
dbDest.CheckQueryResults(t, "SELECT * FROM computed_cols", [][]string{
232+
{"1", "2", "6"},
233+
{"3", "4", "7"},
234+
})
235+
236+
compareReplicatedTables(t, s, "a", "b", "computed_cols", dbSource, dbDest)
237+
}
238+
184239
type fatalDLQ struct{ *testing.T }
185240

186241
func (fatalDLQ) Create(ctx context.Context) error { return nil }

pkg/crosscluster/logical/lww_row_processor.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,11 @@ func (lww *lwwQuerier) InsertRow(
640640
if !useLowPriority.Get(&lww.settings.SV) {
641641
sess.QualityOfService = nil
642642
}
643-
if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, sess, stmt, datums...); err != nil {
643+
err = withSavepoint(ctx, kvTxn, func() error {
644+
_, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, sess, stmt, datums...)
645+
return err
646+
})
647+
if err != nil {
644648
if isLwwLoser(err) {
645649
return batchStats{}, nil
646650
}
@@ -667,7 +671,10 @@ func (lww *lwwQuerier) InsertRow(
667671
sess.QualityOfService = nil
668672
}
669673
sess.OriginTimestampForLogicalDataReplication = row.MvccTimestamp
670-
_, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...)
674+
err = withSavepoint(ctx, kvTxn, func() error {
675+
_, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...)
676+
return err
677+
})
671678
if isLwwLoser(err) {
672679
return batchStats{}, nil
673680
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/kv"
12+
"github.com/cockroachdb/errors"
13+
)
14+
15+
// withSavepoint is a utility function that runs the provided function within
16+
// the context of the savepoint.
17+
//
18+
// If the caller wishes to recover from an error returned by an internal
19+
// executor and keep the transaction, then the call to the internal executor
20+
// must be scoped within a savepoint. Failed internal executor calls are not
21+
// atomic and may have left behind some KV operations. Usually that is okay
22+
// because the error is passed up the call stack and the transaction is rolled
23+
// back.
24+
func withSavepoint(ctx context.Context, txn *kv.Txn, fn func() error) error {
25+
// In the classic sql writer the txn is optional.
26+
if txn == nil {
27+
return fn()
28+
}
29+
// TODO(jeffswenson): consider changing the internal executor so all calls
30+
// implicitly create and apply/rollback savepoints.
31+
savepoint, err := txn.CreateSavepoint(ctx)
32+
if err != nil {
33+
return err
34+
}
35+
err = fn()
36+
if err != nil {
37+
// NOTE: we return the save point error if rollback fails because we do not
38+
// want something checking error types to attempt to handle the inner
39+
// error.
40+
if savePointErr := txn.RollbackToSavepoint(ctx, savepoint); savePointErr != nil {
41+
return errors.WithSecondaryError(savePointErr, err)
42+
}
43+
return err
44+
}
45+
if err := txn.ReleaseSavepoint(ctx, savepoint); err != nil {
46+
return err
47+
}
48+
return nil
49+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/base"
13+
"github.com/cockroachdb/cockroach/pkg/kv"
14+
"github.com/cockroachdb/cockroach/pkg/sql"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
16+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
18+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
19+
"github.com/cockroachdb/cockroach/pkg/util/log"
20+
"github.com/cockroachdb/errors"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
func TestWithSavepoint(t *testing.T) {
25+
defer leaktest.AfterTest(t)()
26+
defer log.Scope(t).Close(t)
27+
28+
srv, rawDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
29+
defer srv.Stopper().Stop(context.Background())
30+
31+
ctx := context.Background()
32+
sqlDB := sqlutils.MakeSQLRunner(rawDB)
33+
sqlDB.Exec(t, "CREATE TABLE test (id STRING PRIMARY KEY, value STRING)")
34+
35+
require.NoError(t, srv.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
36+
err := withSavepoint(ctx, txn, func() error {
37+
_, err := srv.InternalExecutor().(*sql.InternalExecutor).ExecEx(
38+
ctx,
39+
"test-insert",
40+
txn,
41+
sessiondata.NodeUserSessionDataOverride,
42+
"INSERT INTO defaultdb.test VALUES ('ok', 'is-persisted')",
43+
)
44+
return err
45+
})
46+
require.NoError(t, err)
47+
48+
err = withSavepoint(ctx, txn, func() error {
49+
_, err := srv.InternalExecutor().(*sql.InternalExecutor).ExecEx(
50+
ctx,
51+
"test-insert",
52+
txn,
53+
sessiondata.NodeUserSessionDataOverride,
54+
"INSERT INTO defaultdb.test VALUES ('fails', 'is-rolled-back')",
55+
)
56+
require.NoError(t, err)
57+
// NOTE: the query above is okay, which means it wrote things to KV,
58+
// but we're going to return an error which rolls back the
59+
// savepoint.
60+
return errors.New("something to rollback")
61+
})
62+
require.ErrorContains(t, err, "something to rollback")
63+
64+
return nil
65+
}))
66+
67+
sqlDB.CheckQueryResults(t, "SELECT id, value FROM test", [][]string{
68+
{"ok", "is-persisted"},
69+
})
70+
}

pkg/crosscluster/logical/table_batch_handler.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ func (t *tableHandler) attemptBatch(
170170
stats.kvLwwLosers += tombstoneUpdateStats.kvWriteTooOld
171171
case event.prevRow == nil:
172172
stats.inserts++
173-
err := t.sqlWriter.InsertRow(ctx, txn, event.originTimestamp, event.row)
173+
err := withSavepoint(ctx, txn.KV(), func() error {
174+
return t.sqlWriter.InsertRow(ctx, txn, event.originTimestamp, event.row)
175+
})
174176
if isLwwLoser(err) {
175177
// Insert may observe a LWW failure if it attempts to write over a tombstone.
176178
stats.kvLwwLosers++

0 commit comments

Comments
 (0)