Skip to content

Commit b19f8d7

Browse files
author
James Cor
committed
remove query process
1 parent baa759c commit b19f8d7

16 files changed

+51
-318
lines changed

engine.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ func (e *Engine) QueryWithBindings(ctx *sql.Context, query string, parsed sqlpar
447447
return nil, nil, nil, err
448448
}
449449

450-
iter = finalizeIters(analyzed, qFlags, iter)
450+
iter = finalizeIters(ctx, analyzed, qFlags, iter)
451451

452452
return analyzed.Schema(), iter, qFlags, nil
453453
}
@@ -482,7 +482,7 @@ func (e *Engine) PrepQueryPlanForExecution(ctx *sql.Context, _ string, plan sql.
482482
return nil, nil, nil, err
483483
}
484484

485-
iter = finalizeIters(plan, nil, iter)
485+
iter = finalizeIters(ctx, plan, nil, iter)
486486

487487
return plan.Schema(), iter, nil, nil
488488
}
@@ -830,7 +830,7 @@ func (e *Engine) executeEvent(ctx *sql.Context, dbName, createEventStatement, us
830830
return err
831831
}
832832

833-
iter = finalizeIters(definitionNode, nil, iter)
833+
iter = finalizeIters(ctx, definitionNode, nil, iter)
834834

835835
// Drain the iterate to execute the event body/definition
836836
// NOTE: No row data is returned for an event; we just need to execute the statements
@@ -865,8 +865,9 @@ func findCreateEventNode(planTree sql.Node) (*plan.CreateEvent, error) {
865865
}
866866

867867
// finalizeIters applies the final transformations on sql.RowIter before execution.
868-
func finalizeIters(analyzed sql.Node, qFlags *sql.QueryFlags, iter sql.RowIter) sql.RowIter {
869-
iter = rowexec.AddTransactionCommittingIter(iter, qFlags)
868+
func finalizeIters(ctx *sql.Context, analyzed sql.Node, qFlags *sql.QueryFlags, iter sql.RowIter) sql.RowIter {
869+
iter = plan.AddTrackedRowIter(ctx, analyzed, iter)
870+
iter = rowexec.AddTransactionCommittingIter(qFlags, iter)
870871
iter = rowexec.AddExpressionCloser(analyzed, iter)
871872
return iter
872873
}

enginetest/engine_only_test.go

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -293,74 +293,6 @@ b (2/6 partitions)
293293
require.ElementsMatch(expected, rows)
294294
}
295295

296-
// TODO: this was an analyzer test, but we don't have a mock process list for it to use, so it has to be here
297-
func TestTrackProcess(t *testing.T) {
298-
require := require.New(t)
299-
db := memory.NewDatabase("db")
300-
provider := memory.NewDBProvider(db)
301-
a := analyzer.NewDefault(provider)
302-
sess := memory.NewSession(sql.NewBaseSession(), provider)
303-
304-
node := plan.NewInnerJoin(
305-
plan.NewResolvedTable(&nonIndexableTable{memory.NewPartitionedTable(db.BaseDatabase, "foo", sql.PrimaryKeySchema{}, nil, 2)}, nil, nil),
306-
plan.NewResolvedTable(memory.NewPartitionedTable(db.BaseDatabase, "bar", sql.PrimaryKeySchema{}, nil, 4), nil, nil),
307-
expression.NewLiteral(int64(1), types.Int64),
308-
)
309-
310-
pl := sqle.NewProcessList()
311-
312-
ctx := sql.NewContext(context.Background(), sql.WithPid(1), sql.WithProcessList(pl), sql.WithSession(sess))
313-
pl.AddConnection(ctx.Session.ID(), "localhost")
314-
pl.ConnectionReady(ctx.Session)
315-
ctx, err := ctx.ProcessList.BeginQuery(ctx, "SELECT foo")
316-
require.NoError(err)
317-
318-
rule := getRuleFrom(analyzer.OnceAfterAll, analyzer.TrackProcessId)
319-
result, _, err := rule.Apply(ctx, a, node, nil, analyzer.DefaultRuleSelector, nil)
320-
require.NoError(err)
321-
322-
processes := ctx.ProcessList.Processes()
323-
require.Len(processes, 1)
324-
require.Equal("SELECT foo", processes[0].Query)
325-
require.Equal(
326-
map[string]sql.TableProgress{
327-
"foo": sql.TableProgress{
328-
Progress: sql.Progress{Name: "foo", Done: 0, Total: 2},
329-
PartitionsProgress: map[string]sql.PartitionProgress{}},
330-
"bar": sql.TableProgress{
331-
Progress: sql.Progress{Name: "bar", Done: 0, Total: 4},
332-
PartitionsProgress: map[string]sql.PartitionProgress{}},
333-
},
334-
processes[0].Progress)
335-
336-
proc, ok := result.(*plan.QueryProcess)
337-
require.True(ok)
338-
339-
join, ok := proc.Child().(*plan.JoinNode)
340-
require.True(ok)
341-
require.Equal(join.JoinType(), plan.JoinTypeInner)
342-
343-
lhs, ok := join.Left().(*plan.ResolvedTable)
344-
require.True(ok)
345-
_, ok = lhs.Table.(*plan.ProcessTable)
346-
require.True(ok)
347-
348-
rhs, ok := join.Right().(*plan.ResolvedTable)
349-
require.True(ok)
350-
_, ok = rhs.Table.(*plan.ProcessTable)
351-
require.True(ok)
352-
353-
iter, err := rowexec.DefaultBuilder.Build(ctx, proc, nil)
354-
require.NoError(err)
355-
_, err = sql.RowIterToRows(ctx, iter)
356-
require.NoError(err)
357-
358-
procs := ctx.ProcessList.Processes()
359-
require.Len(procs, 1)
360-
require.Equal(procs[0].Command, sql.ProcessCommandSleep)
361-
require.Error(ctx.Err())
362-
}
363-
364296
func TestConcurrentProcessList(t *testing.T) {
365297
enginetest.TestConcurrentProcessList(t, enginetest.NewDefaultMemoryHarness())
366298
}
@@ -515,7 +447,6 @@ func TestAnalyzer_Exp(t *testing.T) {
515447
require.NoError(t, err)
516448

517449
analyzed, err := e.EngineAnalyzer().Analyze(ctx, parsed, nil, nil)
518-
analyzed = analyzer.StripPassthroughNodes(analyzed)
519450
if tt.err != nil {
520451
require.Error(t, err)
521452
assert.True(t, tt.err.Is(err))
@@ -527,10 +458,6 @@ func TestAnalyzer_Exp(t *testing.T) {
527458
}
528459

529460
func assertNodesEqualWithDiff(t *testing.T, expected, actual sql.Node) {
530-
if x, ok := actual.(*plan.QueryProcess); ok {
531-
actual = x.Child()
532-
}
533-
534461
if !assert.Equal(t, expected, actual) {
535462
expectedStr := sql.DebugString(expected)
536463
actualStr := sql.DebugString(actual)

sql/analyzer/describe.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,5 @@ func resolveDescribeQuery(ctx *sql.Context, a *Analyzer, n sql.Node, scope *plan
3232
return nil, transform.SameTree, err
3333
}
3434

35-
return d.WithQuery(StripPassthroughNodes(q)), transform.NewTree, nil
35+
return d.WithQuery(q), transform.NewTree, nil
3636
}

sql/analyzer/inserts.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ func resolveInsertRows(ctx *sql.Context, a *Analyzer, n sql.Node, scope *plan.Sc
6262
if err != nil {
6363
return nil, transform.SameTree, err
6464
}
65-
66-
source = StripPassthroughNodes(source)
6765
}
6866

6967
dstSchema := insertable.Schema()

sql/analyzer/parallelize.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ func parallelize(ctx *sql.Context, a *Analyzer, node sql.Node, scope *plan.Scope
6363
return node, transform.SameTree, nil
6464
}
6565

66-
proc, ok := node.(*plan.QueryProcess)
67-
if (ok && !shouldParallelize(proc.Child(), nil)) || !shouldParallelize(node, scope) {
66+
if !shouldParallelize(node, scope) {
6867
return node, transform.SameTree, nil
6968
}
7069

sql/analyzer/process.go

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,10 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node, scope *plan.Scope,
3939
if !n.Resolved() {
4040
return n, transform.SameTree, nil
4141
}
42-
43-
if _, ok := n.(*plan.QueryProcess); ok {
44-
return n, transform.SameTree, nil
45-
}
46-
4742
processList := ctx.ProcessList
4843

4944
var seen = make(map[string]struct{})
50-
n, _, err := transform.Node(n, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) {
45+
n, same, err := transform.Node(n, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) {
5146
switch n := n.(type) {
5247
case *plan.ResolvedTable:
5348
switch n.Table.(type) {
@@ -106,41 +101,9 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node, scope *plan.Scope,
106101
return n, transform.SameTree, nil
107102
}
108103
})
109-
if err != nil {
110-
return nil, transform.SameTree, err
111-
}
112-
113-
// Don't wrap CreateIndex in a QueryProcess, as it is a CreateIndexProcess.
114-
// CreateIndex will take care of marking the process as done on its own.
115-
if _, ok := n.(*plan.CreateIndex); ok {
116-
return n, transform.SameTree, nil
117-
}
118104

119-
// Remove QueryProcess nodes from the subqueries and trigger bodies. Otherwise, the process
120-
// will be marked as done as soon as a subquery / trigger finishes.
121-
node, _, err := transform.Node(n, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) {
122-
if sq, ok := n.(*plan.SubqueryAlias); ok {
123-
if qp, ok := sq.Child.(*plan.QueryProcess); ok {
124-
n, err := sq.WithChildren(qp.Child())
125-
return n, transform.NewTree, err
126-
}
127-
}
128-
if t, ok := n.(*plan.TriggerExecutor); ok {
129-
if qp, ok := t.Right().(*plan.QueryProcess); ok {
130-
n, err := t.WithChildren(t.Left(), qp.Child())
131-
return n, transform.NewTree, err
132-
}
133-
}
134-
return n, transform.SameTree, nil
135-
})
136105
if err != nil {
137106
return nil, transform.SameTree, err
138107
}
139-
140-
return plan.NewQueryProcess(node, func() {
141-
processList.EndQuery(ctx)
142-
if span := ctx.RootSpan(); span != nil {
143-
span.End()
144-
}
145-
}), transform.NewTree, nil
108+
return n, same, nil
146109
}

sql/analyzer/process_test.go

Lines changed: 0 additions & 70 deletions
This file was deleted.

sql/analyzer/resolve_create_select.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func resolveCreateSelect(ctx *sql.Context, a *Analyzer, n sql.Node, scope *plan.
5555
return nil, transform.SameTree, err
5656
}
5757

58-
return plan.NewTableCopier(ct.Database(), StripPassthroughNodes(analyzedCreate), StripPassthroughNodes(analyzedSelect), plan.CopierProps{}), transform.NewTree, nil
58+
return plan.NewTableCopier(ct.Database(), analyzedCreate, analyzedSelect, plan.CopierProps{}), transform.NewTree, nil
5959
}
6060

6161
// stripSchema removes all non-type information from a schema, such as the key info, default value, etc.

sql/analyzer/resolve_subqueries.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ func analyzeSubqueryExpression(ctx *sql.Context, a *Analyzer, n sql.Node, sq *pl
252252
// to the expense of positive errors, where a rule reports a change when the plan
253253
// is the same before/after.
254254
// .Resolved() might be useful for fixing these bugs.
255-
return sq.WithQuery(StripPassthroughNodes(analyzed)).WithExecBuilder(a.ExecBuilder), transform.NewTree, nil
255+
return sq.WithQuery(analyzed).WithExecBuilder(a.ExecBuilder), transform.NewTree, nil
256256
}
257257

258258
// analyzeSubqueryAlias runs analysis on the specified subquery alias, |sqa|. The |finalize| parameter indicates if this is
@@ -282,28 +282,10 @@ func analyzeSubqueryAlias(ctx *sql.Context, a *Analyzer, sqa *plan.SubqueryAlias
282282
if same {
283283
return sqa, transform.SameTree, nil
284284
}
285-
newn, err := sqa.WithChildren(StripPassthroughNodes(child))
285+
newn, err := sqa.WithChildren(child)
286286
return newn, transform.NewTree, err
287287
}
288288

289-
// StripPassthroughNodes strips all top-level passthrough nodes meant to apply only to top-level queries (query
290-
// tracking, transaction logic, etc) from the node tree given and return the first non-passthrough child element. This
291-
// is useful for when we invoke the analyzer recursively when e.g. analyzing subqueries or triggers
292-
// TODO: instead of stripping this node off after analysis, it would be better to just not add it in the first place.
293-
func StripPassthroughNodes(n sql.Node) sql.Node {
294-
nodeIsPassthrough := true
295-
for nodeIsPassthrough {
296-
switch tn := n.(type) {
297-
case *plan.QueryProcess:
298-
n = tn.Child()
299-
default:
300-
nodeIsPassthrough = false
301-
}
302-
}
303-
304-
return n
305-
}
306-
307289
// cacheSubqueryAlisesInJoins will look for joins against subquery aliases that
308290
// will repeatedly execute the subquery, and will insert a *plan.CachedResults
309291
// node on top of those nodes. The left-most child of a join root is an exception

sql/analyzer/resolve_unions.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func resolveUnions(ctx *sql.Context, a *Analyzer, n sql.Node, scope *plan.Scope,
5151
return nil, transform.SameTree, err
5252
}
5353

54-
ret, err := n.WithChildren(StripPassthroughNodes(left), StripPassthroughNodes(right))
54+
ret, err := n.WithChildren(left, right)
5555
if err != nil {
5656
return nil, transform.SameTree, err
5757
}
@@ -95,7 +95,7 @@ func finalizeUnions(ctx *sql.Context, a *Analyzer, n sql.Node, scope *plan.Scope
9595

9696
scope.SetJoin(false)
9797

98-
newn, err := n.WithChildren(StripPassthroughNodes(left), StripPassthroughNodes(right))
98+
newn, err := n.WithChildren(left, right)
9999
if err != nil {
100100
return nil, transform.SameTree, err
101101
}

0 commit comments

Comments
 (0)