Commit 757ce69

craig[bot] and jeffswenson committed
Merge #143567

143567: crosscluster: fix tombstone updates in sql writer r=jeffswenson a=jeffswenson

This fixes a subtle bug in the SQL LDR writer. If a row is deleted in two clusters replicated via LDR, then when the LWW-winning delete is replicated to the losing cluster, the losing cluster does not update the origin timestamp assigned to its tombstone. This can lead to non-convergence in three-cluster setups.

While this fixes a bug, the most common case that will take the new code path is replays during catch-up scans or processor replanning. In that case the replay will see zero rows impacted by the delete, and the CPut will then fail LWW against the value that is stored in the local DB.

Release note: none

Epic: [CRDB-48647](https://cockroachlabs.atlassian.net/browse/CRDB-48647)

Co-authored-by: Jeff Swenson <[email protected]>
2 parents 150d3d1 + 4167f6a commit 757ce69
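
To make the non-convergence concrete, the sketch below replays the bug's timeline with a toy last-write-wins rule, using plain integers in place of origin timestamps. It is an illustrative model only: the row type, the replicate helper, and the updateTombstones flag are hypothetical and do not correspond to CockroachDB's actual LDR code.

package main

import "fmt"

// row models the destination's local state for one key: its value (an empty
// string stands for a tombstone) and the origin timestamp recorded for it.
type row struct {
    val string
    ts  int64
}

// replicate applies an incoming replicated write with last-write-wins
// semantics. When a replicated delete lands on an existing tombstone, the
// tombstone's origin timestamp is advanced only if updateTombstones is true
// (the behavior this commit introduces); otherwise the delete is a no-op
// (the old behavior).
func replicate(dst *row, incomingVal string, incomingTS int64, updateTombstones bool) {
    if incomingTS <= dst.ts {
        return // incoming write loses LWW
    }
    if incomingVal == "" && dst.val == "" {
        if updateTombstones {
            dst.ts = incomingTS
        }
        return
    }
    dst.val, dst.ts = incomingVal, incomingTS
}

func main() {
    for _, fix := range []bool{false, true} {
        // The destination deleted the row locally at t=2; 'src-b' updated it
        // at t=3; 'src-a' deleted it at t=4 (the LWW winner).
        dst := row{val: "", ts: 2}
        replicate(&dst, "", 4, fix)        // src-a's delete arrives first
        replicate(&dst, "updated", 3, fix) // then src-b's update
        fmt.Printf("fix=%v -> value=%q ts=%d\n", fix, dst.val, dst.ts)
    }
    // Prints:
    //   fix=false -> value="updated" ts=3  (the row resurfaces: divergence from src-a)
    //   fix=true  -> value="" ts=4         (the delete wins: all clusters converge)
}

Without the fix, the destination resurrects a row that both sources agree is deleted; advancing the tombstone's origin timestamp lets the later delete win as intended.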

File tree

7 files changed: +337, -15 lines

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -16,6 +16,7 @@ go_library(
         "purgatory.go",
         "range_stats.go",
         "replication_statements.go",
+        "tombstone_updater.go",
         "udf_row_processor.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/logical",
@@ -154,6 +155,7 @@ go_test(
         "//pkg/sql/catalog/descpb",
         "//pkg/sql/catalog/descs",
         "//pkg/sql/catalog/desctestutils",
+        "//pkg/sql/catalog/lease",
         "//pkg/sql/catalog/resolver",
         "//pkg/sql/execinfra",
         "//pkg/sql/execinfrapb",

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 80 additions & 0 deletions
@@ -1388,6 +1388,86 @@ func TestThreeWayReplication(t *testing.T) {
     verifyExpectedRowAllServers(t, runners, expectedRows, dbNames)
 }
 
+func TestTombstoneUpdate(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    skip.UnderDeadlock(t)
+    defer log.Scope(t).Close(t)
+
+    // This test is a regression test for a bug where replicating a delete does
+    // not update the origin timestamp assigned to a tombstone if the replicated
+    // value is overwriting a tombstone.
+    //
+    // Reproducing the bug requires three databases ('src-a', 'src-b', 'dst').
+    //
+    // Here's the timeline of events:
+    //
+    // 1. row is inserted into 'src-a', 'src-b', and 'dst'
+    // 2. row is deleted from 'dst'
+    // 3. row is updated in 'src-b'
+    // 4. row is deleted in 'src-a'
+    //
+    // 5. Replication is started from 'src-a' -> 'dst' and allowed to catch up to now.
+    // 6. Replication is started from 'src-b' -> 'dst' and allowed to catch up to now.
+    //
+    // In this example, the last delete is the LWW winner. But if tombstones are
+    // not updated, then the update in 'src-b' that is replicated to 'dst' will
+    // win LWW, since the replicated delete was no-op'd.
+
+    ctx := context.Background()
+
+    clusterArgs := base.TestClusterArgs{
+        ServerArgs: base.TestServerArgs{
+            DefaultTestTenant: base.TestControlsTenantsExplicitly,
+            Knobs: base.TestingKnobs{
+                JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+            },
+        },
+    }
+
+    numDBs := 3
+    server, s, runners, dbNames := setupServerWithNumDBs(t, ctx, clusterArgs, 1, numDBs)
+    defer server.Stopper().Stop(ctx)
+    PGURLs := GetPGURLs(t, s, dbNames)
+
+    // Insert the row into all three databases.
+    for i := range runners {
+        runners[i].Exec(t, "UPSERT INTO tab VALUES (1, 'initial')")
+    }
+
+    srcA, srcB, dst := runners[0], runners[1], runners[2]
+    urlSrcA, urlSrcB, _ := PGURLs[0].String(), PGURLs[1].String(), PGURLs[2].String()
+
+    // Grab a timestamp that will be used as a cursor for starting the logical
+    // replication jobs.
+    start := s.Clock().Now()
+
+    // Delete the row in the dest; this should lose all LWW conflicts.
+    dst.Exec(t, "DELETE FROM tab WHERE pk = 1")
+
+    // Update the row in one of the sources; this should lose LWW conflicts to
+    // the final delete.
+    srcB.Exec(t, "UPSERT INTO tab VALUES (1, 'updated')")
+
+    // Delete the row in the other source; this should win LWW.
+    srcA.Exec(t, "DELETE FROM tab WHERE pk = 1")
+
+    dst.CheckQueryResults(t, "SELECT * FROM tab", [][]string{})
+    srcA.CheckQueryResults(t, "SELECT * FROM tab", [][]string{})
+    srcB.CheckQueryResults(t, "SELECT * FROM tab", [][]string{{"1", "updated"}})
+
+    // 5. Replicate the delete from 'src-a' -> 'dst'.
+    var jobIDSrcA jobspb.JobID
+    dst.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = VALIDATED, CURSOR = $2",
+        urlSrcA, start.AsOfSystemTime()).Scan(&jobIDSrcA)
+    WaitUntilReplicatedTime(t, s.Clock().Now(), dst, jobIDSrcA)
+
+    // 6. Replicate the update from 'src-b' -> 'dst'.
+    var jobIDSrcB jobspb.JobID
+    dst.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = VALIDATED, CURSOR = $2",
+        urlSrcB, start.AsOfSystemTime()).Scan(&jobIDSrcB)
+    WaitUntilReplicatedTime(t, s.Clock().Now(), dst, jobIDSrcB)
+
+    // Verify that the delete won LWW since it has the highest MVCC timestamp.
+    dst.CheckQueryResults(t, "SELECT * FROM tab WHERE pk = 1", [][]string{})
+}
+
 func TestForeignKeyConstraints(t *testing.T) {
     defer leaktest.AfterTest(t)()
     skip.UnderDeadlock(t)

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 3 additions & 0 deletions
@@ -25,6 +25,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/settings"
     "github.com/cockroachdb/cockroach/pkg/sql"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+    "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
@@ -738,6 +739,8 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
         // avoid creating a new copy on each executor usage.
         flowCtx.Cfg.DB.Executor(isql.WithSessionData(sql.NewInternalSessionData(ctx, flowCtx.Cfg.Settings, "" /* opName */))),
         sd, lrw.spec,
+        flowCtx.Codec(),
+        flowCtx.Cfg.LeaseManager.(*lease.Manager),
     )
     if err != nil {
         return err
pkg/crosscluster/logical/lww_row_processor.go

Lines changed: 47 additions & 4 deletions
@@ -23,6 +23,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/catalog"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+    "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
     "github.com/cockroachdb/cockroach/pkg/sql/isql"
     "github.com/cockroachdb/cockroach/pkg/sql/lexbase"
@@ -79,6 +80,7 @@ type querier interface {
     InsertRow(ctx context.Context, txn isql.Txn, ie isql.Executor, row cdcevent.Row, prevRow *cdcevent.Row, likelyInsert bool) (batchStats, error)
     DeleteRow(ctx context.Context, txn isql.Txn, ie isql.Executor, row cdcevent.Row, prevRow *cdcevent.Row) (batchStats, error)
     RequiresParsedBeforeRow(catid.DescID) bool
+    ReleaseLeases(ctx context.Context)
 }
 
 // isLwwLoser returns true if the error is a ConditionFailedError with an
@@ -274,7 +276,9 @@ func (sqlRowProcessor) ReportMutations(_ *stats.Refresher) {}
 
 // ReleaseLeases implements the BatchHandler interface but is a no-op since each
 // query does this itself.
-func (sqlRowProcessor) ReleaseLeases(_ context.Context) {}
+func (srp *sqlRowProcessor) ReleaseLeases(ctx context.Context) {
+    srp.querier.ReleaseLeases(ctx)
+}
 
 func (*sqlRowProcessor) Close(ctx context.Context) {}
 
@@ -458,6 +462,8 @@ func makeSQLProcessor(
     ie isql.Executor,
     sd *sessiondata.SessionData,
     spec execinfrapb.LogicalReplicationWriterSpec,
+    codec keys.SQLCodec,
+    leaseMgr *lease.Manager,
 ) (*sqlRowProcessor, error) {
 
     needUDFQuerier := false
@@ -469,10 +475,14 @@
 
     lwwQuerier := &lwwQuerier{
         settings: settings,
+        codec:    codec,
+        db:       db,
+        leaseMgr: leaseMgr,
         queryBuffer: queryBuffer{
             deleteQueries: make(map[catid.DescID]queryBuilder, len(tableConfigByDestID)),
             insertQueries: make(map[catid.DescID]map[catid.FamilyID]queryBuilder, len(tableConfigByDestID)),
         },
+        tombstoneUpdaters:          make(map[descpb.ID]*tombstoneUpdater, len(tableConfigByDestID)),
         ieOverrideOptimisticInsert: getIEOverride(replicatedOptimisticInsertOpName, jobID),
         ieOverrideInsert:           getIEOverride(replicatedInsertOpName, jobID),
         ieOverrideDelete:           getIEOverride(replicatedDeleteOpName, jobID),
@@ -532,6 +542,13 @@ func (m *muxQuerier) RequiresParsedBeforeRow(id catid.DescID) bool {
     return m.shouldUseUDF[id]
 }
 
+func (m *muxQuerier) ReleaseLeases(ctx context.Context) {
+    if m.udfQuerier != nil {
+        m.udfQuerier.ReleaseLeases(ctx)
+    }
+    m.lwwQuerier.ReleaseLeases(ctx)
+}
+
 // lwwQuerier is a querier that implements partial last-write-wins
 // semantics using SQL queries.
 //
@@ -542,12 +559,16 @@ func (m *muxQuerier) RequiresParsedBeforeRow(id catid.DescID) bool {
 //
 // See the design document for possible solutions to these problems.
 type lwwQuerier struct {
-    settings    *cluster.Settings
-    queryBuffer queryBuffer
+    settings          *cluster.Settings
+    codec             keys.SQLCodec
+    db                descs.DB
+    queryBuffer       queryBuffer
+    tombstoneUpdaters map[descpb.ID]*tombstoneUpdater
 
     ieOverrideOptimisticInsert sessiondata.InternalExecutorOverride
     ieOverrideInsert           sessiondata.InternalExecutorOverride
     ieOverrideDelete           sessiondata.InternalExecutorOverride
+    leaseMgr                   *lease.Manager
 }
 
 func (lww *lwwQuerier) AddTable(targetDescID int32, tc sqlProcessorTableConfig) error {
@@ -561,6 +582,15 @@ func (lww *lwwQuerier) AddTable(targetDescID int32, tc sqlProcessorTableConfig)
     if err != nil {
         return err
     }
+
+    lww.tombstoneUpdaters[td.GetID()] = newTombstoneUpdater(
+        lww.codec,
+        lww.db.KV(),
+        lww.leaseMgr,
+        catid.DescID(targetDescID),
+        lww.settings,
+    )
+
     return nil
 }
 
@@ -665,13 +695,26 @@ func (lww *lwwQuerier) DeleteRow(
         sess.QualityOfService = nil
     }
     sess.OriginTimestampForLogicalDataReplication = row.MvccTimestamp
-    if _, err := ie.ExecParsed(ctx, replicatedDeleteOpName, kvTxn, sess, stmt, datums...); err != nil {
+    rowCount, err := ie.ExecParsed(ctx, replicatedDeleteOpName, kvTxn, sess, stmt, datums...)
+    if err != nil {
         log.Warningf(ctx, "replicated delete failed (query: %s): %s", stmt.SQL, err.Error())
         return batchStats{}, err
     }
+    if rowCount != 1 {
+        // NOTE: at this point we don't know if we are updating a tombstone or if
+        // we are losing LWW. As long as it is a LWW loss or a tombstone update,
+        // updateTombstone will return okay.
+        return lww.tombstoneUpdaters[row.TableID].updateTombstone(ctx, txn, row)
+    }
     return batchStats{}, nil
 }
 
+func (lww *lwwQuerier) ReleaseLeases(ctx context.Context) {
+    for _, tu := range lww.tombstoneUpdaters {
+        tu.ReleaseLeases(ctx)
+    }
+}
+
 const (
     insertQueryOptimistic  = `INSERT INTO [%d AS t] (%s) VALUES (%s)`
     insertQueryPessimistic = `
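
The NOTE in the hunk above relies on a contract provided by the new tombstoneUpdater, which lives in tombstone_updater.go and is not among the hunks shown here. Per the commit description, it issues a CPut against the locally stored value, and both outcomes, advancing the tombstone's origin timestamp or losing LWW to a newer local value, count as success. Below is a minimal sketch of that contract under those assumptions; every name and helper is hypothetical and not CockroachDB's actual API.

package main

import (
    "errors"
    "fmt"
)

// errConditionFailed stands in for the condition-failed error a CPut returns
// when the stored value is newer than the replicated one.
var errConditionFailed = errors.New("condition failed: newer local timestamp present")

// conditionalPutTombstone stands in for a CPut of a tombstone at the
// replicated origin timestamp: it only writes if the incoming timestamp is
// newer than the timestamp already recorded locally.
func conditionalPutTombstone(localTS *int64, incomingTS int64) error {
    if incomingTS <= *localTS {
        return errConditionFailed
    }
    *localTS = incomingTS
    return nil
}

// updateTombstoneSketch mirrors the contract from the NOTE: a successful
// tombstone update and a LWW loss are both "okay" outcomes for the caller.
func updateTombstoneSketch(localTS *int64, incomingTS int64) error {
    err := conditionalPutTombstone(localTS, incomingTS)
    if errors.Is(err, errConditionFailed) {
        return nil // LWW loss: the local value is already newer, nothing to do
    }
    return err
}

func main() {
    ts := int64(2)
    fmt.Println(updateTombstoneSketch(&ts, 4), ts) // <nil> 4: tombstone advanced
    fmt.Println(updateTombstoneSketch(&ts, 3), ts) // <nil> 4: LWW loss, no-op
}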

pkg/crosscluster/logical/lww_row_processor_test.go

Lines changed: 17 additions & 11 deletions
@@ -20,6 +20,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
+    "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
     "github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -79,7 +80,7 @@ func TestLWWInsertQueryGeneration(t *testing.T) {
         return tableName
     }
 
-    setup := func(t *testing.T, schemaTmpl string) (*sqlRowProcessor, func(...interface{}) roachpb.KeyValue) {
+    setup := func(t *testing.T, schemaTmpl string) (*sqlRowProcessor, func(...interface{}) roachpb.KeyValue, func()) {
         tableNameSrc := createTable(t, schemaTmpl)
         tableNameDst := createTable(t, schemaTmpl)
         srcDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), s.Codec(), "defaultdb", tableNameSrc)
@@ -89,20 +90,23 @@
             dstDesc.GetID(): {
                 srcDesc: srcDesc,
             },
-        }, jobspb.JobID(1), s.InternalDB().(descs.DB), s.InternalExecutor().(isql.Executor), sd, execinfrapb.LogicalReplicationWriterSpec{})
+        }, jobspb.JobID(1), s.InternalDB().(descs.DB), s.InternalExecutor().(isql.Executor), sd, execinfrapb.LogicalReplicationWriterSpec{}, s.Codec(), s.LeaseManager().(*lease.Manager))
         require.NoError(t, err)
         return rp, func(datums ...interface{}) roachpb.KeyValue {
-            kv := replicationtestutils.EncodeKV(t, s.Codec(), srcDesc, datums...)
-            kv.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
-            return kv
-        }
+                kv := replicationtestutils.EncodeKV(t, s.Codec(), srcDesc, datums...)
+                kv.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+                return kv
+            }, func() {
+                rp.ReleaseLeases(ctx)
+            }
     }
 
     for _, tc := range testCases {
         t.Run(fmt.Sprintf("%s/insert", tc.name), func(t *testing.T) {
             runner.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.try_optimistic_insert.enabled=true")
             defer runner.Exec(t, "RESET CLUSTER SETTING logical_replication.consumer.try_optimistic_insert.enabled")
-            rp, encoder := setup(t, tc.schemaTmpl)
+            rp, encoder, cleanup := setup(t, tc.schemaTmpl)
+            defer cleanup()
             keyValue := encoder(tc.row...)
             require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
                 _, err := rp.ProcessRow(ctx, txn, keyValue, roachpb.Value{})
@@ -112,15 +116,17 @@
         t.Run(fmt.Sprintf("%s/insert-without-optimistic-insert", tc.name), func(t *testing.T) {
             runner.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.try_optimistic_insert.enabled=false")
             defer runner.Exec(t, "RESET CLUSTER SETTING logical_replication.consumer.try_optimistic_insert.enabled")
-            rp, encoder := setup(t, tc.schemaTmpl)
+            rp, encoder, cleanup := setup(t, tc.schemaTmpl)
+            defer cleanup()
             keyValue := encoder(tc.row...)
             require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
                 _, err := rp.ProcessRow(ctx, txn, keyValue, roachpb.Value{})
                 return err
             }))
         })
         t.Run(fmt.Sprintf("%s/delete", tc.name), func(t *testing.T) {
-            rp, encoder := setup(t, tc.schemaTmpl)
+            rp, encoder, cleanup := setup(t, tc.schemaTmpl)
+            defer cleanup()
             keyValue := encoder(tc.row...)
             keyValue.Value.RawBytes = nil
             require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
@@ -157,7 +163,7 @@ func BenchmarkLWWInsertBatch(b *testing.B) {
         desc.GetID(): {
             srcDesc: desc,
         },
-    }, jobspb.JobID(1), s.InternalDB().(descs.DB), s.InternalDB().(isql.DB).Executor(isql.WithSessionData(sd)), sd, execinfrapb.LogicalReplicationWriterSpec{})
+    }, jobspb.JobID(1), s.InternalDB().(descs.DB), s.InternalDB().(isql.DB).Executor(isql.WithSessionData(sd)), sd, execinfrapb.LogicalReplicationWriterSpec{}, s.Codec(), s.LeaseManager().(*lease.Manager))
     require.NoError(b, err)
 
     // In some configs, we'll be simulating processing the same INSERT over and
@@ -345,7 +351,7 @@ func TestLWWConflictResolution(t *testing.T) {
         dstDesc.GetID(): {
             srcDesc: srcDesc,
         },
-    }, jobspb.JobID(1), s.InternalDB().(descs.DB), s.InternalExecutor().(isql.Executor), sd, execinfrapb.LogicalReplicationWriterSpec{})
+    }, jobspb.JobID(1), s.InternalDB().(descs.DB), s.InternalExecutor().(isql.Executor), sd, execinfrapb.LogicalReplicationWriterSpec{}, s.Codec(), s.LeaseManager().(*lease.Manager))
     require.NoError(t, err)
 
     if useKVProc {