Skip to content

Commit a992c84

Browse files
author
James Cor
committed
some fixes
1 parent b19f8d7 commit a992c84

File tree

3 files changed

+67
-45
lines changed

3 files changed

+67
-45
lines changed

enginetest/engine_only_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,73 @@ b (2/6 partitions)
293293
require.ElementsMatch(expected, rows)
294294
}
295295

296+
// TODO: this was an analyzer test, but we don't have a mock process list for it to use, so it has to be here
297+
func TestTrackProcess(t *testing.T) {
298+
require := require.New(t)
299+
db := memory.NewDatabase("db")
300+
provider := memory.NewDBProvider(db)
301+
a := analyzer.NewDefault(provider)
302+
sess := memory.NewSession(sql.NewBaseSession(), provider)
303+
304+
node := plan.NewInnerJoin(
305+
plan.NewResolvedTable(&nonIndexableTable{memory.NewPartitionedTable(db.BaseDatabase, "foo", sql.PrimaryKeySchema{}, nil, 2)}, nil, nil),
306+
plan.NewResolvedTable(memory.NewPartitionedTable(db.BaseDatabase, "bar", sql.PrimaryKeySchema{}, nil, 4), nil, nil),
307+
expression.NewLiteral(int64(1), types.Int64),
308+
)
309+
310+
pl := sqle.NewProcessList()
311+
312+
ctx := sql.NewContext(context.Background(), sql.WithPid(1), sql.WithProcessList(pl), sql.WithSession(sess))
313+
pl.AddConnection(ctx.Session.ID(), "localhost")
314+
pl.ConnectionReady(ctx.Session)
315+
ctx, err := ctx.ProcessList.BeginQuery(ctx, "SELECT foo")
316+
require.NoError(err)
317+
318+
rule := getRuleFrom(analyzer.OnceAfterAll, analyzer.TrackProcessId)
319+
result, _, err := rule.Apply(ctx, a, node, nil, analyzer.DefaultRuleSelector, nil)
320+
require.NoError(err)
321+
322+
processes := ctx.ProcessList.Processes()
323+
require.Len(processes, 1)
324+
require.Equal("SELECT foo", processes[0].Query)
325+
require.Equal(
326+
map[string]sql.TableProgress{
327+
"foo": {
328+
Progress: sql.Progress{Name: "foo", Done: 0, Total: 2},
329+
PartitionsProgress: map[string]sql.PartitionProgress{},
330+
},
331+
"bar": {
332+
Progress: sql.Progress{Name: "bar", Done: 0, Total: 4},
333+
PartitionsProgress: map[string]sql.PartitionProgress{},
334+
},
335+
},
336+
processes[0].Progress)
337+
338+
join, ok := result.(*plan.JoinNode)
339+
require.True(ok)
340+
require.Equal(join.JoinType(), plan.JoinTypeInner)
341+
342+
lhs, ok := join.Left().(*plan.ResolvedTable)
343+
require.True(ok)
344+
_, ok = lhs.Table.(*plan.ProcessTable)
345+
require.True(ok)
346+
347+
rhs, ok := join.Right().(*plan.ResolvedTable)
348+
require.True(ok)
349+
_, ok = rhs.Table.(*plan.ProcessTable)
350+
require.True(ok)
351+
352+
iter, err := rowexec.DefaultBuilder.Build(ctx, result, nil)
353+
require.NoError(err)
354+
_, err = sql.RowIterToRows(ctx, iter)
355+
require.NoError(err)
356+
357+
procs := ctx.ProcessList.Processes()
358+
require.Len(procs, 1)
359+
require.Equal(sql.ProcessCommandSleep, procs[0].Command)
360+
require.Error(ctx.Err())
361+
}
362+
296363
func TestConcurrentProcessList(t *testing.T) {
297364
enginetest.TestConcurrentProcessList(t, enginetest.NewDefaultMemoryHarness())
298365
}

enginetest/evaluation.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,8 +1080,6 @@ func assertSchemasEqualWithDefaults(t *testing.T, expected, actual sql.Schema) b
10801080

10811081
func ExtractQueryNode(node sql.Node) sql.Node {
10821082
switch node := node.(type) {
1083-
case *plan.QueryProcess:
1084-
return ExtractQueryNode(node.Child())
10851083
case *plan.Releaser:
10861084
return ExtractQueryNode(node.Child)
10871085
default:

sql/rowexec/process_test.go

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -26,49 +26,6 @@ import (
2626
"github.com/dolthub/go-mysql-server/sql/types"
2727
)
2828

29-
func TestQueryProcess(t *testing.T) {
30-
require := require.New(t)
31-
32-
db := memory.NewDatabase("test")
33-
pro := memory.NewDBProvider(db)
34-
ctx := newContext(pro)
35-
36-
table := memory.NewTable(db.BaseDatabase, "foo", sql.NewPrimaryKeySchema(sql.Schema{
37-
{Name: "a", Type: types.Int64},
38-
}), nil)
39-
40-
table.Insert(ctx, sql.NewRow(int64(1)))
41-
table.Insert(ctx, sql.NewRow(int64(2)))
42-
43-
var notifications int
44-
45-
node := plan.NewQueryProcess(
46-
plan.NewProject(
47-
[]sql.Expression{
48-
expression.NewGetField(0, types.Int64, "a", false),
49-
},
50-
plan.NewResolvedTable(table, nil, nil),
51-
),
52-
func() {
53-
notifications++
54-
},
55-
)
56-
57-
iter, err := DefaultBuilder.Build(ctx, node, nil)
58-
require.NoError(err)
59-
60-
rows, err := sql.RowIterToRows(ctx, iter)
61-
require.NoError(err)
62-
63-
expected := []sql.Row{
64-
{int64(1)},
65-
{int64(2)},
66-
}
67-
68-
require.ElementsMatch(expected, rows)
69-
require.Equal(1, notifications)
70-
}
71-
7229
func TestProcessTable(t *testing.T) {
7330
require := require.New(t)
7431

0 commit comments

Comments
 (0)