Skip to content

Commit 8d3d95c

Browse files
craig[bot]michae2
andcommitted
Merge #143778
143778: row, sql: add CPut of primary index to Updater and Deleter r=yuzefovich a=michae2 The upcoming fast paths for UPDATE and DELETE will skip the initial scan of the target row. Instead, they will rely on CPut(s) to the primary index to confirm that the target row exists and matches the WHERE clause of the statement. This PR adds a new CPut path to the Updater and Deleter which uses expValue to validate the old values. It closely mirrors the CPut paths added for the OriginTimestampCPutHelper in #130512. The main difference is that unlike the OriginTimestampCPutHelper, these paths do not need to delete all-NULL non-0 column families when not overwriting. Only writes to the primary index need validation using CPut with expValue. Writes to secondary indexes should be unchanged. If the CPut to the primary index fails, all writes for the statement will be rolled back using a savepoint (next PR). Informs: #71153 Release note: None Co-authored-by: Michael Erickson <[email protected]>
2 parents 1128917 + 0d4c727 commit 8d3d95c

File tree

13 files changed

+108
-21
lines changed

13 files changed

+108
-21
lines changed

pkg/crosscluster/logical/lww_kv_processor.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,10 @@ func (p *kvTableWriter) updateRow(
503503
// and destination clusters.
504504
// ShouldWinTie: true,
505505
}
506-
_, err := p.ru.UpdateRow(ctx, b, p.oldVals, p.newVals, ph, vh, oth, false)
506+
_, err := p.ru.UpdateRow(
507+
ctx, b, p.oldVals, p.newVals, ph, vh, oth, false, /* mustValidateOldPKValues */
508+
false, /* traceKV */
509+
)
507510
return err
508511
}
509512

@@ -526,7 +529,9 @@ func (p *kvTableWriter) deleteRow(
526529
// ShouldWinTie: true,
527530
}
528531

529-
return p.rd.DeleteRow(ctx, b, p.oldVals, ph, vh, oth, false)
532+
return p.rd.DeleteRow(
533+
ctx, b, p.oldVals, ph, vh, oth, false /* mustValidateOldPKValues */, false, /* traceKV */
534+
)
530535
}
531536

532537
func (p *kvTableWriter) fillOld(vals cdcevent.Row) error {

pkg/crosscluster/logical/tombstone_updater.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func (tu *tombstoneUpdater) addToBatch(
155155
OriginTimestamp: afterRow.MvccTimestamp,
156156
PreviousWasDeleted: true,
157157
},
158+
false, /* mustValidateOldPKValues */
158159
false, /* traceKV */
159160
)
160161
}

pkg/sql/backfill/backfill.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
402402
var pm row.PartialIndexUpdateHelper
403403
var vh row.VectorIndexUpdateHelper
404404
if _, err := ru.UpdateRow(
405-
ctx, b, oldValues, updateValues, pm, vh, nil, traceKV,
405+
ctx, b, oldValues, updateValues, pm, vh, nil, false /* mustValidateOldPKValues */, traceKV,
406406
); err != nil {
407407
return roachpb.Key{}, err
408408
}

pkg/sql/delete.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums)
171171
}
172172

173173
// Queue the deletion in the KV batch.
174-
if err := d.run.td.row(params.ctx, deleteVals, pm, vh, d.run.traceKV); err != nil {
174+
if err := d.run.td.row(
175+
params.ctx, deleteVals, pm, vh, false /* mustValidateOldPKValues */, d.run.traceKV,
176+
); err != nil {
175177
return err
176178
}
177179

pkg/sql/row/deleter.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ func (rd *Deleter) DeleteRow(
141141
pm PartialIndexUpdateHelper,
142142
vh VectorIndexUpdateHelper,
143143
oth *OriginTimestampCPutHelper,
144+
mustValidateOldPKValues bool,
144145
traceKV bool,
145146
) error {
146147
b := &KVBatchAdapter{Batch: batch}
@@ -197,7 +198,7 @@ func (rd *Deleter) DeleteRow(
197198
familyID := family.ID
198199
rd.key = keys.MakeFamilyKey(primaryIndexKey, uint32(familyID))
199200

200-
if oth.IsSet() {
201+
if oth.IsSet() || mustValidateOldPKValues {
201202
var expValue []byte
202203
if !oth.PreviousWasDeleted {
203204
prevValue, err := rd.encodeValueForPrimaryIndexFamily(family, values)
@@ -208,7 +209,11 @@ func (rd *Deleter) DeleteRow(
208209
expValue = prevValue.TagAndDataBytes()
209210
}
210211
}
211-
oth.DelWithCPut(ctx, b, &rd.key, expValue, traceKV)
212+
if oth.IsSet() {
213+
oth.DelWithCPut(ctx, b, &rd.key, expValue, traceKV)
214+
} else {
215+
delWithCPutFn(ctx, b, &rd.key, expValue, traceKV, rd.Helper.primIndexValDirs)
216+
}
212217
} else {
213218
delFn(ctx, b, &rd.key, !rd.primaryLocked /* needsLock */, traceKV, rd.Helper.primIndexValDirs)
214219
}

pkg/sql/row/helper.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,24 @@ func delFn(
494494
}
495495
}
496496

497+
func delWithCPutFn(
498+
ctx context.Context,
499+
b Putter,
500+
key *roachpb.Key,
501+
expVal []byte,
502+
traceKV bool,
503+
keyEncodingDirs []encoding.Direction,
504+
) {
505+
if traceKV {
506+
if keyEncodingDirs != nil {
507+
log.VEventf(ctx, 2, "CPut %s -> nil (delete)", keys.PrettyPrint(keyEncodingDirs, *key))
508+
} else {
509+
log.VEventf(ctx, 2, "CPut %s -> nil (delete)", *key)
510+
}
511+
}
512+
b.CPut(key, nil, expVal)
513+
}
514+
497515
func (rh *RowHelper) deleteIndexEntry(
498516
ctx context.Context,
499517
b Putter,
@@ -550,7 +568,10 @@ func (oh *OriginTimestampCPutHelper) CPutFn(
550568
traceKV bool,
551569
) {
552570
if traceKV {
553-
log.VEventfDepth(ctx, 1, 2, "CPutWithOriginTimestamp %s -> %s @ %s", *key, value.PrettyPrint(), oh.OriginTimestamp)
571+
log.VEventfDepth(
572+
ctx, 1, 2, "CPutWithOriginTimestamp %s -> %s (swap) @ %s", *key, value.PrettyPrint(),
573+
oh.OriginTimestamp,
574+
)
554575
}
555576
b.CPutWithOriginTimestamp(key, value, expVal, oh.OriginTimestamp)
556577
}
@@ -559,7 +580,9 @@ func (oh *OriginTimestampCPutHelper) DelWithCPut(
559580
ctx context.Context, b Putter, key *roachpb.Key, expVal []byte, traceKV bool,
560581
) {
561582
if traceKV {
562-
log.VEventfDepth(ctx, 1, 2, "CPutWithOriginTimestamp %s -> nil (delete) @ %s", key, oh.OriginTimestamp)
583+
log.VEventfDepth(
584+
ctx, 1, 2, "CPutWithOriginTimestamp %s -> nil (delete) @ %s", key, oh.OriginTimestamp,
585+
)
563586
}
564587
b.CPutWithOriginTimestamp(key, nil, expVal, oh.OriginTimestamp)
565588
}

pkg/sql/row/inserter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (ri *Inserter) InsertRow(
199199
ri.valueBuf, err = prepareInsertOrUpdateBatch(
200200
ctx, b, &ri.Helper, primaryIndexKey, ri.InsertCols, values, ri.InsertColIDtoRowIndex,
201201
ri.InsertColIDtoRowIndex, &ri.key, &ri.value, ri.valueBuf, oth, nil, /* oldValues */
202-
kvOp, traceKV,
202+
kvOp, false /* mustValidateOldPKValues */, traceKV,
203203
)
204204
if err != nil {
205205
return err

pkg/sql/row/updater.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cockroachdb/cockroach/pkg/util/deduplicate"
2626
"github.com/cockroachdb/cockroach/pkg/util/encoding"
2727
"github.com/cockroachdb/cockroach/pkg/util/intsets"
28+
"github.com/cockroachdb/cockroach/pkg/util/log"
2829
"github.com/cockroachdb/errors"
2930
)
3031

@@ -241,6 +242,7 @@ func (ru *Updater) UpdateRow(
241242
pm PartialIndexUpdateHelper,
242243
vh VectorIndexUpdateHelper,
243244
oth *OriginTimestampCPutHelper,
245+
mustValidateOldPKValues bool,
244246
traceKV bool,
245247
) ([]tree.Datum, error) {
246248
if len(oldValues) != len(ru.FetchCols) {
@@ -396,7 +398,9 @@ func (ru *Updater) UpdateRow(
396398
// followed by a CPut we could skip the Del altogether. Furthermore, if
397399
// we acquired the lock on this index during the initial scan we could
398400
// replace a CPut with a Put.
399-
if err := ru.rd.DeleteRow(ctx, batch, oldValues, pm, vh, oth, traceKV); err != nil {
401+
if err := ru.rd.DeleteRow(
402+
ctx, batch, oldValues, pm, vh, oth, mustValidateOldPKValues, traceKV,
403+
); err != nil {
400404
return nil, err
401405
}
402406
if err := ru.ri.InsertRow(
@@ -418,7 +422,7 @@ func (ru *Updater) UpdateRow(
418422
ru.valueBuf, err = prepareInsertOrUpdateBatch(
419423
ctx, b, &ru.Helper, primaryIndexKey, ru.FetchCols, ru.newValues, ru.FetchColIDtoRowIndex,
420424
ru.UpdateColIDtoRowIndex, &ru.key, &ru.value, ru.valueBuf, oth, oldValues,
421-
kvOp, traceKV,
425+
kvOp, mustValidateOldPKValues, traceKV,
422426
)
423427
if err != nil {
424428
return nil, err
@@ -658,3 +662,21 @@ func (ru *Updater) IsColumnOnlyUpdate() bool {
658662
// operations) and these should be split.
659663
return !ru.primaryKeyColChange && ru.DeleteHelper == nil && len(ru.Helper.Indexes) == 0
660664
}
665+
666+
func updateCPutFn(
667+
ctx context.Context,
668+
b Putter,
669+
key *roachpb.Key,
670+
value *roachpb.Value,
671+
expVal []byte,
672+
traceKV bool,
673+
keyEncodingDirs []encoding.Direction,
674+
) {
675+
if traceKV {
676+
log.VEventfDepth(
677+
ctx, 1, 2, "CPut %s -> %s (swap)", keys.PrettyPrint(keyEncodingDirs, *key),
678+
value.PrettyPrint(),
679+
)
680+
}
681+
b.CPut(key, value, expVal)
682+
}

pkg/sql/row/writer.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ func ColMapping(fromCols, toCols []catalog.Column) []int {
7979
// capacity to avoid allocations. The function returns the slice.
8080
// - kvOp indicates which KV write operation should be used. If it is PutOp,
8181
// it also indicates that the old keys have been locked.
82+
// - mustValidateOldPKValues indicates whether the expected previous row must
83+
// be verified (using CPut)
8284
// - traceKV is to be set to log the KV operations added to the batch.
8385
func prepareInsertOrUpdateBatch(
8486
ctx context.Context,
@@ -95,6 +97,7 @@ func prepareInsertOrUpdateBatch(
9597
oth *OriginTimestampCPutHelper,
9698
oldValues []tree.Datum,
9799
kvOp KVInsertOp,
100+
mustValidateOldPKValues bool,
98101
traceKV bool,
99102
) ([]byte, error) {
100103
families := helper.TableDesc.GetFamilies()
@@ -182,7 +185,7 @@ func prepareInsertOrUpdateBatch(
182185
}
183186

184187
var oldVal []byte
185-
if oth.IsSet() && len(oldValues) > 0 {
188+
if (oth.IsSet() || mustValidateOldPKValues) && len(oldValues) > 0 {
186189
// If the column could be composite, we only encode the old value if it
187190
// was a composite value.
188191
if !couldBeComposite || oldValues[idx].(tree.CompositeDatum).IsComposite() {
@@ -204,8 +207,12 @@ func prepareInsertOrUpdateBatch(
204207
} else if overwrite {
205208
// If the new family contains a NULL value, then we must
206209
// delete any pre-existing row.
207-
needsLock := !oldKeysLocked
208-
delFn(ctx, batch, kvKey, needsLock, traceKV, helper.primIndexValDirs)
210+
if mustValidateOldPKValues {
211+
delWithCPutFn(ctx, batch, kvKey, oldVal, traceKV, helper.primIndexValDirs)
212+
} else {
213+
needsLock := !oldKeysLocked
214+
delFn(ctx, batch, kvKey, needsLock, traceKV, helper.primIndexValDirs)
215+
}
209216
}
210217
} else {
211218
// We only output non-NULL values. Non-existent column keys are
@@ -217,6 +224,8 @@ func prepareInsertOrUpdateBatch(
217224

218225
if oth.IsSet() {
219226
oth.CPutFn(ctx, batch, kvKey, &marshaled, oldVal, traceKV)
227+
} else if mustValidateOldPKValues {
228+
updateCPutFn(ctx, batch, kvKey, &marshaled, oldVal, traceKV, helper.primIndexValDirs)
220229
} else {
221230
// TODO(yuzefovich): in case of multiple column families,
222231
// whenever we locked the primary index during the initial
@@ -254,7 +263,7 @@ func prepareInsertOrUpdateBatch(
254263
// If we are using OriginTimestamp ConditionalPuts, calculate the expected
255264
// value.
256265
var expBytes []byte
257-
if oth.IsSet() && len(oldValues) > 0 {
266+
if (oth.IsSet() || mustValidateOldPKValues) && len(oldValues) > 0 {
258267
var oldBytes []byte
259268
oldBytes, err = helper.encodePrimaryIndexValuesToBuf(oldValues, valColIDMapping, familySortedColumnIDs, fetchedCols, oldBytes)
260269
if err != nil {
@@ -277,8 +286,12 @@ func prepareInsertOrUpdateBatch(
277286
} else if overwrite {
278287
// The family might have already existed but every column in it is being
279288
// set to NULL, so delete it.
280-
needsLock := !oldKeysLocked
281-
delFn(ctx, batch, kvKey, needsLock, traceKV, helper.primIndexValDirs)
289+
if mustValidateOldPKValues {
290+
delWithCPutFn(ctx, batch, kvKey, expBytes, traceKV, helper.primIndexValDirs)
291+
} else {
292+
needsLock := !oldKeysLocked
293+
delFn(ctx, batch, kvKey, needsLock, traceKV, helper.primIndexValDirs)
294+
}
282295
}
283296
} else {
284297
// Copy the contents of rawValueBuf into the roachpb.Value. This is
@@ -290,6 +303,8 @@ func prepareInsertOrUpdateBatch(
290303
}
291304
if oth.IsSet() {
292305
oth.CPutFn(ctx, batch, kvKey, kvValue, expBytes, traceKV)
306+
} else if mustValidateOldPKValues {
307+
updateCPutFn(ctx, batch, kvKey, kvValue, expBytes, traceKV, helper.primIndexValDirs)
293308
} else {
294309
putFn(ctx, batch, kvKey, kvValue, traceKV, helper.primIndexValDirs)
295310
}

pkg/sql/tablewriter_delete.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ func (td *tableDeleter) init(_ context.Context, txn *kv.Txn, evalCtx *eval.Conte
4343
// of the table, and are materialized only for the purpose of updating vector
4444
// indexes.
4545
//
46+
// The mustValidateOldPKValues parameter indicates whether the expected previous
47+
// row must be verified (using CPut).
48+
//
4649
// The traceKV parameter determines whether the individual K/V operations
4750
// should be logged to the context. We use a separate argument here instead
4851
// of a Value field on the context because Value access in context.Context
@@ -52,10 +55,11 @@ func (td *tableDeleter) row(
5255
values tree.Datums,
5356
pm row.PartialIndexUpdateHelper,
5457
vh row.VectorIndexUpdateHelper,
58+
mustValidateOldPKValues bool,
5559
traceKV bool,
5660
) error {
5761
td.currentBatchSize++
58-
return td.rd.DeleteRow(ctx, td.b, values, pm, vh, nil, traceKV)
62+
return td.rd.DeleteRow(ctx, td.b, values, pm, vh, nil, mustValidateOldPKValues, traceKV)
5963
}
6064

6165
// deleteIndex runs the kv operations necessary to delete all kv entries in the

0 commit comments

Comments
 (0)