Skip to content

Commit ab8a53b

Browse files
committed
sql: always use cput for ldr primary key writes
This change reworks the SQL layer so that it always uses a CPUT with an origin timestamp if the LDR origin timestamp option is set. This change allows the classic and crud SQL writers to correctly implement LWW in the presence of tombstones. Note: the classic SQL writer only depends on the CPUT when inserting or upserting over a tombstone. The crud SQL writer relies on the CPut of inserts, updates, and deletes. Release note: none Fixes: #146117
1 parent 75c5230 commit ab8a53b

26 files changed

+84
-48
lines changed

pkg/crosscluster/logical/batch_handler_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3232
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
3333
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
34-
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
3534
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
3635
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3736
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -198,17 +197,13 @@ func TestBatchHandlerExhaustiveSQL(t *testing.T) {
198197
defer leaktest.AfterTest(t)()
199198
defer log.Scope(t).Close(t)
200199

201-
skip.WithIssue(t, 146117)
202-
203200
testBatchHandlerExhaustive(t, newSqlBatchHandler)
204201
}
205202

206203
func TestBatchHandlerExhaustiveCrud(t *testing.T) {
207204
defer leaktest.AfterTest(t)()
208205
defer log.Scope(t).Close(t)
209206

210-
skip.WithIssue(t, 146117)
211-
212207
testBatchHandlerExhaustive(t, newCrudBatchHandler)
213208
}
214209

pkg/crosscluster/logical/lww_kv_processor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ func (p *kvTableWriter) insertRow(ctx context.Context, b *kv.Batch, after cdceve
493493
// TODO(dt): support partial indexes.
494494
var vh row.VectorIndexUpdateHelper
495495
// TODO(mw5h, drewk): support vector indexes.
496-
oth := &row.OriginTimestampCPutHelper{
496+
oth := row.OriginTimestampCPutHelper{
497497
OriginTimestamp: after.MvccTimestamp,
498498
// TODO(ssd): We should choose this based by comparing the cluster IDs of the source
499499
// and destination clusters.
@@ -516,7 +516,7 @@ func (p *kvTableWriter) updateRow(
516516
// TODO(dt): support partial indexes.
517517
var vh row.VectorIndexUpdateHelper
518518
// TODO(mw5h, drewk): support vector indexes.
519-
oth := &row.OriginTimestampCPutHelper{
519+
oth := row.OriginTimestampCPutHelper{
520520
OriginTimestamp: after.MvccTimestamp,
521521
// TODO(ssd): We should choose this based by comparing the cluster IDs of the source
522522
// and destination clusters.
@@ -540,7 +540,7 @@ func (p *kvTableWriter) deleteRow(
540540
// TODO(dt): support partial indexes.
541541
var vh row.VectorIndexUpdateHelper
542542
// TODO(mw5h, drewk): support vector indexes.
543-
oth := &row.OriginTimestampCPutHelper{
543+
oth := row.OriginTimestampCPutHelper{
544544
PreviousWasDeleted: before.IsDeleted(),
545545
OriginTimestamp: after.MvccTimestamp,
546546
// TODO(ssd): We should choose this based by comparing the cluster IDs of the source

pkg/crosscluster/logical/lww_row_processor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -897,7 +897,8 @@ DELETE FROM [%d as t] WHERE %s
897897
AND ((t.crdb_internal_mvcc_timestamp < $%[3]d
898898
AND t.crdb_internal_origin_timestamp IS NULL)
899899
OR (t.crdb_internal_origin_timestamp < $%[3]d
900-
AND t.crdb_internal_origin_timestamp IS NOT NULL))`
900+
AND t.crdb_internal_origin_timestamp IS NOT NULL))
901+
RETURNING *`
901902
stmt, err := parser.ParseOne(
902903
fmt.Sprintf(baseQuery, dstTableDescID, whereClause.String(), originTSIdx))
903904
if err != nil {

pkg/crosscluster/logical/replication_statements.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,11 @@ func newDeleteStatement(
254254
TableID: int64(table.GetID()),
255255
As: tree.AliasClause{Alias: "replication_target"},
256256
},
257-
Where: &tree.Where{Type: tree.AstWhere, Expr: whereClause},
258-
Returning: tree.AbsentReturningClause,
257+
Where: &tree.Where{Type: tree.AstWhere, Expr: whereClause},
258+
// NOTE: we use RETURNING * to ensure that every column in the table is decoded.
259+
// This ensures that the Deleter can reconstruct the previous value when generating
260+
// the cput to update the primary key.
261+
Returning: &tree.ReturningExprs{tree.StarSelectExpr()},
259262
}
260263

261264
return toParsedStatement(delete)

pkg/crosscluster/logical/testdata/ldr_statements

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ UPDATE [104 AS replication_target] SET id = $5::INT8, name = $6::STRING, value =
1818

1919
show-delete table=basic_table
2020
----
21-
DELETE FROM [104 AS replication_target] WHERE (((id = $1::INT8) AND (name IS NOT DISTINCT FROM $2::STRING)) AND (value IS NOT DISTINCT FROM $3::INT8)) AND (data IS NOT DISTINCT FROM $4::BYTES)
21+
DELETE FROM [104 AS replication_target] WHERE (((id = $1::INT8) AND (name IS NOT DISTINCT FROM $2::STRING)) AND (value IS NOT DISTINCT FROM $3::INT8)) AND (data IS NOT DISTINCT FROM $4::BYTES) RETURNING *
2222

2323
show-select table=basic_table
2424
----
@@ -52,7 +52,7 @@ UPDATE [107 AS replication_target] SET id = $7::INT8, title = $8::STRING, descri
5252

5353
show-delete table=tasks
5454
----
55-
DELETE FROM [107 AS replication_target] WHERE (((((id = $1::INT8) AND (title IS NOT DISTINCT FROM $2::STRING)) AND (description IS NOT DISTINCT FROM $3::STRING)) AND (status IS NOT DISTINCT FROM $4::@100105)) AND (priority IS NOT DISTINCT FROM $5::INT8)) AND (created_at IS NOT DISTINCT FROM $6::TIMESTAMP)
55+
DELETE FROM [107 AS replication_target] WHERE (((((id = $1::INT8) AND (title IS NOT DISTINCT FROM $2::STRING)) AND (description IS NOT DISTINCT FROM $3::STRING)) AND (status IS NOT DISTINCT FROM $4::@100105)) AND (priority IS NOT DISTINCT FROM $5::INT8)) AND (created_at IS NOT DISTINCT FROM $6::TIMESTAMP) RETURNING *
5656

5757
show-select table=tasks
5858
----
@@ -86,7 +86,7 @@ UPDATE [108 AS replication_target] SET id = $7::INT8, name = $8::STRING, unit_pr
8686
# NOTE: total_price and discount_price are not included since they are computed.[
8787
show-delete table=products
8888
----
89-
DELETE FROM [108 AS replication_target] WHERE ((((id = $1::INT8) AND (name IS NOT DISTINCT FROM $2::STRING)) AND (unit_price IS NOT DISTINCT FROM $3::DECIMAL(10,2))) AND (quantity IS NOT DISTINCT FROM $4::INT8)) AND (last_updated IS NOT DISTINCT FROM $6::TIMESTAMP)
89+
DELETE FROM [108 AS replication_target] WHERE ((((id = $1::INT8) AND (name IS NOT DISTINCT FROM $2::STRING)) AND (unit_price IS NOT DISTINCT FROM $3::DECIMAL(10,2))) AND (quantity IS NOT DISTINCT FROM $4::INT8)) AND (last_updated IS NOT DISTINCT FROM $6::TIMESTAMP) RETURNING *
9090

9191
# NOTE: total_price is not included because it is a computed column, but
9292
# discount_price is included because its part of the primary key.
@@ -137,7 +137,7 @@ UPDATE [109 AS replication_target] SET id = $8::INT8, first_name = $9::STRING, l
137137

138138
show-delete table=employees
139139
----
140-
DELETE FROM [109 AS replication_target] WHERE ((((((id = $1::INT8) AND (first_name IS NOT DISTINCT FROM $2::STRING)) AND (last_name IS NOT DISTINCT FROM $3::STRING)) AND (email IS NOT DISTINCT FROM $4::STRING)) AND (salary IS NOT DISTINCT FROM $5::DECIMAL(12,2))) AND (department IS NOT DISTINCT FROM $6::STRING)) AND (hire_date IS NOT DISTINCT FROM $7::DATE)
140+
DELETE FROM [109 AS replication_target] WHERE ((((((id = $1::INT8) AND (first_name IS NOT DISTINCT FROM $2::STRING)) AND (last_name IS NOT DISTINCT FROM $3::STRING)) AND (email IS NOT DISTINCT FROM $4::STRING)) AND (salary IS NOT DISTINCT FROM $5::DECIMAL(12,2))) AND (department IS NOT DISTINCT FROM $6::STRING)) AND (hire_date IS NOT DISTINCT FROM $7::DATE) RETURNING *
141141

142142
show-select table=employees
143143
----
@@ -171,7 +171,7 @@ UPDATE [112 AS replication_target] SET id = $7::UUID, user_id = $8::INT8, event_
171171

172172
show-delete table=user_events
173173
----
174-
DELETE FROM [112 AS replication_target] WHERE (((((id = $1::UUID) AND (user_id IS NOT DISTINCT FROM $2::INT8)) AND (event_type IS NOT DISTINCT FROM $3::STRING)) AND (event_data IS NOT DISTINCT FROM $4::JSONB)) AND (created_at IS NOT DISTINCT FROM $5::TIMESTAMP)) AND (region = $6::@100110)
174+
DELETE FROM [112 AS replication_target] WHERE (((((id = $1::UUID) AND (user_id IS NOT DISTINCT FROM $2::INT8)) AND (event_type IS NOT DISTINCT FROM $3::STRING)) AND (event_data IS NOT DISTINCT FROM $4::JSONB)) AND (created_at IS NOT DISTINCT FROM $5::TIMESTAMP)) AND (region = $6::@100110) RETURNING *
175175

176176
show-select table=user_events
177177
----

pkg/crosscluster/logical/tombstone_updater.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func (tu *tombstoneUpdater) addToBatch(
144144
afterRow,
145145
ph,
146146
vh,
147-
&row.OriginTimestampCPutHelper{
147+
row.OriginTimestampCPutHelper{
148148
OriginTimestamp: mvccTimestamp,
149149
PreviousWasDeleted: true,
150150
},

pkg/sql/backfill/backfill.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,9 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
404404
// VectorIndexUpdateHelper in this case.
405405
var pm row.PartialIndexUpdateHelper
406406
var vh row.VectorIndexUpdateHelper
407+
var oth row.OriginTimestampCPutHelper
407408
if _, err := ru.UpdateRow(
408-
ctx, b, oldValues, updateValues, pm, vh, nil, false /* mustValidateOldPKValues */, traceKV,
409+
ctx, b, oldValues, updateValues, pm, vh, oth, false /* mustValidateOldPKValues */, traceKV,
409410
); err != nil {
410411
return roachpb.Key{}, err
411412
}

pkg/sql/colenc/encode_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -619,8 +619,9 @@ func buildRowKVs(
619619
p := &capturePutter{}
620620
var pm row.PartialIndexUpdateHelper
621621
var vh row.VectorIndexUpdateHelper
622+
var oth row.OriginTimestampCPutHelper
622623
for _, d := range datums {
623-
if err := inserter.InsertRow(context.Background(), p, d, pm, vh, nil, row.CPutOp, true /* traceKV */); err != nil {
624+
if err := inserter.InsertRow(context.Background(), p, d, pm, vh, oth, row.CPutOp, true /* traceKV */); err != nil {
624625
return kvs{}, err
625626
}
626627
}

pkg/sql/create_table.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,8 @@ func (n *createTableNode) startExec(params runParams) error {
614614
// indexes, partial, vector, or otherwise, to update.
615615
var pm row.PartialIndexUpdateHelper
616616
var vh row.VectorIndexUpdateHelper
617-
if err := ti.row(params.ctx, rowBuffer, pm, vh, params.extendedEvalCtx.Tracing.KVTracingEnabled()); err != nil {
617+
var oth row.OriginTimestampCPutHelper
618+
if err := ti.row(params.ctx, rowBuffer, pm, vh, oth, params.extendedEvalCtx.Tracing.KVTracingEnabled()); err != nil {
618619
return err
619620
}
620621
}

pkg/sql/delete.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,19 @@ type deleteRun struct {
6363
numPassthrough int
6464

6565
mustValidateOldPKValues bool
66+
67+
originTimestampCPutHelper row.OriginTimestampCPutHelper
6668
}
6769

68-
func (r *deleteRun) initRowContainer(params runParams, columns colinfo.ResultColumns) {
70+
func (r *deleteRun) init(params runParams, columns colinfo.ResultColumns) {
71+
if ots := params.extendedEvalCtx.SessionData().OriginTimestampForLogicalDataReplication; ots.IsSet() {
72+
r.originTimestampCPutHelper.OriginTimestamp = ots
73+
}
74+
6975
if !r.rowsNeeded {
7076
return
7177
}
78+
7279
r.td.rows = rowcontainer.NewRowContainer(
7380
params.p.Mon().MakeBoundAccount(),
7481
colinfo.ColTypeInfoFromResCols(columns),
@@ -83,7 +90,7 @@ func (d *deleteNode) startExec(params runParams) error {
8390
// cache traceKV during execution, to avoid re-evaluating it for every row.
8491
d.run.traceKV = params.p.ExtendedEvalContext().Tracing.KVTracingEnabled()
8592

86-
d.run.initRowContainer(params, d.columns)
93+
d.run.init(params, d.columns)
8794

8895
return d.run.td.init(params.ctx, params.p.txn, params.EvalContext())
8996
}
@@ -189,7 +196,7 @@ func (r *deleteRun) processSourceRow(params runParams, sourceVals tree.Datums) e
189196

190197
// Queue the deletion in the KV batch.
191198
if err := r.td.row(
192-
params.ctx, deleteVals, pm, vh, r.mustValidateOldPKValues, r.traceKV,
199+
params.ctx, deleteVals, pm, vh, r.originTimestampCPutHelper, r.mustValidateOldPKValues, r.traceKV,
193200
); err != nil {
194201
return err
195202
}

0 commit comments

Comments
 (0)