Skip to content

Commit 0a79328

Browse files
committed
kv, sql/row: don't always convert ConditionFailedError to dupe key error
KV returns a `ConditionFailedError` when an existing row does not match the expected value in a CPut. Up until now, the SQL layer has mostly only used CPut with an empty expected value to verify that no row exists with the key. For this reason, `ConvertBatchError` always converts `ConditionFailedError` to a duplicate key error. (The one exception is backfill of a unique index, which uses CPutAllowingIfNotExists with a non-empty expected value.) In the new UPDATE and DELETE fast paths, the SQL layer will now use CPut with a non-empty expected value to verify that a certain row already exists. If this verification fails, we don't want to return a dupe key error, but rather undo the write and continue without an error. We need `ConvertBatchError` to be aware of the difference. This commit changes `ConvertBatchError` to only convert `ConditionFailedError` to a dupe key error if the original request was a CPut with an empty expected value (or the caller is backfill of a unique index). If the original request had a non-empty expected value, we leave it as a `ConditionFailedError` for the new fast path nodes to handle. Epic: None Release note: None
1 parent 4329148 commit 0a79328

File tree

7 files changed

+58
-22
lines changed

7 files changed

+58
-22
lines changed

pkg/kv/batch.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,25 +1161,35 @@ func (b *Batch) bulkRequest(
11611161
b.initResult(numKeys, numKeys, notRaw, nil)
11621162
}
11631163

1164-
// GetResult retrieves the Result and Result row KeyValue for a particular index.
1164+
// GetResult retrieves the Result, expected value bytes, and Result row KeyValue
1165+
// for a particular index.
11651166
//
11661167
// WARNING: introduce new usages of this function with care. See discussion in
11671168
// https://github.com/cockroachdb/cockroach/pull/112937.
11681169
// TODO(yuzefovich): look into removing this confusing function.
1169-
func (b *Batch) GetResult(idx int) (*Result, KeyValue, error) {
1170+
func (b *Batch) GetResult(idx int) (*Result, []byte, KeyValue, error) {
11701171
origIdx := idx
11711172
for i := range b.Results {
11721173
r := &b.Results[i]
11731174
if idx < r.calls {
1175+
var expBytes []byte
1176+
if origIdx < len(b.reqs) {
1177+
req := &b.reqs[origIdx]
1178+
switch t := req.Value.(type) {
1179+
case *kvpb.RequestUnion_ConditionalPut:
1180+
expBytes = t.ConditionalPut.ExpBytes
1181+
}
1182+
}
11741183
if idx < len(r.Rows) {
1175-
return r, r.Rows[idx], nil
1184+
return r, expBytes, r.Rows[idx], nil
11761185
} else if idx < len(r.Keys) {
1177-
return r, KeyValue{Key: r.Keys[idx]}, nil
1186+
return r, expBytes, KeyValue{Key: r.Keys[idx]}, nil
11781187
} else {
1179-
return r, KeyValue{}, nil
1188+
return r, expBytes, KeyValue{}, nil
11801189
}
11811190
}
11821191
idx -= r.calls
11831192
}
1184-
return nil, KeyValue{}, errors.AssertionFailedf("index %d outside of results: %+v", origIdx, b.Results)
1193+
return nil, nil, KeyValue{},
1194+
errors.AssertionFailedf("index %d outside of results: %+v", origIdx, b.Results)
11851195
}

pkg/kv/db_test.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,25 +1144,39 @@ func TestGetResult(t *testing.T) {
11441144
b.PutBytes(&byteSliceBulkSource[[]byte]{kys2, vals})
11451145
keyToDel := roachpb.Key("b")
11461146
b.Del(keyToDel)
1147+
keyToCPut := roachpb.Key("c")
1148+
var expVal roachpb.Value
1149+
expVal.SetBytes(vals[2])
1150+
b.CPut(keyToCPut, []byte("who"), expVal.TagAndDataBytes())
11471151
err := txn.CommitInBatch(ctx, b)
11481152
require.NoError(t, err)
11491153
for i := 0; i < len(kys1)+len(kys2); i++ {
1150-
res, row, err := b.GetResult(i)
1151-
require.Equal(t, res, &b.Results[i/3])
1152-
require.Equal(t, row, b.Results[i/3].Rows[i%3])
1154+
res, exp, row, err := b.GetResult(i)
1155+
require.Equal(t, &b.Results[i/3], res)
1156+
require.Nil(t, exp)
1157+
require.Equal(t, b.Results[i/3].Rows[i%3], row)
11531158
require.NoError(t, err)
11541159
}
11551160
// test Del request (it uses Result.Keys rather than Result.Rows)
1156-
_, kv, err := b.GetResult(len(kys1) + len(kys2))
1161+
_, exp, kv, err := b.GetResult(len(kys1) + len(kys2))
11571162
require.NoError(t, err)
1158-
require.Equal(t, keyToDel, kv.Key)
1163+
require.Nil(t, exp)
1164+
require.Equal(t, kv.Key, keyToDel)
11591165
require.Nil(t, kv.Value)
1166+
1167+
// test CPut request (it fills in expBytes)
1168+
res, exp, row, err := b.GetResult(len(kys1) + len(kys2) + 1)
1169+
require.Equal(t, &b.Results[3], res)
1170+
require.Equal(t, expVal.TagAndDataBytes(), exp)
1171+
require.Equal(t, b.Results[3].Rows[0], row)
1172+
require.NoError(t, err)
1173+
11601174
// test EndTxn result
1161-
_, _, err = b.GetResult(len(kys1) + len(kys2) + 1)
1175+
_, _, _, err = b.GetResult(len(kys1) + len(kys2) + 2)
11621176
require.NoError(t, err)
11631177

11641178
// test out of bounds
1165-
_, _, err = b.GetResult(len(kys1) + len(kys2) + 2)
1179+
_, _, _, err = b.GetResult(len(kys1) + len(kys2) + 3)
11661180
require.Error(t, err)
11671181
}
11681182

pkg/sql/backfill/backfill.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ func ConvertBackfillError(
448448
if err != nil {
449449
return err
450450
}
451-
return row.ConvertBatchError(ctx, desc, b)
451+
return row.ConvertBatchError(ctx, desc, b, true /* alwaysConvertCondFailed */)
452452
}
453453

454454
type muBoundAccount struct {

pkg/sql/colexec/insert.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,9 @@ func (v *vectorInserter) Next() coldata.Batch {
184184
err = v.flowCtx.Txn.Run(ctx, kvba.Batch)
185185
}
186186
if err != nil {
187-
colexecerror.ExpectedError(row.ConvertBatchError(ctx, v.desc, kvba.Batch))
187+
colexecerror.ExpectedError(row.ConvertBatchError(
188+
ctx, v.desc, kvba.Batch, false, /* alwaysConvertCondFailed */
189+
))
188190
}
189191
numRows := end - start
190192
start = end

pkg/sql/delete_range.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
121121
d.deleteSpans(params, b, spans)
122122
log.VEventf(ctx, 2, "fast delete: processing %d spans", len(spans))
123123
if err := params.p.txn.Run(ctx, b); err != nil {
124-
return row.ConvertBatchError(ctx, d.desc, b)
124+
return row.ConvertBatchError(ctx, d.desc, b, false /* alwaysConvertCondFailed */)
125125
}
126126

127127
spans = spans[:0]
@@ -144,7 +144,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
144144
d.deleteSpans(params, b, spans)
145145
log.VEventf(ctx, 2, "fast delete: processing %d spans and committing", len(spans))
146146
if err := params.p.txn.CommitInBatch(ctx, b); err != nil {
147-
return row.ConvertBatchError(ctx, d.desc, b)
147+
return row.ConvertBatchError(ctx, d.desc, b, false /* alwaysConvertCondFailed */)
148148
}
149149
if resumeSpans, err := d.processResults(b.Results, nil /* resumeSpans */); err != nil {
150150
return err

pkg/sql/row/errors.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,14 @@ import (
3131
// ConvertBatchError attempts to map a key-value error generated during a
3232
// key-value batch operating over the specified table to a user friendly SQL
3333
// error.
34-
func ConvertBatchError(ctx context.Context, tableDesc catalog.TableDescriptor, b *kv.Batch) error {
34+
//
35+
// If alwaysConvertCondFailed is set to true, ConditionFailedError (from CPut
36+
// failures) will always be converted to a duplicate key error even if the
37+
// expValue of the CPut was not empty. This is necessary when backfilling a
38+
// unique index.
39+
func ConvertBatchError(
40+
ctx context.Context, tableDesc catalog.TableDescriptor, b *kv.Batch, alwaysConvertCondFailed bool,
41+
) error {
3542
origPErr := b.MustPErr()
3643
switch v := origPErr.GetDetail().(type) {
3744
case *kvpb.MinTimestampBoundUnsatisfiableError:
@@ -50,11 +57,14 @@ func ConvertBatchError(ctx context.Context, tableDesc catalog.TableDescriptor, b
5057
break
5158
}
5259
j := origPErr.Index.Index
53-
_, kv, err := b.GetResult(int(j))
60+
_, expBytes, kv, err := b.GetResult(int(j))
5461
if err != nil {
5562
return err
5663
}
57-
return NewUniquenessConstraintViolationError(ctx, tableDesc, kv.Key, v.ActualValue)
64+
if alwaysConvertCondFailed || len(expBytes) == 0 {
65+
// If we didn't expect the row to exist, this is a uniqueness violation.
66+
return NewUniquenessConstraintViolationError(ctx, tableDesc, kv.Key, v.ActualValue)
67+
}
5868

5969
case *kvpb.WriteIntentError:
6070
key := v.Locks[0].Key

pkg/sql/tablewriter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (tb *tableWriterBase) setRowsWrittenLimit(sd *sessiondata.SessionData) {
141141
func (tb *tableWriterBase) flushAndStartNewBatch(ctx context.Context) error {
142142
log.VEventf(ctx, 2, "writing batch with %d requests", len(tb.b.Requests()))
143143
if err := tb.txn.Run(ctx, tb.b); err != nil {
144-
return row.ConvertBatchError(ctx, tb.desc, tb.b)
144+
return row.ConvertBatchError(ctx, tb.desc, tb.b, false /* alwaysConvertCondFailed */)
145145
}
146146
if err := tb.tryDoResponseAdmission(ctx); err != nil {
147147
return err
@@ -180,7 +180,7 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) {
180180
}
181181
tb.lastBatchSize = tb.currentBatchSize
182182
if err != nil {
183-
return row.ConvertBatchError(ctx, tb.desc, tb.b)
183+
return row.ConvertBatchError(ctx, tb.desc, tb.b, false /* alwaysConvertCondFailed */)
184184
}
185185
return tb.tryDoResponseAdmission(ctx)
186186
}

0 commit comments

Comments
 (0)