Skip to content

Commit b622042

Browse files
committed
logical: set origin id in tombstone updater
Previously, the tombstone updater did not set an origin id when updating a tombstone. One critical use of the origin id is its used to filter LDR replication events. Events with an origin id are not replicated because replicating a replication write causes an infinite replication loop. Fixes: #146008 Release note: none
1 parent c8e3617 commit b622042

File tree

3 files changed

+87
-0
lines changed

3 files changed

+87
-0
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ go_test(
132132
"sql_row_reader_test.go",
133133
"sql_row_writer_test.go",
134134
"table_batch_handler_test.go",
135+
"tombstone_updater_test.go",
135136
"udf_row_processor_test.go",
136137
],
137138
data = glob(["testdata/**"]) + [

pkg/crosscluster/logical/tombstone_updater.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ func (tu *tombstoneUpdater) updateTombstone(
9797
// If updateTombstone is called in a transaction, create and run a batch
9898
// in the transaction.
9999
batch := txn.KV().NewBatch()
100+
batch.Header.WriteOptions = originID1Options
100101
if err := tu.addToBatch(ctx, txn.KV(), batch, mvccTimestamp, afterRow); err != nil {
101102
return err
102103
}
@@ -106,6 +107,7 @@ func (tu *tombstoneUpdater) updateTombstone(
106107
// 1pc transaction.
107108
return tu.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
108109
batch := txn.NewBatch()
110+
batch.Header.WriteOptions = originID1Options
109111
if err := tu.addToBatch(ctx, txn, batch, mvccTimestamp, afterRow); err != nil {
110112
return err
111113
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
13+
"github.com/cockroachdb/cockroach/pkg/sql"
14+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
15+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
16+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
17+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
18+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
19+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
20+
"github.com/cockroachdb/cockroach/pkg/util/log"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
func TestTombstoneUpdaterSetsOriginID(t *testing.T) {
25+
defer leaktest.AfterTest(t)()
26+
defer log.Scope(t).Close(t)
27+
28+
// This is a regression test for a bug in the tombstone updater. The
29+
// tombstone updater should always set the origin ID. Previously, it would
30+
// not set the origin id which caused logical replication to pick up the
31+
// tombstone update as a deletion event. This is undesirable because the
32+
// tombstone update case is only used when replicating deletes and if a
33+
// replicated write generates an LDR event, it leads to looping.
34+
35+
// Start server with two databases
36+
ctx := context.Background()
37+
server, s, runners, dbNames := setupServerWithNumDBs(t, ctx, testClusterBaseClusterArgs, 1, 2)
38+
defer server.Stopper().Stop(ctx)
39+
40+
// Create test table on both databases
41+
destRunner := runners[1]
42+
43+
// Create a tombstone updater
44+
desc := desctestutils.TestingGetMutableExistingTableDescriptor(
45+
s.DB(), s.Codec(), dbNames[0], "tab")
46+
sd := sql.NewInternalSessionData(ctx, s.ClusterSettings(), "" /* opName */)
47+
tu := newTombstoneUpdater(s.Codec(), s.DB(), s.LeaseManager().(*lease.Manager), desc.GetID(), sd, s.ClusterSettings())
48+
defer tu.ReleaseLeases(ctx)
49+
50+
// Set up 1 way logical replication. The replication stream is used to ensure
51+
// that the tombstone update will not be replicated as a deletion event.
52+
var jobID jobspb.JobID
53+
destRunner.QueryRow(t,
54+
"CREATE LOGICAL REPLICATION STREAM FROM TABLE a.tab ON $1 INTO TABLE b.tab",
55+
GetPGURLs(t, s, dbNames)[0].String()).Scan(&jobID)
56+
57+
// Write the row to the destination
58+
destRunner.Exec(t, "INSERT INTO tab VALUES (1, 42)")
59+
60+
row := tree.Datums{
61+
tree.NewDInt(tree.DInt(1)), // k
62+
tree.DNull, // v (deleted)
63+
}
64+
65+
_, err := tu.updateTombstone(ctx, nil, s.Clock().Now(), row)
66+
require.NoError(t, err)
67+
68+
config := s.ExecutorConfig().(sql.ExecutorConfig)
69+
err = sql.DescsTxn(ctx, &config, func(
70+
ctx context.Context, txn isql.Txn, descriptors *descs.Collection,
71+
) error {
72+
_, err = tu.updateTombstone(ctx, txn, s.Clock().Now(), row)
73+
return err
74+
})
75+
require.NoError(t, err)
76+
77+
// Wait for replication to advance
78+
WaitUntilReplicatedTime(t, s.Clock().Now(), destRunner, jobID)
79+
80+
// Verify the row still exists in the destination
81+
destRunner.CheckQueryResults(t, "SELECT pk, payload FROM tab", [][]string{
82+
{"1", "42"},
83+
})
84+
}

0 commit comments

Comments
 (0)