Skip to content

Commit f70f968

Browse files
committed
sql: add updateSwapNode and deleteSwapNode
Add two new fast-path operators for UPDATE and DELETE: update swap and delete swap. These operators optimistically modify all KVs for a single row in one batch, using CPut on the primary index KVs to verify that the row did exist with the expected values. If the row did not exist, or did not have the expected values, savepoint rollback is used to undo the optimistic writes.

Because update swap and delete swap use CPut to verify the contents of the original row, they can only be used when all column values can be constrained to a single value, in order to generate the expected value for CPut. And because they use savepoint rollback to undo the entire batch after ConditionFailedError, they can only be used for single-row writes.

Unlike the insert fast path, update swap and delete swap read from input in order to support nontrivial projections. Thanks to sharing updateRun and deleteRun, update swap and delete swap are able to support RETURNING out of the box. However, none of the following will be supported initially:

* check constraints
* unique checks (for RBR tables)
* foreign key checks
* foreign key cascades
* after triggers

UPDATE and DELETE statements needing those abilities, or modifying more than a single row, or not constraining every column to a single value, will not be able to use update swap or delete swap.

Informs: #71153

Release note: None
1 parent 0a79328 commit f70f968

File tree

10 files changed

+309
-3
lines changed

10 files changed

+309
-3
lines changed

pkg/sql/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ go_library(
8080
"delayed.go",
8181
"delete.go",
8282
"delete_range.go",
83+
"delete_swap.go",
8384
"descriptor.go",
8485
"discard.go",
8586
"distinct.go",
@@ -280,6 +281,7 @@ go_library(
280281
"unsplit.go",
281282
"unsupported_vars.go",
282283
"update.go",
284+
"update_swap.go",
283285
"upsert.go",
284286
"user.go",
285287
"values.go",

pkg/sql/delete.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ type deleteRun struct {
6161
// of the target table being returned, that must be passed through from the
6262
// input node.
6363
numPassthrough int
64+
65+
mustValidateOldPKValues bool
6466
}
6567

6668
func (r *deleteRun) initRowContainer(params runParams, columns colinfo.ResultColumns) {
@@ -187,7 +189,7 @@ func (r *deleteRun) processSourceRow(params runParams, sourceVals tree.Datums) e
187189

188190
// Queue the deletion in the KV batch.
189191
if err := r.td.row(
190-
params.ctx, deleteVals, pm, vh, false /* mustValidateOldPKValues */, r.traceKV,
192+
params.ctx, deleteVals, pm, vh, r.mustValidateOldPKValues, r.traceKV,
191193
); err != nil {
192194
return err
193195
}

pkg/sql/delete_swap.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package sql
7+
8+
import (
9+
"context"
10+
"sync"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
13+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
14+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
15+
"github.com/cockroachdb/errors"
16+
)
17+
18+
var deleteSwapNodePool = sync.Pool{
19+
New: func() interface{} {
20+
return &deleteSwapNode{}
21+
},
22+
}
23+
24+
type deleteSwapNode struct {
25+
// Unlike insertFastPathNode, deleteSwapNode reads from input in order to
26+
// support projections, which are used by some DELETE statements.
27+
singleInputPlanNode
28+
29+
// columns is set if this DELETE is returning any rows, to be
30+
// consumed by a renderNode upstream. This occurs when there is a
31+
// RETURNING clause with some scalar expressions.
32+
columns colinfo.ResultColumns
33+
34+
run deleteRun
35+
}
36+
37+
var _ mutationPlanNode = &deleteSwapNode{}
38+
39+
func (d *deleteSwapNode) startExec(params runParams) error {
40+
// Cache traceKV during execution, to avoid re-evaluating it for every row.
41+
d.run.traceKV = params.p.ExtendedEvalContext().Tracing.KVTracingEnabled()
42+
43+
d.run.mustValidateOldPKValues = true
44+
45+
d.run.initRowContainer(params, d.columns)
46+
47+
return d.run.td.init(params.ctx, params.p.txn, params.EvalContext())
48+
}
49+
50+
// Next is required because batchedPlanNode inherits from planNode, but
51+
// batchedPlanNode doesn't really provide it. See the explanatory comments
52+
// in plan_batch.go.
53+
func (d *deleteSwapNode) Next(params runParams) (bool, error) { panic("not valid") }
54+
55+
// Values is required because batchedPlanNode inherits from planNode, but
56+
// batchedPlanNode doesn't really provide it. See the explanatory comments
57+
// in plan_batch.go.
58+
func (d *deleteSwapNode) Values() tree.Datums { panic("not valid") }
59+
60+
// BatchedNext implements the batchedPlanNode interface.
61+
func (d *deleteSwapNode) BatchedNext(params runParams) (bool, error) {
62+
if d.run.done {
63+
return false, nil
64+
}
65+
66+
// Delete swap does everything in one batch. There should only be a single row
67+
// of input, to ensure the savepoint rollback below has the correct SQL
68+
// semantics.
69+
70+
if err := params.p.cancelChecker.Check(); err != nil {
71+
return false, err
72+
}
73+
74+
next, err := d.input.Next(params)
75+
if next {
76+
if err := d.run.processSourceRow(params, d.input.Values()); err != nil {
77+
return false, err
78+
}
79+
// Verify that there was only a single row of input.
80+
next, err = d.input.Next(params)
81+
if next {
82+
return false, errors.AssertionFailedf("expected only 1 row as input to delete swap")
83+
}
84+
}
85+
if err != nil {
86+
return false, err
87+
}
88+
89+
// Delete swap works by optimistically modifying every index in the same
90+
// batch. If the row does not actually exist, the write to the primary index
91+
// will fail with ConditionFailedError, but writes to some secondary indexes
92+
// might succeed. We use a savepoint here to undo those writes.
93+
sp, err := d.run.td.createSavepoint(params.ctx)
94+
if err != nil {
95+
return false, err
96+
}
97+
98+
d.run.td.setRowsWrittenLimit(params.extendedEvalCtx.SessionData())
99+
if err := d.run.td.finalize(params.ctx); err != nil {
100+
// If this was a ConditionFailedError, it means the row did not exist in the
101+
// primary index. We must roll back to the savepoint above to undo writes to
102+
// all secondary indexes.
103+
if condErr := (*kvpb.ConditionFailedError)(nil); errors.As(err, &condErr) {
104+
// Reset the table writer so that it looks like there were no rows to
105+
// delete.
106+
d.run.td.rowsWritten = 0
107+
d.run.td.clearLastBatch(params.ctx)
108+
if err := d.run.td.rollbackToSavepoint(params.ctx, sp); err != nil {
109+
return false, err
110+
}
111+
return false, nil
112+
}
113+
return false, err
114+
}
115+
116+
// Remember we're done for the next call to BatchedNext().
117+
d.run.done = true
118+
119+
// Possibly initiate a run of CREATE STATISTICS.
120+
params.ExecCfg().StatsRefresher.NotifyMutation(d.run.td.tableDesc(), d.run.td.lastBatchSize)
121+
122+
return d.run.td.lastBatchSize > 0, nil
123+
}
124+
125+
// BatchedCount implements the batchedPlanNode interface.
126+
func (d *deleteSwapNode) BatchedCount() int {
127+
return d.run.td.lastBatchSize
128+
}
129+
130+
// BatchedValues implements the batchedPlanNode interface.
131+
func (d *deleteSwapNode) BatchedValues(rowIdx int) tree.Datums {
132+
return d.run.td.rows.At(rowIdx)
133+
}
134+
135+
func (d *deleteSwapNode) Close(ctx context.Context) {
136+
d.run.td.close(ctx)
137+
*d = deleteSwapNode{}
138+
deleteSwapNodePool.Put(d)
139+
}
140+
141+
func (d *deleteSwapNode) rowsWritten() int64 {
142+
return d.run.td.rowsWritten
143+
}

pkg/sql/plan.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ var _ planNode = &CreateRoleNode{}
206206
var _ planNode = &createViewNode{}
207207
var _ planNode = &delayedNode{}
208208
var _ planNode = &deleteNode{}
209+
var _ planNode = &deleteSwapNode{}
209210
var _ planNode = &deleteRangeNode{}
210211
var _ planNode = &distinctNode{}
211212
var _ planNode = &dropDatabaseNode{}
@@ -259,6 +260,7 @@ var _ planNode = &truncateNode{}
259260
var _ planNode = &unaryNode{}
260261
var _ planNode = &unionNode{}
261262
var _ planNode = &updateNode{}
263+
var _ planNode = &updateSwapNode{}
262264
var _ planNode = &upsertNode{}
263265
var _ planNode = &valuesNode{}
264266
var _ planNode = &vectorMutationSearchNode{}

pkg/sql/plan_batch.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ type batchedPlanNode interface {
6868
}
6969

7070
var _ batchedPlanNode = &deleteNode{}
71+
var _ batchedPlanNode = &deleteSwapNode{}
7172
var _ batchedPlanNode = &updateNode{}
73+
var _ batchedPlanNode = &updateSwapNode{}
7274

7375
// serializeNode serializes the results of a batchedPlanNode into a
7476
// plain planNode interface. In other words, it wraps around

pkg/sql/plan_columns.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,12 @@ func getPlanColumns(plan planNode, mut bool) colinfo.ResultColumns {
7171
return n.columns
7272
case *deleteNode:
7373
return n.columns
74+
case *deleteSwapNode:
75+
return n.columns
7476
case *updateNode:
7577
return n.columns
78+
case *updateSwapNode:
79+
return n.columns
7680
case *insertNode:
7781
return n.columns
7882
case *insertFastPathNode:

pkg/sql/plan_names.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ var planNodeNames = map[reflect.Type]string{
112112
reflect.TypeOf(&delayedNode{}): "virtual table",
113113
reflect.TypeOf(&deleteNode{}): "delete",
114114
reflect.TypeOf(&deleteRangeNode{}): "delete range",
115+
reflect.TypeOf(&deleteSwapNode{}): "delete swap",
115116
reflect.TypeOf(&discardNode{}): "discard",
116117
reflect.TypeOf(&distinctNode{}): "distinct",
117118
reflect.TypeOf(&dropDatabaseNode{}): "drop database",
@@ -188,6 +189,7 @@ var planNodeNames = map[reflect.Type]string{
188189
reflect.TypeOf(&unaryNode{}): "emptyrow",
189190
reflect.TypeOf(&unionNode{}): "union",
190191
reflect.TypeOf(&updateNode{}): "update",
192+
reflect.TypeOf(&updateSwapNode{}): "update swap",
191193
reflect.TypeOf(&upsertNode{}): "upsert",
192194
reflect.TypeOf(&valuesNode{}): "values",
193195
reflect.TypeOf(&vectorMutationSearchNode{}): "vector mutation search",

pkg/sql/plan_ordering.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ func planReqOrdering(plan planNode) ReqOrdering {
3131
if n.run.rowsNeeded {
3232
return planReqOrdering(n.input)
3333
}
34+
case *deleteSwapNode:
35+
if n.run.rowsNeeded {
36+
return planReqOrdering(n.input)
37+
}
3438

3539
case *filterNode:
3640
return n.reqOrdering
@@ -57,7 +61,7 @@ func planReqOrdering(plan planNode) ReqOrdering {
5761
return n.reqOrdering
5862
case *insertNode, *insertFastPathNode:
5963
// TODO(knz): RETURNING is ordered by the PK.
60-
case *updateNode, *upsertNode:
64+
case *updateNode, *updateSwapNode, *upsertNode:
6165
// After an update, the original order may have been destroyed.
6266
// For example, if the PK is updated by a SET expression.
6367
// So we can't assume any ordering.

pkg/sql/update.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ type updateRun struct {
7070
// regionLocalInfo handles erroring out the UPDATE when the
7171
// enforce_home_region setting is on.
7272
regionLocalInfo regionLocalInfoType
73+
74+
mustValidateOldPKValues bool
7375
}
7476

7577
func (r *updateRun) initRowContainer(params runParams, columns colinfo.ResultColumns) {
@@ -240,7 +242,7 @@ func (r *updateRun) processSourceRow(params runParams, sourceVals tree.Datums) e
240242

241243
// Queue the insert in the KV batch.
242244
newValues, err := r.tu.rowForUpdate(
243-
params.ctx, oldValues, updateValues, pm, vh, false /* mustValidateOldPKValues */, r.traceKV,
245+
params.ctx, oldValues, updateValues, pm, vh, r.mustValidateOldPKValues, r.traceKV,
244246
)
245247
if err != nil {
246248
return err

0 commit comments

Comments
 (0)