
Commit 6ad5b43

sql: remove unnecessary nodes and interfaces related to mutation planNodes
This commit adds two new helper structs for use in mutation nodes and other planNodes that need to output the number of affected rows: `mutationOutputHelper` and `rowsAffectedOutputHelper`. The former is now embedded into the mutation planNodes, apart from `deleteRangeNode`, which embeds `rowsAffectedOutputHelper` instead. This commit also removes `spoolNode`, `serializeNode`, and `rowCountNode`, with `mutationOutputHelper` taking over the work they used to do, which in turn allowed the removal of the `batchedPlanNode` interface and its methods. These changes simplify some factory logic and should simplify the process of moving from `planNode` implementations to processors in the future.

This change also fixes #139378, where the `spoolNode` added for a mutation with a RETURNING clause would break some of the EXPLAIN ANALYZE statistics.

Fixes #139378

Release note (bug fix): Previously, CockroachDB would omit execution statistics in EXPLAIN ANALYZE output for mutation nodes when a RETURNING clause was used. The bug has been present since before v21.1 and is now fixed.
1 parent cee475f commit 6ad5b43

33 files changed: +724 −1032 lines changed
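For orientation before the per-file diffs: the sketch below shows one plausible shape for the two new helpers, inferred only from the call sites visible in this commit (`onModifiedRow`, `incAffectedRows`, `addRow`, `rowsAffected`, `next`, `values`, `close`, and the `rows` container initialized in `deleteRun.init`). The field names, the composition of the two structs, and the idea of emitting the affected-row count as a single datum are assumptions for illustration; the real definitions live in `plan_node_output_helper.go` and may differ.

```go
// Hypothetical sketch, as it might appear inside package sql; not the actual
// contents of plan_node_output_helper.go.

// rowsAffectedOutputHelper tracks only the number of rows a mutation touched.
// Nodes that never produce RETURNING rows (e.g. deleteRangeNode) embed it and
// surface the count through Next/Values as a single row. (Assumption: the
// count is emitted as one DInt datum; the real helper may expose it
// differently.)
type rowsAffectedOutputHelper struct {
    affected int64
    emitted  bool
}

func (h *rowsAffectedOutputHelper) incAffectedRows()    { h.affected++ }
func (h *rowsAffectedOutputHelper) rowsAffected() int64 { return h.affected }

// next reports true exactly once, so the node emits its count as a single
// output row.
func (h *rowsAffectedOutputHelper) next() bool {
    if h.emitted {
        return false
    }
    h.emitted = true
    return true
}

func (h *rowsAffectedOutputHelper) values() tree.Datums {
    return tree.Datums{tree.NewDInt(tree.DInt(h.affected))}
}

// mutationOutputHelper additionally buffers RETURNING rows in a row container
// while startExec drives the mutation to completion; Next/Values then iterate
// over the buffered rows. This is the work that spoolNode, serializeNode, and
// rowCountNode used to do as separate planNodes.
type mutationOutputHelper struct {
    rowsAffectedOutputHelper
    rows   *rowcontainer.RowContainer // nil when no RETURNING rows are needed
    rowIdx int
}

// onModifiedRow is called once per mutated row.
func (h *mutationOutputHelper) onModifiedRow() { h.incAffectedRows() }

// addRow accumulates a RETURNING row in the container.
func (h *mutationOutputHelper) addRow(ctx context.Context, row tree.Datums) error {
    _, err := h.rows.AddRow(ctx, row)
    return err
}

func (h *mutationOutputHelper) next() bool {
    if h.rows == nil {
        return h.rowsAffectedOutputHelper.next()
    }
    if h.rowIdx >= h.rows.Len() {
        return false
    }
    h.rowIdx++
    return true
}

func (h *mutationOutputHelper) values() tree.Datums {
    if h.rows == nil {
        return h.rowsAffectedOutputHelper.values()
    }
    return h.rows.At(h.rowIdx - 1)
}

func (h *mutationOutputHelper) close(ctx context.Context) {
    if h.rows != nil {
        h.rows.Close(ctx)
    }
}
```

With a shape like this, a mutation node runs all of its batches to completion inside `startExec` and then serves `Next`/`Values` straight from the embedded helper, which is what previously required wrapping the node in `spoolNode`/`serializeNode`.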

pkg/sql/BUILD.bazel

Lines changed: 3 additions & 2 deletions
@@ -171,9 +171,9 @@ go_library(
         "pg_extension.go",
         "pg_metadata_diff.go",
         "plan.go",
-        "plan_batch.go",
         "plan_columns.go",
         "plan_names.go",
+        "plan_node_output_helper.go",
         "plan_node_to_row_source.go",
         "plan_opt.go",
         "plan_ordering.go",
@@ -248,7 +248,6 @@ go_library(
         "show_zone_config.go",
         "sort.go",
         "split.go",
-        "spool.go",
         "sql_activity_update_job.go",
         "sql_cursor.go",
         "statement.go",
@@ -722,6 +721,7 @@ go_test(
         "pg_metadata_test.go",
         "pg_oid_test.go",
         "pgwire_internal_test.go",
+        "plan_node_output_helper_test.go",
         "plan_opt_test.go",
         "privileged_accessor_test.go",
         "region_util_test.go",
@@ -893,6 +893,7 @@ go_test(
         "//pkg/sql/randgen",
         "//pkg/sql/regions",
         "//pkg/sql/row",
+        "//pkg/sql/rowcontainer",
         "//pkg/sql/rowenc",
         "//pkg/sql/rowenc/keyside",
         "//pkg/sql/rowenc/rowencpb",

pkg/sql/create_table.go

Lines changed: 0 additions & 1 deletion
@@ -570,7 +570,6 @@ func (n *createTableNode) startExec(params runParams) error {
     ti := tableInserterPool.Get().(*tableInserter)
     *ti = tableInserter{ri: ri}
     defer func() {
-        ti.close(params.ctx)
         *ti = tableInserter{}
         tableInserterPool.Put(ti)
     }()

pkg/sql/delete.go

Lines changed: 72 additions & 76 deletions
@@ -36,12 +36,12 @@ var _ mutationPlanNode = &deleteNode{}
 
 // deleteRun contains the run-time state of deleteNode during local execution.
 type deleteRun struct {
-    td         tableDeleter
-    rowsNeeded bool
+    mutationOutputHelper
+    td tableDeleter
 
-    // done informs a new call to BatchedNext() that the previous call
-    // to BatchedNext() has completed the work already.
-    done bool
+    // rowsNeeded is set to true if the mutation operator needs to return the rows
+    // that were affected by the mutation.
+    rowsNeeded bool
 
     // resultRowBuffer is used to prepare a result row for accumulation
     // into the row container above, when rowsNeeded is set.
@@ -76,7 +76,7 @@ func (r *deleteRun) init(params runParams, columns colinfo.ResultColumns) {
         return
     }
 
-    r.td.rows = rowcontainer.NewRowContainer(
+    r.rows = rowcontainer.NewRowContainer(
         params.p.Mon().MakeBoundAccount(),
         colinfo.ColTypeInfoFromResCols(columns),
     )
@@ -92,31 +92,34 @@ func (d *deleteNode) startExec(params runParams) error {
 
     d.run.init(params, d.columns)
 
-    return d.run.td.init(params.ctx, params.p.txn, params.EvalContext())
-}
+    if err := d.run.td.init(params.ctx, params.p.txn, params.EvalContext()); err != nil {
+        return err
+    }
 
-// Next is required because batchedPlanNode inherits from planNode, but
-// batchedPlanNode doesn't really provide it. See the explanatory comments
-// in plan_batch.go.
-func (d *deleteNode) Next(params runParams) (bool, error) { panic("not valid") }
+    // Run the mutation to completion.
+    for {
+        lastBatch, err := d.processBatch(params)
+        if err != nil || lastBatch {
+            return err
+        }
+    }
+}
 
-// Values is required because batchedPlanNode inherits from planNode, but
-// batchedPlanNode doesn't really provide it. See the explanatory comments
-// in plan_batch.go.
-func (d *deleteNode) Values() tree.Datums { panic("not valid") }
+// Next implements the planNode interface.
+func (d *deleteNode) Next(_ runParams) (bool, error) {
+    return d.run.next(), nil
+}
 
-// BatchedNext implements the batchedPlanNode interface.
-func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
-    if d.run.done {
-        return false, nil
-    }
+// Values implements the planNode interface.
+func (d *deleteNode) Values() tree.Datums {
+    return d.run.values()
+}
 
-    // Advance one batch. First, clear the last batch.
-    d.run.td.clearLastBatch(params.ctx)
-    // Now consume/accumulate the rows for this batch.
-    lastBatch := false
+func (d *deleteNode) processBatch(params runParams) (lastBatch bool, err error) {
+    // Consume/accumulate the rows for this batch.
+    lastBatch = false
     for {
-        if err := params.p.cancelChecker.Check(); err != nil {
+        if err = params.p.cancelChecker.Check(); err != nil {
             return false, err
         }
 
@@ -131,7 +134,7 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
 
         // Process the deletion of the current input row,
         // potentially accumulating the result row for later.
-        if err := d.run.processSourceRow(params, d.input.Values()); err != nil {
+        if err = d.run.processSourceRow(params, d.input.Values()); err != nil {
             return false, err
         }
 
@@ -146,24 +149,21 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
         if !lastBatch {
             // We only run/commit the batch if there were some rows processed
             // in this batch.
-            if err := d.run.td.flushAndStartNewBatch(params.ctx); err != nil {
+            if err = d.run.td.flushAndStartNewBatch(params.ctx); err != nil {
                 return false, err
             }
         }
     }
 
     if lastBatch {
         d.run.td.setRowsWrittenLimit(params.extendedEvalCtx.SessionData())
-        if err := d.run.td.finalize(params.ctx); err != nil {
+        if err = d.run.td.finalize(params.ctx); err != nil {
             return false, err
         }
-        // Remember we're done for the next call to BatchedNext().
-        d.run.done = true
         // Possibly initiate a run of CREATE STATISTICS.
-        params.ExecCfg().StatsRefresher.NotifyMutation(d.run.td.tableDesc(), int(d.run.td.rowsWritten))
+        params.ExecCfg().StatsRefresher.NotifyMutation(d.run.td.tableDesc(), int(d.run.rowsAffected()))
     }
-
-    return d.run.td.lastBatchSize > 0, nil
+    return lastBatch, nil
 }
 
 // processSourceRow processes one row from the source for deletion and, if
@@ -199,64 +199,60 @@ func (r *deleteRun) processSourceRow(params runParams, sourceVals tree.Datums) e
     ); err != nil {
         return err
     }
+    r.onModifiedRow()
+    if !r.rowsNeeded {
+        return nil
+    }
 
-    // If result rows need to be accumulated, do it.
-    if r.td.rows != nil {
-        // The new values can include all columns, so the values may contain
-        // additional columns for every newly dropped column not visible. We do not
-        // want them to be available for RETURNING.
-        //
-        // r.rows.NumCols() is guaranteed to only contain the requested
-        // public columns.
-        largestRetIdx := -1
-        for i := range r.rowIdxToRetIdx {
-            retIdx := r.rowIdxToRetIdx[i]
-            if retIdx >= 0 {
-                if retIdx >= largestRetIdx {
-                    largestRetIdx = retIdx
-                }
-                r.resultRowBuffer[retIdx] = deleteVals[i]
+    // Result rows must be accumulated.
+    //
+    // The new values can include all columns, so the values may contain
+    // additional columns for every newly dropped column not visible. We do not
+    // want them to be available for RETURNING.
+    //
+    // r.rows.NumCols() is guaranteed to only contain the requested
+    // public columns.
+    largestRetIdx := -1
+    for i := range r.rowIdxToRetIdx {
+        retIdx := r.rowIdxToRetIdx[i]
+        if retIdx >= 0 {
+            if retIdx >= largestRetIdx {
+                largestRetIdx = retIdx
             }
+            r.resultRowBuffer[retIdx] = deleteVals[i]
         }
+    }
 
-        // At this point we've extracted all the RETURNING values that are part
-        // of the target table. We must now extract the columns in the RETURNING
-        // clause that refer to other tables (from the USING clause of the delete).
-        if r.numPassthrough > 0 {
-            passthroughBegin := len(r.td.rd.FetchCols)
-            passthroughEnd := passthroughBegin + r.numPassthrough
-            passthroughValues := deleteVals[passthroughBegin:passthroughEnd]
-
-            for i := 0; i < r.numPassthrough; i++ {
-                largestRetIdx++
-                r.resultRowBuffer[largestRetIdx] = passthroughValues[i]
-            }
-
+    // At this point we've extracted all the RETURNING values that are part
+    // of the target table. We must now extract the columns in the RETURNING
+    // clause that refer to other tables (from the USING clause of the delete).
+    if r.numPassthrough > 0 {
+        passthroughBegin := len(r.td.rd.FetchCols)
+        passthroughEnd := passthroughBegin + r.numPassthrough
+        passthroughValues := deleteVals[passthroughBegin:passthroughEnd]
+
+        for i := 0; i < r.numPassthrough; i++ {
+            largestRetIdx++
+            r.resultRowBuffer[largestRetIdx] = passthroughValues[i]
         }
 
-        if _, err := r.td.rows.AddRow(params.ctx, r.resultRowBuffer); err != nil {
-            return err
-        }
     }
-
-    return nil
+    return r.addRow(params.ctx, r.resultRowBuffer)
 }
 
-// BatchedCount implements the batchedPlanNode interface.
-func (d *deleteNode) BatchedCount() int { return d.run.td.lastBatchSize }
-
-// BatchedValues implements the batchedPlanNode interface.
-func (d *deleteNode) BatchedValues(rowIdx int) tree.Datums { return d.run.td.rows.At(rowIdx) }
-
 func (d *deleteNode) Close(ctx context.Context) {
     d.input.Close(ctx)
-    d.run.td.close(ctx)
+    d.run.close(ctx)
     *d = deleteNode{}
     deleteNodePool.Put(d)
 }
 
 func (d *deleteNode) rowsWritten() int64 {
-    return d.run.td.rowsWritten
+    return d.run.rowsAffected()
+}
+
+func (d *deleteNode) returnsRowsAffected() bool {
+    return !d.run.rowsNeeded
 }
 
 func (d *deleteNode) enableAutoCommit() {
pkg/sql/delete_range.go

Lines changed: 11 additions & 32 deletions
@@ -31,6 +31,7 @@ import (
 // be deleted, it'll enable autoCommit for delete range.
 type deleteRangeNode struct {
     zeroInputPlanNode
+    rowsAffectedOutputHelper
     // spans are the spans to delete.
     spans roachpb.Spans
     // desc is the table descriptor the delete is operating on.
@@ -45,42 +46,22 @@
     // batches and will just send one big delete with a commit statement attached.
     autoCommitEnabled bool
 
-    // rowCount will be set to the count of rows deleted.
-    rowCount int
-
     // curRowPrefix is the prefix for all KVs (i.e. for all column families) of
     // the SQL row that increased rowCount last. It is maintained across
     // different BatchRequests in order to not double count the same SQL row.
     curRowPrefix []byte
 }
 
 var _ planNode = &deleteRangeNode{}
-var _ planNodeFastPath = &deleteRangeNode{}
-var _ batchedPlanNode = &deleteRangeNode{}
 var _ mutationPlanNode = &deleteRangeNode{}
 
-// BatchedNext implements the batchedPlanNode interface.
-func (d *deleteRangeNode) BatchedNext(params runParams) (bool, error) {
-    return false, nil
-}
-
-// BatchedCount implements the batchedPlanNode interface.
-func (d *deleteRangeNode) BatchedCount() int {
-    return d.rowCount
-}
-
-// BatchedValues implements the batchedPlanNode interface.
-func (d *deleteRangeNode) BatchedValues(rowIdx int) tree.Datums {
-    panic("invalid")
-}
-
-// FastPathResults implements the planNodeFastPath interface.
-func (d *deleteRangeNode) FastPathResults() (int, bool) {
-    return d.rowCount, true
+func (d *deleteRangeNode) rowsWritten() int64 {
+    return d.rowsAffected()
 }
 
-func (d *deleteRangeNode) rowsWritten() int64 {
-    return int64(d.rowCount)
+func (d *deleteRangeNode) returnsRowsAffected() bool {
+    // DeleteRange always returns the number of rows deleted.
+    return true
 }
 
 // startExec implements the planNode interface.
@@ -228,7 +209,7 @@ func (d *deleteRangeNode) processResults(
         k := keyBytes[:len(keyBytes)-len(after)]
         if !bytes.Equal(k, d.curRowPrefix) {
             d.curRowPrefix = k
-            d.rowCount++
+            d.incAffectedRows()
         }
     }
     if r.ResumeSpan != nil && r.ResumeSpan.Valid() {
@@ -239,15 +220,13 @@ func (d *deleteRangeNode) processResults(
 }
 
 // Next implements the planNode interface.
-func (*deleteRangeNode) Next(params runParams) (bool, error) {
-    // TODO(radu): this shouldn't be used, but it gets called when a cascade uses
-    // delete-range. Investigate this.
-    return false, nil
+func (d *deleteRangeNode) Next(params runParams) (bool, error) {
+    return d.next(), nil
 }
 
 // Values implements the planNode interface.
-func (*deleteRangeNode) Values() tree.Datums {
-    panic("invalid")
+func (d *deleteRangeNode) Values() tree.Datums {
+    return d.values()
 }
 
 // Close implements the planNode interface.

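The counting in `processResults` above guards against double counting: several KV deletions can belong to one SQL row (one per column family), so the helper is only bumped when the row prefix changes. A small, self-contained illustration of that idea, reusing the sketched helper; `rowPrefix` is a hypothetical stand-in for the key decoding the real code performs:

```go
// countDeletedRows counts SQL rows among deleted KV keys: keys that share a
// row prefix (multiple column families of the same row) increment the
// affected-row count only once.
func countDeletedRows(deletedKeys [][]byte, rowPrefix func([]byte) []byte) int64 {
    var h rowsAffectedOutputHelper
    var cur []byte
    for _, k := range deletedKeys {
        if p := rowPrefix(k); !bytes.Equal(p, cur) {
            cur = append(cur[:0], p...)
            h.incAffectedRows()
        }
    }
    return h.rowsAffected()
}
```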