Skip to content

Commit 691f423

Browse files
committed
logical: have tombstone updater accept datums
Previously, the tombstone deleter accepted a cdcevent.Row. Now, it accepts datums derived from the cdcevent.Row. This allows the tombstone updater to be used by the crud sql writer which internally expects datums. This also removes an extra `isLwwLoser(err)` check that caused the tombstone updater to silently drop errors. This bug was caught by testing in #143988. Release Note: none Epic: CRDB-48647
1 parent 3ae1c3c commit 691f423

File tree

2 files changed

+25
-38
lines changed

2 files changed

+25
-38
lines changed

pkg/crosscluster/logical/lww_row_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,7 +711,7 @@ func (lww *lwwQuerier) DeleteRow(
711711
// NOTE: at this point we don't know if we are updating a tombstone or if
712712
// we are losing LWW. As long as it is a LWW loss or a tombstone update,
713713
// updateTombstone will return okay.
714-
return lww.tombstoneUpdaters[row.TableID].updateTombstone(ctx, txn, row)
714+
return lww.tombstoneUpdaters[row.TableID].updateTombstoneAny(ctx, txn, row.MvccTimestamp, datums)
715715
}
716716
return batchStats{}, nil
717717
}

pkg/crosscluster/logical/tombstone_updater.go

Lines changed: 24 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package logical
88
import (
99
"context"
1010

11-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
1211
"github.com/cockroachdb/cockroach/pkg/keys"
1312
"github.com/cockroachdb/cockroach/pkg/kv"
1413
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -19,6 +18,7 @@ import (
1918
"github.com/cockroachdb/cockroach/pkg/sql/row"
2019
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2120
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
21+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2222
)
2323

2424
// tombstoneUpdater is a helper for updating the mvcc origin timestamp assigned
@@ -40,12 +40,8 @@ type tombstoneUpdater struct {
4040
// deleter is a row.Deleter that uses the leased descriptor. Callers should
4141
// use getDeleter to ensure the lease is valid for the current transaction.
4242
deleter row.Deleter
43-
// columns are the name of the columns that are expected in the cdc row.
44-
columns []string
4543
}
4644

47-
// scratch is a scratch buffer for the tombstone updater. This is reused
48-
// across calls to addToBatch to avoid allocations.
4945
scratch []tree.Datum
5046
}
5147

@@ -56,7 +52,6 @@ func (c *tombstoneUpdater) ReleaseLeases(ctx context.Context) {
5652
c.leased.descriptor.Release(ctx)
5753
c.leased.descriptor = nil
5854
c.leased.deleter = row.Deleter{}
59-
c.leased.columns = c.leased.columns[:0]
6055
}
6156
}
6257

@@ -78,19 +73,31 @@ func newTombstoneUpdater(
7873
}
7974
}
8075

76+
// updateTombstoneAny is an `updateTombstone` wrapper that accepts the []any
77+
// datum slice from the original sql writer's datum builder.
78+
func (tu *tombstoneUpdater) updateTombstoneAny(
79+
ctx context.Context, txn isql.Txn, mvccTimestamp hlc.Timestamp, datums []any,
80+
) (batchStats, error) {
81+
tu.scratch = tu.scratch[:0]
82+
for _, datum := range datums {
83+
tu.scratch = append(tu.scratch, datum.(tree.Datum))
84+
}
85+
return tu.updateTombstone(ctx, txn, mvccTimestamp, tu.scratch)
86+
}
87+
8188
// updateTombstone attempts to update the tombstone for the given row. This is
8289
// expected to always succeed. The delete will only return zero rows if the
8390
// operation loses LWW or the row does not exist. So if the cput fails on a
8491
// condition, it should also fail on LWW, which is treated as a success.
8592
func (tu *tombstoneUpdater) updateTombstone(
86-
ctx context.Context, txn isql.Txn, afterRow cdcevent.Row,
93+
ctx context.Context, txn isql.Txn, mvccTimestamp hlc.Timestamp, afterRow []tree.Datum,
8794
) (batchStats, error) {
8895
err := func() error {
8996
if txn != nil {
9097
// If updateTombstone is called in a transaction, create and run a batch
9198
// in the transaction.
9299
batch := txn.KV().NewBatch()
93-
if err := tu.addToBatch(ctx, txn.KV(), batch, afterRow); err != nil {
100+
if err := tu.addToBatch(ctx, txn.KV(), batch, mvccTimestamp, afterRow); err != nil {
94101
return err
95102
}
96103
return txn.KV().Run(ctx, batch)
@@ -99,13 +106,13 @@ func (tu *tombstoneUpdater) updateTombstone(
99106
// 1pc transaction.
100107
return tu.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
101108
batch := txn.NewBatch()
102-
if err := tu.addToBatch(ctx, txn, batch, afterRow); err != nil {
109+
if err := tu.addToBatch(ctx, txn, batch, mvccTimestamp, afterRow); err != nil {
103110
return err
104111
}
105112
return txn.CommitInBatch(ctx, batch)
106113
})
107114
}()
108-
if err != nil && isLwwLoser(err) {
115+
if err != nil {
109116
if isLwwLoser(err) {
110117
return batchStats{kvWriteTooOld: 1}, nil
111118
}
@@ -115,44 +122,28 @@ func (tu *tombstoneUpdater) updateTombstone(
115122
}
116123

117124
func (tu *tombstoneUpdater) addToBatch(
118-
ctx context.Context, txn *kv.Txn, batch *kv.Batch, afterRow cdcevent.Row,
125+
ctx context.Context,
126+
txn *kv.Txn,
127+
batch *kv.Batch,
128+
mvccTimestamp hlc.Timestamp,
129+
afterRow []tree.Datum,
119130
) error {
120131
deleter, err := tu.getDeleter(ctx, txn)
121132
if err != nil {
122133
return err
123134
}
124135

125-
tu.scratch = tu.scratch[:0]
126-
127-
// Note that the columns in the cdcevent row are decoded using a descriptor
128-
// for the source column, whereas the row.Deleter column list is initialized
129-
// using columns from the destination table. This is funky, but it works
130-
// because we validate LDR schemas are compatible.
131-
132-
datums, err := afterRow.DatumsNamed(tu.leased.columns)
133-
if err != nil {
134-
return err
135-
}
136-
if err := datums.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
137-
tu.scratch = append(tu.scratch, d)
138-
return nil
139-
}); err != nil {
140-
return err
141-
}
142-
143-
// the index helpers are never really used since we are always updating a
144-
// tombstone.
145136
var ph row.PartialIndexUpdateHelper
146137
var vh row.VectorIndexUpdateHelper
147138

148139
return deleter.DeleteRow(
149140
ctx,
150141
batch,
151-
tu.scratch,
142+
afterRow,
152143
ph,
153144
vh,
154145
&row.OriginTimestampCPutHelper{
155-
OriginTimestamp: afterRow.MvccTimestamp,
146+
OriginTimestamp: mvccTimestamp,
156147
PreviousWasDeleted: true,
157148
},
158149
false, /* mustValidateOldPKValues */
@@ -176,10 +167,6 @@ func (tu *tombstoneUpdater) getDeleter(ctx context.Context, txn *kv.Txn) (row.De
176167
return row.Deleter{}, err
177168
}
178169

179-
for _, col := range cols {
180-
tu.leased.columns = append(tu.leased.columns, col.GetName())
181-
}
182-
183170
tu.leased.deleter = row.MakeDeleter(tu.codec, tu.leased.descriptor.Underlying().(catalog.TableDescriptor), nil /* lockedIndexes */, cols, tu.sd, &tu.settings.SV, nil /* metrics */)
184171
}
185172
if err := txn.UpdateDeadline(ctx, tu.leased.descriptor.Expiration(ctx)); err != nil {

0 commit comments

Comments
 (0)