Skip to content

Commit cb5c026

Browse files
craig[bot], normanchenn, and michae2
committed
144054: jsonpath: optimize executeAnyItem by iterating over JSON containers r=normanchenn a=normanchenn This commit replaces the usage of `AllPathsWithDepth` with direct iteration over JSON arrays and objects in `executeAnyItem`. This avoids unnecessary JSON decoding of the entire object structure (`jsonEncoded.decode()`) when only direct children are needed. Preliminary testing shows this change reduces execution time for queries with any array/object unwrapping by approximately 50%. Epic: None Release note: None 144071: sql: refactor updateRun, upsertRun, and deleteRun r=yuzefovich a=michae2 **sql: bring back resultRowBuffer in updateRun and deleteRun** For some reason, updateRun and deleteRun differ from insertRun in that they were creating result slices for RETURNING in processSourceRow, instead of creating a resultRowBuffer in startExec. This commit does a small refactor to move allocation of resultRowBuffer to startExec. This refactor will make it easier for the new UPDATE and DELETE fast path planNodes to share updateRun and deleteRun with the existing updateNode and deleteNode. Epic: None Release note: None --- **sql: move processSourceRow to updateRun, upsertRun, and deleteRun** Refactor processSourceRow so that it's a method of updateRun, upsertRun, and deleteRun, rather than a method of updateNode, upsertNode, and deleteNode, respectively. This matches insertRun. This refactor will make it easier for the new UPDATE and DELETE fast path planNodes to share updateRun and deleteRun with the existing updateNode and deleteNode. (There's no new fast path in the works for UPSERT, at least not yet, but upsertRun is changed for completeness.) Epic: None Release note: None Co-authored-by: Norman Chen <[email protected]> Co-authored-by: Michael Erickson <[email protected]>
3 parents 06a3a90 + b46bb94 + 5036c14 commit cb5c026

File tree

4 files changed

+132
-109
lines changed

4 files changed

+132
-109
lines changed

pkg/sql/delete.go

Lines changed: 41 additions & 27 deletions
Original file line number · Diff line number · Diff line change
@@ -32,6 +32,8 @@ type deleteNode struct {
3232
run deleteRun
3333
}
3434

35+
var _ mutationPlanNode = &deleteNode{}
36+
3537
// deleteRun contains the run-time state of deleteNode during local execution.
3638
type deleteRun struct {
3739
td tableDeleter
@@ -41,6 +43,10 @@ type deleteRun struct {
4143
// to BatchedNext() has completed the work already.
4244
done bool
4345

46+
// resultRowBuffer is used to prepare a result row for accumulation
47+
// into the row container above, when rowsNeeded is set.
48+
resultRowBuffer tree.Datums
49+
4450
// traceKV caches the current KV tracing flag.
4551
traceKV bool
4652

@@ -57,17 +63,26 @@ type deleteRun struct {
5763
numPassthrough int
5864
}
5965

60-
var _ mutationPlanNode = &deleteNode{}
66+
func (r *deleteRun) initRowContainer(params runParams, columns colinfo.ResultColumns) {
67+
if !r.rowsNeeded {
68+
return
69+
}
70+
r.td.rows = rowcontainer.NewRowContainer(
71+
params.p.Mon().MakeBoundAccount(),
72+
colinfo.ColTypeInfoFromResCols(columns),
73+
)
74+
r.resultRowBuffer = make([]tree.Datum, len(columns))
75+
for i := range r.resultRowBuffer {
76+
r.resultRowBuffer[i] = tree.DNull
77+
}
78+
}
6179

6280
func (d *deleteNode) startExec(params runParams) error {
6381
// cache traceKV during execution, to avoid re-evaluating it for every row.
6482
d.run.traceKV = params.p.ExtendedEvalContext().Tracing.KVTracingEnabled()
6583

66-
if d.run.rowsNeeded {
67-
d.run.td.rows = rowcontainer.NewRowContainer(
68-
params.p.Mon().MakeBoundAccount(),
69-
colinfo.ColTypeInfoFromResCols(d.columns))
70-
}
84+
d.run.initRowContainer(params, d.columns)
85+
7186
return d.run.td.init(params.ctx, params.p.txn, params.EvalContext())
7287
}
7388

@@ -107,7 +122,7 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
107122

108123
// Process the deletion of the current input row,
109124
// potentially accumulating the result row for later.
110-
if err := d.processSourceRow(params, d.input.Values()); err != nil {
125+
if err := d.run.processSourceRow(params, d.input.Values()); err != nil {
111126
return false, err
112127
}
113128

@@ -145,18 +160,18 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
145160

146161
// processSourceRow processes one row from the source for deletion and, if
147162
// result rows are needed, saves it in the result row container
148-
func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums) error {
163+
func (r *deleteRun) processSourceRow(params runParams, sourceVals tree.Datums) error {
149164
// Remove extra columns for partial index predicate values and AFTER triggers.
150-
deleteVals := sourceVals[:len(d.run.td.rd.FetchCols)+d.run.numPassthrough]
165+
deleteVals := sourceVals[:len(r.td.rd.FetchCols)+r.numPassthrough]
151166
sourceVals = sourceVals[len(deleteVals):]
152167

153168
// Create a set of partial index IDs to not delete from. Indexes should not
154169
// be deleted from when they are partial indexes and the row does not
155170
// satisfy the predicate and therefore do not exist in the partial index.
156171
// This set is passed as a argument to tableDeleter.row below.
157172
var pm row.PartialIndexUpdateHelper
158-
if n := len(d.run.td.tableDesc().PartialIndexes()); n > 0 {
159-
err := pm.Init(nil /* partialIndexPutVals */, sourceVals[:n], d.run.td.tableDesc())
173+
if n := len(r.td.tableDesc().PartialIndexes()); n > 0 {
174+
err := pm.Init(nil /* partialIndexPutVals */, sourceVals[:n], r.td.tableDesc())
160175
if err != nil {
161176
return err
162177
}
@@ -166,53 +181,52 @@ func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums)
166181
// Keep track of the vector index partitions to update. This information is
167182
// passed to tableInserter.row below.
168183
var vh row.VectorIndexUpdateHelper
169-
if n := len(d.run.td.tableDesc().VectorIndexes()); n > 0 {
170-
vh.InitForDel(sourceVals[:n], d.run.td.tableDesc())
184+
if n := len(r.td.tableDesc().VectorIndexes()); n > 0 {
185+
vh.InitForDel(sourceVals[:n], r.td.tableDesc())
171186
}
172187

173188
// Queue the deletion in the KV batch.
174-
if err := d.run.td.row(
175-
params.ctx, deleteVals, pm, vh, false /* mustValidateOldPKValues */, d.run.traceKV,
189+
if err := r.td.row(
190+
params.ctx, deleteVals, pm, vh, false /* mustValidateOldPKValues */, r.traceKV,
176191
); err != nil {
177192
return err
178193
}
179194

180195
// If result rows need to be accumulated, do it.
181-
if d.run.td.rows != nil {
196+
if r.td.rows != nil {
182197
// The new values can include all columns, so the values may contain
183198
// additional columns for every newly dropped column not visible. We do not
184199
// want them to be available for RETURNING.
185200
//
186-
// d.run.rows.NumCols() is guaranteed to only contain the requested
201+
// r.rows.NumCols() is guaranteed to only contain the requested
187202
// public columns.
188-
resultValues := make(tree.Datums, d.run.td.rows.NumCols())
189203
largestRetIdx := -1
190-
for i := range d.run.rowIdxToRetIdx {
191-
retIdx := d.run.rowIdxToRetIdx[i]
204+
for i := range r.rowIdxToRetIdx {
205+
retIdx := r.rowIdxToRetIdx[i]
192206
if retIdx >= 0 {
193207
if retIdx >= largestRetIdx {
194208
largestRetIdx = retIdx
195209
}
196-
resultValues[retIdx] = deleteVals[i]
210+
r.resultRowBuffer[retIdx] = deleteVals[i]
197211
}
198212
}
199213

200214
// At this point we've extracted all the RETURNING values that are part
201215
// of the target table. We must now extract the columns in the RETURNING
202216
// clause that refer to other tables (from the USING clause of the delete).
203-
if d.run.numPassthrough > 0 {
204-
passthroughBegin := len(d.run.td.rd.FetchCols)
205-
passthroughEnd := passthroughBegin + d.run.numPassthrough
217+
if r.numPassthrough > 0 {
218+
passthroughBegin := len(r.td.rd.FetchCols)
219+
passthroughEnd := passthroughBegin + r.numPassthrough
206220
passthroughValues := deleteVals[passthroughBegin:passthroughEnd]
207221

208-
for i := 0; i < d.run.numPassthrough; i++ {
222+
for i := 0; i < r.numPassthrough; i++ {
209223
largestRetIdx++
210-
resultValues[largestRetIdx] = passthroughValues[i]
224+
r.resultRowBuffer[largestRetIdx] = passthroughValues[i]
211225
}
212226

213227
}
214228

215-
if _, err := d.run.td.rows.AddRow(params.ctx, resultValues); err != nil {
229+
if _, err := r.td.rows.AddRow(params.ctx, r.resultRowBuffer); err != nil {
216230
return err
217231
}
218232
}

pkg/sql/update.go

Lines changed: 44 additions & 31 deletions
Original file line number · Diff line number · Diff line change
@@ -48,6 +48,10 @@ type updateRun struct {
4848
// BatchedNext() has completed the work already.
4949
done bool
5050

51+
// resultRowBuffer is used to prepare a result row for accumulation
52+
// into the row container above, when rowsNeeded is set.
53+
resultRowBuffer tree.Datums
54+
5155
// traceKV caches the current KV tracing flag.
5256
traceKV bool
5357

@@ -68,16 +72,26 @@ type updateRun struct {
6872
regionLocalInfo regionLocalInfoType
6973
}
7074

75+
func (r *updateRun) initRowContainer(params runParams, columns colinfo.ResultColumns) {
76+
if !r.rowsNeeded {
77+
return
78+
}
79+
r.tu.rows = rowcontainer.NewRowContainer(
80+
params.p.Mon().MakeBoundAccount(),
81+
colinfo.ColTypeInfoFromResCols(columns),
82+
)
83+
r.resultRowBuffer = make([]tree.Datum, len(columns))
84+
for i := range r.resultRowBuffer {
85+
r.resultRowBuffer[i] = tree.DNull
86+
}
87+
}
88+
7189
func (u *updateNode) startExec(params runParams) error {
7290
// cache traceKV during execution, to avoid re-evaluating it for every row.
7391
u.run.traceKV = params.p.ExtendedEvalContext().Tracing.KVTracingEnabled()
7492

75-
if u.run.rowsNeeded {
76-
u.run.tu.rows = rowcontainer.NewRowContainer(
77-
params.p.Mon().MakeBoundAccount(),
78-
colinfo.ColTypeInfoFromResCols(u.columns),
79-
)
80-
}
93+
u.run.initRowContainer(params, u.columns)
94+
8195
return u.run.tu.init(params.ctx, params.p.txn, params.EvalContext())
8296
}
8397

@@ -118,7 +132,7 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) {
118132

119133
// Process the update for the current input row, potentially
120134
// accumulating the result row for later.
121-
if err := u.processSourceRow(params, u.input.Values()); err != nil {
135+
if err := u.run.processSourceRow(params, u.input.Values()); err != nil {
122136
return false, err
123137
}
124138

@@ -156,7 +170,7 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) {
156170

157171
// processSourceRow processes one row from the source for update and, if
158172
// result rows are needed, saves it in the result row container.
159-
func (u *updateNode) processSourceRow(params runParams, sourceVals tree.Datums) error {
173+
func (r *updateRun) processSourceRow(params runParams, sourceVals tree.Datums) error {
160174
// sourceVals contains values for the columns from the table, in the order of the
161175
// table descriptor. (One per column in u.tw.ru.FetchCols)
162176
//
@@ -166,42 +180,42 @@ func (u *updateNode) processSourceRow(params runParams, sourceVals tree.Datums)
166180
// oldValues is the prefix of sourceVals that corresponds to real
167181
// stored columns in the table, that is, excluding the RHS assignment
168182
// expressions.
169-
oldValues := sourceVals[:len(u.run.tu.ru.FetchCols)]
183+
oldValues := sourceVals[:len(r.tu.ru.FetchCols)]
170184
sourceVals = sourceVals[len(oldValues):]
171185

172186
// The update values follow the fetch values and their order corresponds to the order of ru.UpdateCols.
173-
updateValues := sourceVals[:len(u.run.tu.ru.UpdateCols)]
187+
updateValues := sourceVals[:len(r.tu.ru.UpdateCols)]
174188
sourceVals = sourceVals[len(updateValues):]
175189

176190
// The passthrough values follow the update values.
177-
passthroughValues := sourceVals[:u.run.numPassthrough]
191+
passthroughValues := sourceVals[:r.numPassthrough]
178192
sourceVals = sourceVals[len(passthroughValues):]
179193

180194
// Verify the schema constraints. For consistency with INSERT/UPSERT
181195
// and compatibility with PostgreSQL, we must do this before
182196
// processing the CHECK constraints.
183-
if err := enforceNotNullConstraints(updateValues, u.run.tu.ru.UpdateCols); err != nil {
197+
if err := enforceNotNullConstraints(updateValues, r.tu.ru.UpdateCols); err != nil {
184198
return err
185199
}
186200

187201
// Run the CHECK constraints, if any. CheckHelper will either evaluate the
188202
// constraints itself, or else inspect boolean columns from the input that
189203
// contain the results of evaluation.
190-
if !u.run.checkOrds.Empty() {
204+
if !r.checkOrds.Empty() {
191205
if err := checkMutationInput(
192206
params.ctx, params.EvalContext(), &params.p.semaCtx, params.p.SessionData(),
193-
u.run.tu.tableDesc(), u.run.checkOrds, sourceVals[:u.run.checkOrds.Len()],
207+
r.tu.tableDesc(), r.checkOrds, sourceVals[:r.checkOrds.Len()],
194208
); err != nil {
195209
return err
196210
}
197-
sourceVals = sourceVals[u.run.checkOrds.Len():]
211+
sourceVals = sourceVals[r.checkOrds.Len():]
198212
}
199213

200214
// Create a set of partial index IDs to not add entries or remove entries
201215
// from. Put values are followed by del values.
202216
var pm row.PartialIndexUpdateHelper
203-
if n := len(u.run.tu.tableDesc().PartialIndexes()); n > 0 {
204-
err := pm.Init(sourceVals[:n], sourceVals[n:n*2], u.run.tu.tableDesc())
217+
if n := len(r.tu.tableDesc().PartialIndexes()); n > 0 {
218+
err := pm.Init(sourceVals[:n], sourceVals[n:n*2], r.tu.tableDesc())
205219
if err != nil {
206220
return err
207221
}
@@ -213,54 +227,53 @@ func (u *updateNode) processSourceRow(params runParams, sourceVals tree.Datums)
213227
// Order of column values is put partitions, quantized vectors, followed by
214228
// del partitions
215229
var vh row.VectorIndexUpdateHelper
216-
if n := len(u.run.tu.tableDesc().VectorIndexes()); n > 0 {
217-
vh.InitForPut(sourceVals[:n], sourceVals[n:n*2], u.run.tu.tableDesc())
218-
vh.InitForDel(sourceVals[n*2:n*3], u.run.tu.tableDesc())
230+
if n := len(r.tu.tableDesc().VectorIndexes()); n > 0 {
231+
vh.InitForPut(sourceVals[:n], sourceVals[n:n*2], r.tu.tableDesc())
232+
vh.InitForDel(sourceVals[n*2:n*3], r.tu.tableDesc())
219233
}
220234

221235
// Error out the update if the enforce_home_region session setting is on and
222236
// the row's locality doesn't match the gateway region.
223-
if err := u.run.regionLocalInfo.checkHomeRegion(updateValues); err != nil {
237+
if err := r.regionLocalInfo.checkHomeRegion(updateValues); err != nil {
224238
return err
225239
}
226240

227241
// Queue the insert in the KV batch.
228-
newValues, err := u.run.tu.rowForUpdate(
229-
params.ctx, oldValues, updateValues, pm, vh, false /* mustValidateOldPKValues */, u.run.traceKV,
242+
newValues, err := r.tu.rowForUpdate(
243+
params.ctx, oldValues, updateValues, pm, vh, false /* mustValidateOldPKValues */, r.traceKV,
230244
)
231245
if err != nil {
232246
return err
233247
}
234248

235249
// If result rows need to be accumulated, do it.
236-
if u.run.tu.rows != nil {
250+
if r.tu.rows != nil {
237251
// The new values can include all columns, so the values may contain
238252
// additional columns for every newly added column not yet visible. We do
239253
// not want them to be available for RETURNING.
240254
//
241255
// MakeUpdater guarantees that the first columns of the new values
242256
// are those specified u.columns.
243-
resultValues := make([]tree.Datum, len(u.columns))
244257
largestRetIdx := -1
245-
for i := range u.run.rowIdxToRetIdx {
246-
retIdx := u.run.rowIdxToRetIdx[i]
258+
for i := range r.rowIdxToRetIdx {
259+
retIdx := r.rowIdxToRetIdx[i]
247260
if retIdx >= 0 {
248261
if retIdx >= largestRetIdx {
249262
largestRetIdx = retIdx
250263
}
251-
resultValues[retIdx] = newValues[i]
264+
r.resultRowBuffer[retIdx] = newValues[i]
252265
}
253266
}
254267

255268
// At this point we've extracted all the RETURNING values that are part
256269
// of the target table. We must now extract the columns in the RETURNING
257270
// clause that refer to other tables (from the FROM clause of the update).
258-
for i := 0; i < u.run.numPassthrough; i++ {
271+
for i := 0; i < r.numPassthrough; i++ {
259272
largestRetIdx++
260-
resultValues[largestRetIdx] = passthroughValues[i]
273+
r.resultRowBuffer[largestRetIdx] = passthroughValues[i]
261274
}
262275

263-
if _, err := u.run.tu.rows.AddRow(params.ctx, resultValues); err != nil {
276+
if _, err := r.tu.rows.AddRow(params.ctx, r.resultRowBuffer); err != nil {
264277
return err
265278
}
266279
}

0 commit comments

Comments
 (0)