Skip to content

Commit 77c11b1

Browse files
craig[bot]michae2yuzefovich
committed
144186: sql: add updateSwapNode and deleteSwapNode r=mgartner,yuzefovich a=michae2 This PR adds two new planNode operators: update swap and delete swap. See individual commits for details. (Release note will be included in the upcoming optimizer PR.) Informs: #71153 Release note: None 144276: kvclient: let QueryLocks and LeaseInfo requests through txnWriteBuffer r=yuzefovich a=yuzefovich These requests don't interact with buffered writes, so we simply allow the requests and their response to go through the interceptor unchanged. Epic: None Release note: None Co-authored-by: Michael Erickson <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]>
3 parents 30bcd20 + f70f968 + a4924ec commit 77c11b1

19 files changed

+386
-26
lines changed

pkg/kv/batch.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,25 +1161,35 @@ func (b *Batch) bulkRequest(
11611161
b.initResult(numKeys, numKeys, notRaw, nil)
11621162
}
11631163

1164-
// GetResult retrieves the Result and Result row KeyValue for a particular index.
1164+
// GetResult retrieves the Result, expected value bytes, and Result row KeyValue
1165+
// for a particular index.
11651166
//
11661167
// WARNING: introduce new usages of this function with care. See discussion in
11671168
// https://github.com/cockroachdb/cockroach/pull/112937.
11681169
// TODO(yuzefovich): look into removing this confusing function.
1169-
func (b *Batch) GetResult(idx int) (*Result, KeyValue, error) {
1170+
func (b *Batch) GetResult(idx int) (*Result, []byte, KeyValue, error) {
11701171
origIdx := idx
11711172
for i := range b.Results {
11721173
r := &b.Results[i]
11731174
if idx < r.calls {
1175+
var expBytes []byte
1176+
if origIdx < len(b.reqs) {
1177+
req := &b.reqs[origIdx]
1178+
switch t := req.Value.(type) {
1179+
case *kvpb.RequestUnion_ConditionalPut:
1180+
expBytes = t.ConditionalPut.ExpBytes
1181+
}
1182+
}
11741183
if idx < len(r.Rows) {
1175-
return r, r.Rows[idx], nil
1184+
return r, expBytes, r.Rows[idx], nil
11761185
} else if idx < len(r.Keys) {
1177-
return r, KeyValue{Key: r.Keys[idx]}, nil
1186+
return r, expBytes, KeyValue{Key: r.Keys[idx]}, nil
11781187
} else {
1179-
return r, KeyValue{}, nil
1188+
return r, expBytes, KeyValue{}, nil
11801189
}
11811190
}
11821191
idx -= r.calls
11831192
}
1184-
return nil, KeyValue{}, errors.AssertionFailedf("index %d outside of results: %+v", origIdx, b.Results)
1193+
return nil, nil, KeyValue{},
1194+
errors.AssertionFailedf("index %d outside of results: %+v", origIdx, b.Results)
11851195
}

pkg/kv/db_test.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,25 +1144,39 @@ func TestGetResult(t *testing.T) {
11441144
b.PutBytes(&byteSliceBulkSource[[]byte]{kys2, vals})
11451145
keyToDel := roachpb.Key("b")
11461146
b.Del(keyToDel)
1147+
keyToCPut := roachpb.Key("c")
1148+
var expVal roachpb.Value
1149+
expVal.SetBytes(vals[2])
1150+
b.CPut(keyToCPut, []byte("who"), expVal.TagAndDataBytes())
11471151
err := txn.CommitInBatch(ctx, b)
11481152
require.NoError(t, err)
11491153
for i := 0; i < len(kys1)+len(kys2); i++ {
1150-
res, row, err := b.GetResult(i)
1151-
require.Equal(t, res, &b.Results[i/3])
1152-
require.Equal(t, row, b.Results[i/3].Rows[i%3])
1154+
res, exp, row, err := b.GetResult(i)
1155+
require.Equal(t, &b.Results[i/3], res)
1156+
require.Nil(t, exp)
1157+
require.Equal(t, b.Results[i/3].Rows[i%3], row)
11531158
require.NoError(t, err)
11541159
}
11551160
// test Del request (it uses Result.Keys rather than Result.Rows)
1156-
_, kv, err := b.GetResult(len(kys1) + len(kys2))
1161+
_, exp, kv, err := b.GetResult(len(kys1) + len(kys2))
11571162
require.NoError(t, err)
1158-
require.Equal(t, keyToDel, kv.Key)
1163+
require.Nil(t, exp)
1164+
require.Equal(t, kv.Key, keyToDel)
11591165
require.Nil(t, kv.Value)
1166+
1167+
// test CPut request (it fills in expBytes)
1168+
res, exp, row, err := b.GetResult(len(kys1) + len(kys2) + 1)
1169+
require.Equal(t, &b.Results[3], res)
1170+
require.Equal(t, expVal.TagAndDataBytes(), exp)
1171+
require.Equal(t, b.Results[3].Rows[0], row)
1172+
require.NoError(t, err)
1173+
11601174
// test EndTxn result
1161-
_, _, err = b.GetResult(len(kys1) + len(kys2) + 1)
1175+
_, _, _, err = b.GetResult(len(kys1) + len(kys2) + 2)
11621176
require.NoError(t, err)
11631177

11641178
// test out of bounds
1165-
_, _, err = b.GetResult(len(kys1) + len(kys2) + 2)
1179+
_, _, _, err = b.GetResult(len(kys1) + len(kys2) + 3)
11661180
require.Error(t, err)
11671181
}
11681182

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
344344
if t.ScanFormat == kvpb.COL_BATCH_RESPONSE {
345345
return unsupportedOptionError(t.Method(), "COL_BATCH_RESPONSE scan format")
346346
}
347+
case *kvpb.QueryLocksRequest, *kvpb.LeaseInfoRequest:
347348
default:
348349
// All other requests are unsupported. Note that we assume EndTxn and
349350
// DeleteRange requests were handled explicitly before this method was
@@ -816,6 +817,11 @@ func (twb *txnWriteBuffer) applyTransformations(
816817
// the request to the KV layer.
817818
baRemote.Requests = append(baRemote.Requests, ru)
818819

820+
case *kvpb.QueryLocksRequest, *kvpb.LeaseInfoRequest:
821+
// These requests don't interact with buffered writes, so we simply
822+
// let them through.
823+
baRemote.Requests = append(baRemote.Requests, ru)
824+
819825
default:
820826
return nil, nil, kvpb.NewError(unsupportedMethodError(t.Method()))
821827
}
@@ -1183,6 +1189,10 @@ func (t transformation) toResp(
11831189
}
11841190
ru.MustSetInner(reverseScanResp)
11851191

1192+
case *kvpb.QueryLocksRequest, *kvpb.LeaseInfoRequest:
1193+
// These requests don't interact with buffered writes, so we simply
1194+
// let the response through unchanged.
1195+
11861196
default:
11871197
return ru, kvpb.NewError(unsupportedMethodError(req.Method()))
11881198
}

pkg/sql/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ go_library(
8080
"delayed.go",
8181
"delete.go",
8282
"delete_range.go",
83+
"delete_swap.go",
8384
"descriptor.go",
8485
"discard.go",
8586
"distinct.go",
@@ -280,6 +281,7 @@ go_library(
280281
"unsplit.go",
281282
"unsupported_vars.go",
282283
"update.go",
284+
"update_swap.go",
283285
"upsert.go",
284286
"user.go",
285287
"values.go",

pkg/sql/backfill/backfill.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ func ConvertBackfillError(
448448
if err != nil {
449449
return err
450450
}
451-
return row.ConvertBatchError(ctx, desc, b)
451+
return row.ConvertBatchError(ctx, desc, b, true /* alwaysConvertCondFailed */)
452452
}
453453

454454
type muBoundAccount struct {

pkg/sql/colexec/insert.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,9 @@ func (v *vectorInserter) Next() coldata.Batch {
184184
err = v.flowCtx.Txn.Run(ctx, kvba.Batch)
185185
}
186186
if err != nil {
187-
colexecerror.ExpectedError(row.ConvertBatchError(ctx, v.desc, kvba.Batch))
187+
colexecerror.ExpectedError(row.ConvertBatchError(
188+
ctx, v.desc, kvba.Batch, false, /* alwaysConvertCondFailed */
189+
))
188190
}
189191
numRows := end - start
190192
start = end

pkg/sql/delete.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ type deleteRun struct {
6161
// of the target table being returned, that must be passed through from the
6262
// input node.
6363
numPassthrough int
64+
65+
mustValidateOldPKValues bool
6466
}
6567

6668
func (r *deleteRun) initRowContainer(params runParams, columns colinfo.ResultColumns) {
@@ -187,7 +189,7 @@ func (r *deleteRun) processSourceRow(params runParams, sourceVals tree.Datums) e
187189

188190
// Queue the deletion in the KV batch.
189191
if err := r.td.row(
190-
params.ctx, deleteVals, pm, vh, false /* mustValidateOldPKValues */, r.traceKV,
192+
params.ctx, deleteVals, pm, vh, r.mustValidateOldPKValues, r.traceKV,
191193
); err != nil {
192194
return err
193195
}

pkg/sql/delete_range.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
121121
d.deleteSpans(params, b, spans)
122122
log.VEventf(ctx, 2, "fast delete: processing %d spans", len(spans))
123123
if err := params.p.txn.Run(ctx, b); err != nil {
124-
return row.ConvertBatchError(ctx, d.desc, b)
124+
return row.ConvertBatchError(ctx, d.desc, b, false /* alwaysConvertCondFailed */)
125125
}
126126

127127
spans = spans[:0]
@@ -144,7 +144,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
144144
d.deleteSpans(params, b, spans)
145145
log.VEventf(ctx, 2, "fast delete: processing %d spans and committing", len(spans))
146146
if err := params.p.txn.CommitInBatch(ctx, b); err != nil {
147-
return row.ConvertBatchError(ctx, d.desc, b)
147+
return row.ConvertBatchError(ctx, d.desc, b, false /* alwaysConvertCondFailed */)
148148
}
149149
if resumeSpans, err := d.processResults(b.Results, nil /* resumeSpans */); err != nil {
150150
return err

pkg/sql/delete_swap.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package sql
7+
8+
import (
9+
"context"
10+
"sync"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
13+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
14+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
15+
"github.com/cockroachdb/errors"
16+
)
17+
18+
var deleteSwapNodePool = sync.Pool{
19+
New: func() interface{} {
20+
return &deleteSwapNode{}
21+
},
22+
}
23+
24+
type deleteSwapNode struct {
25+
// Unlike insertFastPathNode, deleteSwapNode reads from input in order to
26+
// support projections, which are used by some DELETE statements.
27+
singleInputPlanNode
28+
29+
// columns is set if this DELETE is returning any rows, to be
30+
// consumed by a renderNode upstream. This occurs when there is a
31+
// RETURNING clause with some scalar expressions.
32+
columns colinfo.ResultColumns
33+
34+
run deleteRun
35+
}
36+
37+
var _ mutationPlanNode = &deleteSwapNode{}
38+
39+
func (d *deleteSwapNode) startExec(params runParams) error {
40+
// Cache traceKV during execution, to avoid re-evaluating it for every row.
41+
d.run.traceKV = params.p.ExtendedEvalContext().Tracing.KVTracingEnabled()
42+
43+
d.run.mustValidateOldPKValues = true
44+
45+
d.run.initRowContainer(params, d.columns)
46+
47+
return d.run.td.init(params.ctx, params.p.txn, params.EvalContext())
48+
}
49+
50+
// Next is required because batchedPlanNode inherits from planNode, but
51+
// batchedPlanNode doesn't really provide it. See the explanatory comments
52+
// in plan_batch.go.
53+
func (d *deleteSwapNode) Next(params runParams) (bool, error) { panic("not valid") }
54+
55+
// Values is required because batchedPlanNode inherits from planNode, but
56+
// batchedPlanNode doesn't really provide it. See the explanatory comments
57+
// in plan_batch.go.
58+
func (d *deleteSwapNode) Values() tree.Datums { panic("not valid") }
59+
60+
// BatchedNext implements the batchedPlanNode interface.
61+
func (d *deleteSwapNode) BatchedNext(params runParams) (bool, error) {
62+
if d.run.done {
63+
return false, nil
64+
}
65+
66+
// Delete swap does everything in one batch. There should only be a single row
67+
// of input, to ensure the savepoint rollback below has the correct SQL
68+
// semantics.
69+
70+
if err := params.p.cancelChecker.Check(); err != nil {
71+
return false, err
72+
}
73+
74+
next, err := d.input.Next(params)
75+
if next {
76+
if err := d.run.processSourceRow(params, d.input.Values()); err != nil {
77+
return false, err
78+
}
79+
// Verify that there was only a single row of input.
80+
next, err = d.input.Next(params)
81+
if next {
82+
return false, errors.AssertionFailedf("expected only 1 row as input to delete swap")
83+
}
84+
}
85+
if err != nil {
86+
return false, err
87+
}
88+
89+
// Delete swap works by optimistically modifying every index in the same
90+
// batch. If the row does not actually exist, the write to the primary index
91+
// will fail with ConditionFailedError, but writes to some secondary indexes
92+
// might succeed. We use a savepoint here to undo those writes.
93+
sp, err := d.run.td.createSavepoint(params.ctx)
94+
if err != nil {
95+
return false, err
96+
}
97+
98+
d.run.td.setRowsWrittenLimit(params.extendedEvalCtx.SessionData())
99+
if err := d.run.td.finalize(params.ctx); err != nil {
100+
// If this was a ConditionFailedError, it means the row did not exist in the
101+
// primary index. We must roll back to the savepoint above to undo writes to
102+
// all secondary indexes.
103+
if condErr := (*kvpb.ConditionFailedError)(nil); errors.As(err, &condErr) {
104+
// Reset the table writer so that it looks like there were no rows to
105+
// delete.
106+
d.run.td.rowsWritten = 0
107+
d.run.td.clearLastBatch(params.ctx)
108+
if err := d.run.td.rollbackToSavepoint(params.ctx, sp); err != nil {
109+
return false, err
110+
}
111+
return false, nil
112+
}
113+
return false, err
114+
}
115+
116+
// Remember we're done for the next call to BatchedNext().
117+
d.run.done = true
118+
119+
// Possibly initiate a run of CREATE STATISTICS.
120+
params.ExecCfg().StatsRefresher.NotifyMutation(d.run.td.tableDesc(), d.run.td.lastBatchSize)
121+
122+
return d.run.td.lastBatchSize > 0, nil
123+
}
124+
125+
// BatchedCount implements the batchedPlanNode interface.
126+
func (d *deleteSwapNode) BatchedCount() int {
127+
return d.run.td.lastBatchSize
128+
}
129+
130+
// BatchedValues implements the batchedPlanNode interface.
131+
func (d *deleteSwapNode) BatchedValues(rowIdx int) tree.Datums {
132+
return d.run.td.rows.At(rowIdx)
133+
}
134+
135+
func (d *deleteSwapNode) Close(ctx context.Context) {
136+
d.run.td.close(ctx)
137+
*d = deleteSwapNode{}
138+
deleteSwapNodePool.Put(d)
139+
}
140+
141+
func (d *deleteSwapNode) rowsWritten() int64 {
142+
return d.run.td.rowsWritten
143+
}

pkg/sql/plan.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ var _ planNode = &CreateRoleNode{}
206206
var _ planNode = &createViewNode{}
207207
var _ planNode = &delayedNode{}
208208
var _ planNode = &deleteNode{}
209+
var _ planNode = &deleteSwapNode{}
209210
var _ planNode = &deleteRangeNode{}
210211
var _ planNode = &distinctNode{}
211212
var _ planNode = &dropDatabaseNode{}
@@ -259,6 +260,7 @@ var _ planNode = &truncateNode{}
259260
var _ planNode = &unaryNode{}
260261
var _ planNode = &unionNode{}
261262
var _ planNode = &updateNode{}
263+
var _ planNode = &updateSwapNode{}
262264
var _ planNode = &upsertNode{}
263265
var _ planNode = &valuesNode{}
264266
var _ planNode = &vectorMutationSearchNode{}

0 commit comments

Comments
 (0)