Skip to content

Commit 1dbf9b1

Browse files
committed
sql: remove planNodeFastPath interface for ops that return rows affected
Previously, there were two ways that `planNode` implementations would propagate the number of rows affected (if using that output mode): 1. Implementing the `planNodeFastPath`, which provided a method to directly pull the cached row count. 2. A special loop in `planNodeToRowSource`, which simply called `Next()` repeatedly and counted the number of iterations. This commit takes advantage of work done in the previous commit for operators that need to return the number of rows affected. Now, the helper structs from the previous commit are used to track this count internally, and then return the final count through the usual `Next()` and `Values()` calls. This allows for the removal of special logic in `planNodeToRowSource`, as well as the `planNodeFastPath` interface. As with the previous commit, this one seeks to remove departures from the usual `Start()`, `Next()`, and `Values()` control flow, to make future refactors easier. Epic: None Release note: None
1 parent 6ad5b43 commit 1dbf9b1

15 files changed

+229
-189
lines changed

pkg/sql/cancel_queries.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,23 @@ import (
1919

2020
type cancelQueriesNode struct {
2121
singleInputPlanNode
22+
rowsAffectedOutputHelper
2223
ifExists bool
2324
}
2425

25-
func (n *cancelQueriesNode) startExec(runParams) error {
26-
return nil
26+
// startExec implements the planNode interface.
27+
func (n *cancelQueriesNode) startExec(params runParams) error {
28+
// Execute the node to completion, keeping track of the affected row count.
29+
for {
30+
ok, err := n.cancelQuery(params)
31+
if !ok || err != nil {
32+
return err
33+
}
34+
n.incAffectedRows()
35+
}
2736
}
2837

29-
func (n *cancelQueriesNode) Next(params runParams) (bool, error) {
38+
func (n *cancelQueriesNode) cancelQuery(params runParams) (bool, error) {
3039
// TODO(knz): instead of performing the cancels sequentially,
3140
// accumulate all the query IDs and then send batches to each of the
3241
// nodes.
@@ -71,7 +80,15 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) {
7180
return true, nil
7281
}
7382

74-
func (*cancelQueriesNode) Values() tree.Datums { return nil }
83+
// Next implements the planNode interface.
84+
func (n *cancelQueriesNode) Next(_ runParams) (bool, error) {
85+
return n.next(), nil
86+
}
87+
88+
// Values implements the planNode interface.
89+
func (n *cancelQueriesNode) Values() tree.Datums {
90+
return n.values()
91+
}
7592

7693
func (n *cancelQueriesNode) Close(ctx context.Context) {
7794
n.input.Close(ctx)

pkg/sql/cancel_sessions.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,23 @@ import (
1919

2020
type cancelSessionsNode struct {
2121
singleInputPlanNode
22+
rowsAffectedOutputHelper
2223
ifExists bool
2324
}
2425

25-
func (n *cancelSessionsNode) startExec(runParams) error {
26-
return nil
26+
// startExec implements the planNode interface.
27+
func (n *cancelSessionsNode) startExec(params runParams) error {
28+
// Execute the node to completion, keeping track of the affected row count.
29+
for {
30+
ok, err := n.cancelSession(params)
31+
if !ok || err != nil {
32+
return err
33+
}
34+
n.incAffectedRows()
35+
}
2736
}
2837

29-
func (n *cancelSessionsNode) Next(params runParams) (bool, error) {
38+
func (n *cancelSessionsNode) cancelSession(params runParams) (bool, error) {
3039
// TODO(knz): instead of performing the cancels sequentially,
3140
// accumulate all the query IDs and then send batches to each of the
3241
// nodes.
@@ -71,7 +80,15 @@ func (n *cancelSessionsNode) Next(params runParams) (bool, error) {
7180
return true, nil
7281
}
7382

74-
func (*cancelSessionsNode) Values() tree.Datums { return nil }
83+
// Next implements the planNode interface.
84+
func (n *cancelSessionsNode) Next(_ runParams) (bool, error) {
85+
return n.next(), nil
86+
}
87+
88+
// Values implements the planNode interface.
89+
func (n *cancelSessionsNode) Values() tree.Datums {
90+
return n.values()
91+
}
7592

7693
func (n *cancelSessionsNode) Close(ctx context.Context) {
7794
n.input.Close(ctx)

pkg/sql/colexec/columnarizer.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -211,18 +211,18 @@ func (c *Columnarizer) GetStats() *execinfrapb.ComponentStats {
211211
return s
212212
}
213213

214-
// IsFastPathNode returns true if the provided RowSource is the
215-
// planNodeToRowSource wrapper with "fast-path" enabled. The logic is injected
216-
// in the sql package to avoid an import cycle.
217-
var IsFastPathNode func(source execinfra.RowSource) bool
214+
// IsRowsAffectedNode returns true if the provided RowSource is the
215+
// planNodeToRowSource wrapper returning the number of affected rows. The logic
216+
// is injected from the sql package to avoid an import cycle.
217+
var IsRowsAffectedNode func(source execinfra.RowSource) bool
218218

219-
// IsColumnarizerAroundFastPathNode returns true if the provided Operator is a
220-
// Columnarizer that has "fast-path" enabled planNodeToRowSource wrapper as the
221-
// input.
222-
func IsColumnarizerAroundFastPathNode(o colexecop.Operator) bool {
219+
// IsColumnarizerAroundRowsAffectedNode returns true if the provided Operator is
220+
// a Columnarizer that has a planNodeToRowSource wrapper in "rows affected" mode
221+
// as the input.
222+
func IsColumnarizerAroundRowsAffectedNode(o colexecop.Operator) bool {
223223
o = MaybeUnwrapInvariantsChecker(o)
224224
c, ok := o.(*Columnarizer)
225-
return ok && IsFastPathNode(c.input)
225+
return ok && IsRowsAffectedNode(c.input)
226226
}
227227

228228
// Next is part of the colexecop.Operator interface.

pkg/sql/colflow/stats.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ type batchInfoCollector struct {
5757
// batch is the last batch returned by the wrapped operator.
5858
batch coldata.Batch
5959

60-
// rowCountFastPath is set to indicate that the input is expected to produce
60+
// rowsAffectedMode is set to indicate that the input is expected to produce
6161
// a single batch with a single column with the row count value.
62-
rowCountFastPath bool
62+
rowsAffectedMode bool
6363

6464
// stopwatch keeps track of the amount of time the wrapped operator spent
6565
// doing work. Note that this will include all of the time that the operator's
@@ -86,7 +86,7 @@ func makeBatchInfoCollector(
8686
return batchInfoCollector{
8787
OneInputNode: colexecop.NewOneInputNode(op),
8888
componentID: id,
89-
rowCountFastPath: colexec.IsColumnarizerAroundFastPathNode(op),
89+
rowsAffectedMode: colexec.IsColumnarizerAroundRowsAffectedNode(op),
9090
stopwatch: inputWatch,
9191
childStatsCollectors: childStatsCollectors,
9292
}
@@ -134,12 +134,12 @@ func (bic *batchInfoCollector) Next() coldata.Batch {
134134
bic.mu.Lock()
135135
defer bic.mu.Unlock()
136136
bic.mu.numBatches++
137-
if bic.rowCountFastPath {
137+
if bic.rowsAffectedMode {
138138
// We have a special case where the batch has exactly one column
139139
// with exactly one row in which we have the row count.
140140
if buildutil.CrdbTestBuild {
141141
if bic.mu.numBatches != 1 {
142-
colexecerror.InternalError(errors.AssertionFailedf("saw second batch in fast path:\n%s", bic.batch))
142+
colexecerror.InternalError(errors.AssertionFailedf("saw second batch in rows affected mode:\n%s", bic.batch))
143143
}
144144
if bic.batch.Width() != 1 {
145145
colexecerror.InternalError(errors.AssertionFailedf("batch width is not 1:\n%s", bic.batch))

pkg/sql/control_jobs.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import (
2020

2121
type controlJobsNode struct {
2222
singleInputPlanNode
23+
rowsAffectedOutputHelper
2324
desiredStatus jobs.State
24-
numRows int
2525
reason string
2626
}
2727

@@ -31,11 +31,7 @@ var jobCommandToDesiredStatus = map[tree.JobCommand]jobs.State{
3131
tree.PauseJob: jobs.StatePaused,
3232
}
3333

34-
// FastPathResults implements the planNodeFastPath interface.
35-
func (n *controlJobsNode) FastPathResults() (int, bool) {
36-
return n.numRows, true
37-
}
38-
34+
// startExec implements the planNode interface.
3935
func (n *controlJobsNode) startExec(params runParams) error {
4036
if n.desiredStatus != jobs.StatePaused && len(n.reason) > 0 {
4137
return errors.AssertionFailedf("status %v is not %v and thus does not support a reason %v",
@@ -86,7 +82,7 @@ func (n *controlJobsNode) startExec(params runParams) error {
8682
return err
8783
}
8884

89-
n.numRows++
85+
n.incAffectedRows()
9086
}
9187
switch n.desiredStatus {
9288
case jobs.StatePaused:
@@ -99,9 +95,15 @@ func (n *controlJobsNode) startExec(params runParams) error {
9995
return nil
10096
}
10197

102-
func (*controlJobsNode) Next(runParams) (bool, error) { return false, nil }
98+
// Next implements the planNode interface.
99+
func (n *controlJobsNode) Next(_ runParams) (bool, error) {
100+
return n.next(), nil
101+
}
103102

104-
func (*controlJobsNode) Values() tree.Datums { return nil }
103+
// Values implements the planNode interface.
104+
func (n *controlJobsNode) Values() tree.Datums {
105+
return n.values()
106+
}
105107

106108
func (n *controlJobsNode) Close(ctx context.Context) {
107109
n.input.Close(ctx)

pkg/sql/control_schedules.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626

2727
type controlSchedulesNode struct {
2828
singleInputPlanNode
29+
rowsAffectedOutputHelper
2930
command tree.ScheduleCommand
30-
numRows int
3131
}
3232

3333
func collectTelemetry(command tree.ScheduleCommand) {
@@ -41,11 +41,6 @@ func collectTelemetry(command tree.ScheduleCommand) {
4141
}
4242
}
4343

44-
// FastPathResults implements the planNodeFastPath interface.
45-
func (n *controlSchedulesNode) FastPathResults() (int, bool) {
46-
return n.numRows, true
47-
}
48-
4944
// JobSchedulerEnv returns JobSchedulerEnv.
5045
func JobSchedulerEnv(knobs *jobs.TestingKnobs) scheduledjobs.JobSchedulerEnv {
5146
if knobs != nil && knobs.JobSchedulerEnv != nil {
@@ -175,7 +170,7 @@ func (n *controlSchedulesNode) startExec(params runParams) error {
175170
if err != nil {
176171
return errors.Wrap(err, "failed to run OnDrop")
177172
}
178-
n.numRows += additionalDroppedSchedules
173+
n.addAffectedRows(additionalDroppedSchedules)
179174
}
180175
err = DeleteSchedule(
181176
params.ctx, params.ExecCfg(), params.p.InternalSQLTxn(),
@@ -189,17 +184,21 @@ func (n *controlSchedulesNode) startExec(params runParams) error {
189184
if err != nil {
190185
return err
191186
}
192-
n.numRows++
187+
n.incAffectedRows()
193188
}
194189

195190
return nil
196191
}
197192

198-
// Next implements planNode interface.
199-
func (*controlSchedulesNode) Next(runParams) (bool, error) { return false, nil }
193+
// Next implements the planNode interface.
194+
func (n *controlSchedulesNode) Next(_ runParams) (bool, error) {
195+
return n.next(), nil
196+
}
200197

201-
// Values implements planNode interface.
202-
func (*controlSchedulesNode) Values() tree.Datums { return nil }
198+
// Values implements the planNode interface.
199+
func (n *controlSchedulesNode) Values() tree.Datums {
200+
return n.values()
201+
}
203202

204203
// Close implements planNode interface.
205204
func (n *controlSchedulesNode) Close(ctx context.Context) {

pkg/sql/distsql_physical_planner.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4285,8 +4285,6 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
42854285
func (dsp *DistSQLPlanner) wrapPlan(
42864286
ctx context.Context, planCtx *PlanningCtx, n planNode, allowPartialDistribution bool,
42874287
) (*PhysicalPlan, error) {
4288-
useFastPath := planCtx.planDepth == 1 && planCtx.stmtType == tree.RowsAffected
4289-
42904288
// First, we search the planNode tree we're trying to wrap for the first
42914289
// DistSQL-enabled planNode in the tree. If we find one, we ask the planner to
42924290
// continue the DistSQL planning recursion on that planNode.
@@ -4341,20 +4339,19 @@ func (dsp *DistSQLPlanner) wrapPlan(
43414339

43424340
// Copy the evalCtx.
43434341
evalCtx := *planCtx.ExtendedEvalCtx
4344-
// We permit the planNodeToRowSource to trigger the wrapped planNode's fast
4345-
// path if its the very first node in the flow, and if the statement type we're
4346-
// expecting is in fact RowsAffected. RowsAffected statements return a single
4347-
// row with the number of rows affected by the statement, and are the only
4348-
// types of statement where it's valid to invoke a plan's fast path.
43494342
wrapper := newPlanNodeToRowSource(
43504343
n,
43514344
runParams{
43524345
extendedEvalCtx: &evalCtx,
43534346
p: planCtx.planner,
43544347
},
4355-
useFastPath,
43564348
firstNotWrapped,
43574349
)
4350+
if !wrapper.rowsAffected && planCtx.planDepth == 1 && planCtx.stmtType == tree.RowsAffected {
4351+
// Return an error if the receiver expects to get the number of rows
4352+
// affected, but the planNode returns something else.
4353+
return nil, errors.AssertionFailedf("planNode %T should return rows affected", n)
4354+
}
43584355

43594356
localProcIdx := p.AddLocalProcessor(wrapper)
43604357
var input []execinfrapb.InputSyncSpec

pkg/sql/opaque.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func planOpaque(ctx context.Context, p *planner, stmt tree.Statement) (planNode,
238238
case *tree.Inspect:
239239
return p.Inspect(ctx, n)
240240
case *tree.MoveCursor:
241-
return p.FetchCursor(ctx, &n.CursorStmt)
241+
return p.MoveCursor(ctx, &n.CursorStmt)
242242
case *tree.ReassignOwnedBy:
243243
return p.ReassignOwnedBy(ctx, n)
244244
case *tree.RefreshMaterializedView:

pkg/sql/opt/exec/execbuilder/testdata/unique

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2679,7 +2679,7 @@ quality of service: regular
26792679
│ ├── • update
26802680
│ │ │ sql nodes: <hidden>
26812681
│ │ │ regions: <hidden>
2682-
│ │ │ actual row count: 1
2682+
│ │ │ actual row count: 2
26832683
│ │ │ execution time: 0µs
26842684
│ │ │ table: uniq_fk_child
26852685
│ │ │ set: b, c

pkg/sql/plan.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ type planNode interface {
7575
// of results each time that Next() returns true.
7676
//
7777
// Available after startPlan(). It is illegal to call Next() after it returns
78-
// false. It is legal to call Next() even if the node implements
79-
// planNodeFastPath and the FastPathResults() method returns true.
78+
// false.
8079
Next(params runParams) (bool, error)
8180

8281
// Values returns the values at the current row. The result is only valid
@@ -156,17 +155,6 @@ type mutationPlanNode interface {
156155
returnsRowsAffected() bool
157156
}
158157

159-
// planNodeFastPath is implemented by nodes that can perform all their
160-
// work during startPlan(), possibly affecting even multiple rows. For
161-
// example, DELETE can do this.
162-
type planNodeFastPath interface {
163-
// FastPathResults returns the affected row count and true if the
164-
// node has no result set and has already executed when startPlan() completes.
165-
// Note that Next() must still be valid even if this method returns
166-
// true, although it may have nothing left to do.
167-
FastPathResults() (int, bool)
168-
}
169-
170158
// planNodeReadingOwnWrites can be implemented by planNodes which do
171159
// not use the standard SQL principle of reading at the snapshot
172160
// established at the start of the transaction. It requests that
@@ -271,9 +259,6 @@ var _ planNode = &virtualTableNode{}
271259
var _ planNode = &windowNode{}
272260
var _ planNode = &zeroNode{}
273261

274-
var _ planNodeFastPath = &controlJobsNode{}
275-
var _ planNodeFastPath = &controlSchedulesNode{}
276-
277262
var _ planNodeReadingOwnWrites = &alterIndexNode{}
278263
var _ planNodeReadingOwnWrites = &alterSchemaNode{}
279264
var _ planNodeReadingOwnWrites = &alterSequenceNode{}

0 commit comments

Comments
 (0)