Skip to content

Commit f5888e3

Browse files
craig[bot]DrewKimball
andcommitted
Merge #145934
145934: sql: refactor plan node control flow r=DrewKimball a=DrewKimball #### sql: change SetZoneConfig to return type Ack This commit changes the return type for `tree.SetZoneConfig` to `Ack` from `RowsAffected`. This mirrors similar statements, and the row count has been broken (always zero) ever since this statement was implemented in the declarative schema changer, anyway. Removing the need to track row count allows a few related functions to be simplified. Epic: None Release note: None #### sql: remove unnecessary nodes and interfaces related to mutation planNodes This commit adds two new helper structs for use in mutation nodes and other planNodes that need to output the number of affected rows: `mutationOutputHelper` and `rowsAffectedOutputHelper`. The former is now embedded into the mutation planNodes apart from `deleteRangeNode`, which embeds `rowsAffectedOutputHelper` instead. This commit also removes `spoolNode`, `serializeNode`, and `rowCount` node, with `mutationOutputHelper` replacing the work they used to do. This also allowed the removal of the `batchedPlanNode` interface and its methods. These changes simplify some factory logic, and should simplify the process of moving from `planNode` implementations to processors in the future. This change also fixes #139378, where the `spoolNode` added for a mutation with a RETURNING clause would break some of the EXPLAIN ANALYZE stats. Fixes #139378 Release note (bug fix): Previously, CockroachDB would omit execution statistics in EXPLAIN ANALYZE output for mutation nodes when a RETURNING clause was used. The bug has been present since before v21.1 and is now fixed. #### sql: remove planNodeFastPath interface for ops that return rows affected Previously, there were two ways that `planNode` implementations would propagate the number of rows affected (if using that output mode): 1. Implementing the `planNodeFastPath`, which provided a method to directly pull the cached row count. 2. A special loop in `planNodeToRowSource`, which simply called `Next()` repeatedly and counted the number of iterations. This commit takes advantage of work done in the previous commit for operators that need to return the number of rows affected. Now, the helper structs from the previous commit are used to track this count internally, and then return the final count through the usual `Next()` and `Values()` calls. This allows for the removal of special logic in `planNodeToRowSource`, as well as the `planNodeFastPath` interface. As with the previous commit, this one seeks to remove departures from the usual `Start()`, `Next()`, and `Values()` control flow, to make future refactors easier. Epic: None Release note: None Co-authored-by: Drew Kimball <[email protected]>
2 parents 71a8a9b + 1dbf9b1 commit f5888e3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+965
-1252
lines changed

pkg/cli/interactive_tests/test_missing_log_output.tcl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ interrupt_and_wait
5656
# Disable replication so as to avoid spurious purgatory errors.
5757
start_server $argv
5858
send "$argv sql --insecure -e \"ALTER RANGE default CONFIGURE ZONE USING num_replicas = 1\"\r"
59-
eexpect {CONFIGURE ZONE [0-9]}
59+
eexpect "CONFIGURE ZONE"
6060
eexpect ":/# "
6161
stop_server $argv
6262

pkg/sql/BUILD.bazel

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,9 @@ go_library(
171171
"pg_extension.go",
172172
"pg_metadata_diff.go",
173173
"plan.go",
174-
"plan_batch.go",
175174
"plan_columns.go",
176175
"plan_names.go",
176+
"plan_node_output_helper.go",
177177
"plan_node_to_row_source.go",
178178
"plan_opt.go",
179179
"plan_ordering.go",
@@ -248,7 +248,6 @@ go_library(
248248
"show_zone_config.go",
249249
"sort.go",
250250
"split.go",
251-
"spool.go",
252251
"sql_activity_update_job.go",
253252
"sql_cursor.go",
254253
"statement.go",
@@ -722,6 +721,7 @@ go_test(
722721
"pg_metadata_test.go",
723722
"pg_oid_test.go",
724723
"pgwire_internal_test.go",
724+
"plan_node_output_helper_test.go",
725725
"plan_opt_test.go",
726726
"privileged_accessor_test.go",
727727
"region_util_test.go",
@@ -893,6 +893,7 @@ go_test(
893893
"//pkg/sql/randgen",
894894
"//pkg/sql/regions",
895895
"//pkg/sql/row",
896+
"//pkg/sql/rowcontainer",
896897
"//pkg/sql/rowenc",
897898
"//pkg/sql/rowenc/keyside",
898899
"//pkg/sql/rowenc/rowencpb",

pkg/sql/cancel_queries.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,23 @@ import (
1919

2020
type cancelQueriesNode struct {
2121
singleInputPlanNode
22+
rowsAffectedOutputHelper
2223
ifExists bool
2324
}
2425

25-
func (n *cancelQueriesNode) startExec(runParams) error {
26-
return nil
26+
// startExec implements the planNode interface.
27+
func (n *cancelQueriesNode) startExec(params runParams) error {
28+
// Execute the node to completion, keeping track of the affected row count.
29+
for {
30+
ok, err := n.cancelQuery(params)
31+
if !ok || err != nil {
32+
return err
33+
}
34+
n.incAffectedRows()
35+
}
2736
}
2837

29-
func (n *cancelQueriesNode) Next(params runParams) (bool, error) {
38+
func (n *cancelQueriesNode) cancelQuery(params runParams) (bool, error) {
3039
// TODO(knz): instead of performing the cancels sequentially,
3140
// accumulate all the query IDs and then send batches to each of the
3241
// nodes.
@@ -71,7 +80,15 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) {
7180
return true, nil
7281
}
7382

74-
func (*cancelQueriesNode) Values() tree.Datums { return nil }
83+
// Next implements the planNode interface.
84+
func (n *cancelQueriesNode) Next(_ runParams) (bool, error) {
85+
return n.next(), nil
86+
}
87+
88+
// Values implements the planNode interface.
89+
func (n *cancelQueriesNode) Values() tree.Datums {
90+
return n.values()
91+
}
7592

7693
func (n *cancelQueriesNode) Close(ctx context.Context) {
7794
n.input.Close(ctx)

pkg/sql/cancel_sessions.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,23 @@ import (
1919

2020
type cancelSessionsNode struct {
2121
singleInputPlanNode
22+
rowsAffectedOutputHelper
2223
ifExists bool
2324
}
2425

25-
func (n *cancelSessionsNode) startExec(runParams) error {
26-
return nil
26+
// startExec implements the planNode interface.
27+
func (n *cancelSessionsNode) startExec(params runParams) error {
28+
// Execute the node to completion, keeping track of the affected row count.
29+
for {
30+
ok, err := n.cancelSession(params)
31+
if !ok || err != nil {
32+
return err
33+
}
34+
n.incAffectedRows()
35+
}
2736
}
2837

29-
func (n *cancelSessionsNode) Next(params runParams) (bool, error) {
38+
func (n *cancelSessionsNode) cancelSession(params runParams) (bool, error) {
3039
// TODO(knz): instead of performing the cancels sequentially,
3140
// accumulate all the query IDs and then send batches to each of the
3241
// nodes.
@@ -71,7 +80,15 @@ func (n *cancelSessionsNode) Next(params runParams) (bool, error) {
7180
return true, nil
7281
}
7382

74-
func (*cancelSessionsNode) Values() tree.Datums { return nil }
83+
// Next implements the planNode interface.
84+
func (n *cancelSessionsNode) Next(_ runParams) (bool, error) {
85+
return n.next(), nil
86+
}
87+
88+
// Values implements the planNode interface.
89+
func (n *cancelSessionsNode) Values() tree.Datums {
90+
return n.values()
91+
}
7592

7693
func (n *cancelSessionsNode) Close(ctx context.Context) {
7794
n.input.Close(ctx)

pkg/sql/colexec/columnarizer.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -211,18 +211,18 @@ func (c *Columnarizer) GetStats() *execinfrapb.ComponentStats {
211211
return s
212212
}
213213

214-
// IsFastPathNode returns true if the provided RowSource is the
215-
// planNodeToRowSource wrapper with "fast-path" enabled. The logic is injected
216-
// in the sql package to avoid an import cycle.
217-
var IsFastPathNode func(source execinfra.RowSource) bool
214+
// IsRowsAffectedNode returns true if the provided RowSource is the
215+
// planNodeToRowSource wrapper returning the number of affected rows. The logic
216+
// is injected from the sql package to avoid an import cycle.
217+
var IsRowsAffectedNode func(source execinfra.RowSource) bool
218218

219-
// IsColumnarizerAroundFastPathNode returns true if the provided Operator is a
220-
// Columnarizer that has "fast-path" enabled planNodeToRowSource wrapper as the
221-
// input.
222-
func IsColumnarizerAroundFastPathNode(o colexecop.Operator) bool {
219+
// IsColumnarizerAroundRowsAffectedNode returns true if the provided Operator is
220+
// a Columnarizer that has a planNodeToRowSource wrapper in "rows affected" mode
221+
// as the input.
222+
func IsColumnarizerAroundRowsAffectedNode(o colexecop.Operator) bool {
223223
o = MaybeUnwrapInvariantsChecker(o)
224224
c, ok := o.(*Columnarizer)
225-
return ok && IsFastPathNode(c.input)
225+
return ok && IsRowsAffectedNode(c.input)
226226
}
227227

228228
// Next is part of the colexecop.Operator interface.

pkg/sql/colflow/stats.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ type batchInfoCollector struct {
5757
// batch is the last batch returned by the wrapped operator.
5858
batch coldata.Batch
5959

60-
// rowCountFastPath is set to indicate that the input is expected to produce
60+
// rowsAffectedMode is set to indicate that the input is expected to produce
6161
// a single batch with a single column with the row count value.
62-
rowCountFastPath bool
62+
rowsAffectedMode bool
6363

6464
// stopwatch keeps track of the amount of time the wrapped operator spent
6565
// doing work. Note that this will include all of the time that the operator's
@@ -86,7 +86,7 @@ func makeBatchInfoCollector(
8686
return batchInfoCollector{
8787
OneInputNode: colexecop.NewOneInputNode(op),
8888
componentID: id,
89-
rowCountFastPath: colexec.IsColumnarizerAroundFastPathNode(op),
89+
rowsAffectedMode: colexec.IsColumnarizerAroundRowsAffectedNode(op),
9090
stopwatch: inputWatch,
9191
childStatsCollectors: childStatsCollectors,
9292
}
@@ -134,12 +134,12 @@ func (bic *batchInfoCollector) Next() coldata.Batch {
134134
bic.mu.Lock()
135135
defer bic.mu.Unlock()
136136
bic.mu.numBatches++
137-
if bic.rowCountFastPath {
137+
if bic.rowsAffectedMode {
138138
// We have a special case where the batch has exactly one column
139139
// with exactly one row in which we have the row count.
140140
if buildutil.CrdbTestBuild {
141141
if bic.mu.numBatches != 1 {
142-
colexecerror.InternalError(errors.AssertionFailedf("saw second batch in fast path:\n%s", bic.batch))
142+
colexecerror.InternalError(errors.AssertionFailedf("saw second batch in rows affected mode:\n%s", bic.batch))
143143
}
144144
if bic.batch.Width() != 1 {
145145
colexecerror.InternalError(errors.AssertionFailedf("batch width is not 1:\n%s", bic.batch))

pkg/sql/control_jobs.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import (
2020

2121
type controlJobsNode struct {
2222
singleInputPlanNode
23+
rowsAffectedOutputHelper
2324
desiredStatus jobs.State
24-
numRows int
2525
reason string
2626
}
2727

@@ -31,11 +31,7 @@ var jobCommandToDesiredStatus = map[tree.JobCommand]jobs.State{
3131
tree.PauseJob: jobs.StatePaused,
3232
}
3333

34-
// FastPathResults implements the planNodeFastPath interface.
35-
func (n *controlJobsNode) FastPathResults() (int, bool) {
36-
return n.numRows, true
37-
}
38-
34+
// startExec implements the planNode interface.
3935
func (n *controlJobsNode) startExec(params runParams) error {
4036
if n.desiredStatus != jobs.StatePaused && len(n.reason) > 0 {
4137
return errors.AssertionFailedf("status %v is not %v and thus does not support a reason %v",
@@ -86,7 +82,7 @@ func (n *controlJobsNode) startExec(params runParams) error {
8682
return err
8783
}
8884

89-
n.numRows++
85+
n.incAffectedRows()
9086
}
9187
switch n.desiredStatus {
9288
case jobs.StatePaused:
@@ -99,9 +95,15 @@ func (n *controlJobsNode) startExec(params runParams) error {
9995
return nil
10096
}
10197

102-
func (*controlJobsNode) Next(runParams) (bool, error) { return false, nil }
98+
// Next implements the planNode interface.
99+
func (n *controlJobsNode) Next(_ runParams) (bool, error) {
100+
return n.next(), nil
101+
}
103102

104-
func (*controlJobsNode) Values() tree.Datums { return nil }
103+
// Values implements the planNode interface.
104+
func (n *controlJobsNode) Values() tree.Datums {
105+
return n.values()
106+
}
105107

106108
func (n *controlJobsNode) Close(ctx context.Context) {
107109
n.input.Close(ctx)

pkg/sql/control_schedules.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626

2727
type controlSchedulesNode struct {
2828
singleInputPlanNode
29+
rowsAffectedOutputHelper
2930
command tree.ScheduleCommand
30-
numRows int
3131
}
3232

3333
func collectTelemetry(command tree.ScheduleCommand) {
@@ -41,11 +41,6 @@ func collectTelemetry(command tree.ScheduleCommand) {
4141
}
4242
}
4343

44-
// FastPathResults implements the planNodeFastPath interface.
45-
func (n *controlSchedulesNode) FastPathResults() (int, bool) {
46-
return n.numRows, true
47-
}
48-
4944
// JobSchedulerEnv returns JobSchedulerEnv.
5045
func JobSchedulerEnv(knobs *jobs.TestingKnobs) scheduledjobs.JobSchedulerEnv {
5146
if knobs != nil && knobs.JobSchedulerEnv != nil {
@@ -175,7 +170,7 @@ func (n *controlSchedulesNode) startExec(params runParams) error {
175170
if err != nil {
176171
return errors.Wrap(err, "failed to run OnDrop")
177172
}
178-
n.numRows += additionalDroppedSchedules
173+
n.addAffectedRows(additionalDroppedSchedules)
179174
}
180175
err = DeleteSchedule(
181176
params.ctx, params.ExecCfg(), params.p.InternalSQLTxn(),
@@ -189,17 +184,21 @@ func (n *controlSchedulesNode) startExec(params runParams) error {
189184
if err != nil {
190185
return err
191186
}
192-
n.numRows++
187+
n.incAffectedRows()
193188
}
194189

195190
return nil
196191
}
197192

198-
// Next implements planNode interface.
199-
func (*controlSchedulesNode) Next(runParams) (bool, error) { return false, nil }
193+
// Next implements the planNode interface.
194+
func (n *controlSchedulesNode) Next(_ runParams) (bool, error) {
195+
return n.next(), nil
196+
}
200197

201-
// Values implements planNode interface.
202-
func (*controlSchedulesNode) Values() tree.Datums { return nil }
198+
// Values implements the planNode interface.
199+
func (n *controlSchedulesNode) Values() tree.Datums {
200+
return n.values()
201+
}
203202

204203
// Close implements planNode interface.
205204
func (n *controlSchedulesNode) Close(ctx context.Context) {

pkg/sql/create_table.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,6 @@ func (n *createTableNode) startExec(params runParams) error {
570570
ti := tableInserterPool.Get().(*tableInserter)
571571
*ti = tableInserter{ri: ri}
572572
defer func() {
573-
ti.close(params.ctx)
574573
*ti = tableInserter{}
575574
tableInserterPool.Put(ti)
576575
}()

pkg/sql/database_region_change_finalizer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (r *databaseRegionChangeFinalizer) preDrop(ctx context.Context, txn descs.T
115115
return err
116116
}
117117
for _, update := range zoneConfigUpdates {
118-
if _, err := writeZoneConfigUpdate(
118+
if err = writeZoneConfigUpdate(
119119
ctx, txn,
120120
r.localPlanner.ExtendedEvalContext().Tracing.KVTracingEnabled(),
121121
update,

0 commit comments

Comments
 (0)