Skip to content

Commit 8725719

Browse files
authored
[no-release-notes] Rowexec refactor (#1709)
* refactor rowexec starter * core refactor, need to refactor tests * more debugging * gets gms compiling * fmt * more compiling bugs * GMS tests passing * [ga-format-pr] Run ./format_repo.sh to fix formatting * refactors for dolt * edits * [ga-format-pr] Run ./format_repo.sh to fix formatting * docstrings and interfaces * fix block schema bug * merge conflict --------- Co-authored-by: max-hoffman <[email protected]>
1 parent 77ef424 commit 8725719

File tree

193 files changed

+12296
-11336
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

193 files changed

+12296
-11336
lines changed

engine.go

Lines changed: 1 addition & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package sqle
1616

1717
import (
1818
"fmt"
19-
"os"
2019
"sync"
2120

2221
"github.com/pkg/errors"
@@ -224,7 +223,6 @@ func (e *Engine) QueryNodeWithBindings(
224223
var (
225224
analyzed sql.Node
226225
iter sql.RowIter
227-
iter2 sql.RowIter2
228226
err error
229227
)
230228

@@ -269,17 +267,7 @@ func (e *Engine) QueryNodeWithBindings(
269267
return nil, nil, err
270268
}
271269

272-
useIter2 := false
273-
if enableRowIter2 {
274-
useIter2 = allNode2(analyzed)
275-
}
276-
277-
if useIter2 {
278-
iter2, err = analyzed.(sql.Node2).RowIter2(ctx, nil)
279-
iter = iter2
280-
} else {
281-
iter, err = analyzed.RowIter(ctx, nil)
282-
}
270+
iter, err = e.Analyzer.ExecBuilder.Build(ctx, analyzed, nil)
283271
if err != nil {
284272
err2 := clearAutocommitTransaction(ctx)
285273
if err2 != nil {
@@ -289,14 +277,6 @@ func (e *Engine) QueryNodeWithBindings(
289277
return nil, nil, err
290278
}
291279

292-
if useIter2 {
293-
iter = rowFormatSelectorIter{
294-
iter: iter,
295-
iter2: iter2,
296-
isNode2: useIter2,
297-
}
298-
}
299-
300280
return analyzed.Schema(), iter, nil
301281
}
302282

@@ -480,95 +460,6 @@ func (e *Engine) analyzePreparedQuery(ctx *sql.Context, query string, analyzed s
480460
return analyzed, nil
481461
}
482462

483-
// allNode2 returns whether all the nodes in the tree implement Node2.
484-
func allNode2(n sql.Node) bool {
485-
allNode2 := true
486-
transform.Inspect(n, func(n sql.Node) bool {
487-
switch n := n.(type) {
488-
case *plan.ResolvedTable:
489-
table := n.Table
490-
if tw, ok := table.(sql.TableWrapper); ok {
491-
table = tw.Underlying()
492-
}
493-
if _, ok := table.(sql.Table2); !ok {
494-
allNode2 = false
495-
return false
496-
}
497-
}
498-
if _, ok := n.(sql.Node2); n != nil && !ok {
499-
allNode2 = false
500-
return false
501-
}
502-
return true
503-
})
504-
if !allNode2 {
505-
return allNode2
506-
}
507-
508-
// All expressions in the tree must likewise be Expression2, and all types Type2, or we can't use rowFrame iteration
509-
// TODO: likely that some nodes rely on expressions but don't implement sql.Expressioner, or implement it incompletely
510-
transform.InspectExpressions(n, func(e sql.Expression) bool {
511-
if e == nil {
512-
return false
513-
}
514-
if _, ok := e.(sql.Expression2); !ok {
515-
allNode2 = false
516-
return false
517-
}
518-
if _, ok := e.Type().(sql.Type2); !ok {
519-
allNode2 = false
520-
return false
521-
}
522-
return true
523-
})
524-
525-
return allNode2
526-
}
527-
528-
// rowFormatSelectorIter is a wrapping row iter that implements RowIterTypeSelector so that clients consuming rows from it
529-
// know whether it's safe to iterate as RowIter or RowIter2.
530-
type rowFormatSelectorIter struct {
531-
iter sql.RowIter
532-
iter2 sql.RowIter2
533-
isNode2 bool
534-
}
535-
536-
var _ sql.RowIterTypeSelector = rowFormatSelectorIter{}
537-
var _ sql.RowIter = rowFormatSelectorIter{}
538-
var _ sql.RowIter2 = rowFormatSelectorIter{}
539-
540-
func (t rowFormatSelectorIter) Next(context *sql.Context) (sql.Row, error) {
541-
return t.iter.Next(context)
542-
}
543-
544-
func (t rowFormatSelectorIter) Close(context *sql.Context) error {
545-
if t.iter2 != nil {
546-
return t.iter2.Close(context)
547-
}
548-
return t.iter.Close(context)
549-
}
550-
551-
func (t rowFormatSelectorIter) Next2(ctx *sql.Context, frame *sql.RowFrame) error {
552-
return t.iter2.Next2(ctx, frame)
553-
}
554-
555-
func (t rowFormatSelectorIter) IsNode2() bool {
556-
return t.isNode2
557-
}
558-
559-
const (
560-
enableIter2EnvVar = "ENABLE_ROW_ITER_2"
561-
)
562-
563-
var enableRowIter2 bool
564-
565-
func init() {
566-
_, ok := os.LookupEnv(enableIter2EnvVar)
567-
if ok {
568-
enableRowIter2 = true
569-
}
570-
}
571-
572463
func (e *Engine) beginTransaction(ctx *sql.Context, transactionDatabase string) error {
573464
beginNewTransaction := ctx.GetTransaction() == nil || plan.ReadCommitted(ctx)
574465
if beginNewTransaction {

enginetest/engine_only_test.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/dolthub/go-mysql-server/sql/expression/function"
4343
"github.com/dolthub/go-mysql-server/sql/parse"
4444
"github.com/dolthub/go-mysql-server/sql/plan"
45+
"github.com/dolthub/go-mysql-server/sql/rowexec"
4546
"github.com/dolthub/go-mysql-server/sql/types"
4647
)
4748

@@ -245,7 +246,7 @@ func TestShowProcessList(t *testing.T) {
245246

246247
n := plan.NewShowProcessList()
247248

248-
iter, err := n.RowIter(ctx, nil)
249+
iter, err := rowexec.DefaultBuilder.Build(ctx, n, nil)
249250
require.NoError(err)
250251
rows, err := sql.RowIterToRows(ctx, n.Schema(), iter)
251252
require.NoError(err)
@@ -320,7 +321,7 @@ func TestTrackProcess(t *testing.T) {
320321
_, ok = rhs.Table.(*plan.ProcessIndexableTable)
321322
require.True(ok)
322323

323-
iter, err := proc.RowIter(ctx, nil)
324+
iter, err := rowexec.DefaultBuilder.Build(ctx, proc, nil)
324325
require.NoError(err)
325326
_, err = sql.RowIterToRows(ctx, nil, iter)
326327
require.NoError(err)
@@ -357,7 +358,8 @@ func TestLockTables(t *testing.T) {
357358
})
358359
node.Catalog = analyzer.NewCatalog(sql.NewDatabaseProvider())
359360

360-
_, err := node.RowIter(sql.NewEmptyContext(), nil)
361+
_, err := rowexec.DefaultBuilder.Build(sql.NewEmptyContext(), node, nil)
362+
361363
require.NoError(err)
362364

363365
require.Equal(1, t1.writeLocks)
@@ -685,6 +687,8 @@ func TestTableFunctions(t *testing.T) {
685687
testDatabaseProvider := NewTestProvider(&databaseProvider, SimpleTableFunction{}, memory.IntSequenceTable{})
686688

687689
engine := enginetest.NewEngineWithProvider(t, harness, testDatabaseProvider)
690+
engine.Analyzer.ExecBuilder = rowexec.DefaultBuilder
691+
688692
engine, err := enginetest.RunSetupScripts(harness.NewContext(), engine, setup.MydbData, true)
689693
require.NoError(t, err)
690694

@@ -868,6 +872,7 @@ func TestCollationCoercion(t *testing.T) {
868872

869873
var _ sql.TableFunction = (*SimpleTableFunction)(nil)
870874
var _ sql.CollationCoercible = (*SimpleTableFunction)(nil)
875+
var _ sql.ExecSourceRel = (*SimpleTableFunction)(nil)
871876

872877
// SimpleTableFunction an extremely simple implementation of TableFunction for testing.
873878
// When evaluated, returns a single row: {"foo", 123}
@@ -879,6 +884,14 @@ func (s SimpleTableFunction) NewInstance(_ *sql.Context, _ sql.Database, _ []sql
879884
return SimpleTableFunction{}, nil
880885
}
881886

887+
func (s SimpleTableFunction) RowIter(ctx *sql.Context, r sql.Row) (sql.RowIter, error) {
888+
if s.returnedResults == true {
889+
return nil, io.EOF
890+
}
891+
s.returnedResults = true
892+
return &SimpleTableFunctionRowIter{}, nil
893+
}
894+
882895
func (s SimpleTableFunction) Resolved() bool {
883896
return true
884897
}
@@ -906,16 +919,6 @@ func (s SimpleTableFunction) Children() []sql.Node {
906919
return []sql.Node{}
907920
}
908921

909-
func (s SimpleTableFunction) RowIter(_ *sql.Context, _ sql.Row) (sql.RowIter, error) {
910-
if s.returnedResults == true {
911-
return nil, io.EOF
912-
}
913-
914-
s.returnedResults = true
915-
rowIter := &SimpleTableFunctionRowIter{}
916-
return rowIter, nil
917-
}
918-
919922
func (s SimpleTableFunction) WithChildren(_ ...sql.Node) (sql.Node, error) {
920923
return s, nil
921924
}

enginetest/evaluation.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"github.com/dolthub/go-mysql-server/enginetest/queries"
3030
"github.com/dolthub/go-mysql-server/enginetest/scriptgen/setup"
3131
"github.com/dolthub/go-mysql-server/sql"
32-
"github.com/dolthub/go-mysql-server/sql/analyzer"
3332
"github.com/dolthub/go-mysql-server/sql/expression"
3433
"github.com/dolthub/go-mysql-server/sql/parse"
3534
"github.com/dolthub/go-mysql-server/sql/plan"
@@ -766,7 +765,7 @@ func ExtractQueryNode(node sql.Node) sql.Node {
766765
switch node := node.(type) {
767766
case *plan.QueryProcess:
768767
return ExtractQueryNode(node.Child())
769-
case *analyzer.Releaser:
768+
case *plan.Releaser:
770769
return ExtractQueryNode(node.Child)
771770
default:
772771
return node

memory/sequence_table.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ var _ sql.CollationCoercible = (*IntSequenceTable)(nil)
1616
// of integers.
1717
type IntSequenceTable struct {
1818
name string
19-
len int
19+
Len int
2020
}
2121

2222
func (s IntSequenceTable) NewInstance(_ *sql.Context, _ sql.Database, args []sql.Expression) (sql.Node, error) {
@@ -39,23 +39,23 @@ func (s IntSequenceTable) NewInstance(_ *sql.Context, _ sql.Database, args []sql
3939
if !ok {
4040
return nil, fmt.Errorf("%w; sequence table expects 2nd argument to be a sequence length integer", err)
4141
}
42-
return IntSequenceTable{name: name, len: int(length.(int64))}, nil
42+
return IntSequenceTable{name: name, Len: int(length.(int64))}, nil
4343
}
4444

4545
func (s IntSequenceTable) Resolved() bool {
4646
return true
4747
}
4848

4949
func (s IntSequenceTable) String() string {
50-
return fmt.Sprintf("sequence(%s, %d)", s.name, s.len)
50+
return fmt.Sprintf("sequence(%s, %d)", s.name, s.Len)
5151
}
5252

5353
func (s IntSequenceTable) DebugString() string {
5454
pr := sql.NewTreePrinter()
5555
_ = pr.WriteNode("sequence")
5656
children := []string{
5757
fmt.Sprintf("name: %s", s.name),
58-
fmt.Sprintf("len: %d", s.len),
58+
fmt.Sprintf("len: %d", s.Len),
5959
}
6060
_ = pr.WriteChildren(children...)
6161
return pr.String()
@@ -77,7 +77,7 @@ func (s IntSequenceTable) Children() []sql.Node {
7777
}
7878

7979
func (s IntSequenceTable) RowIter(_ *sql.Context, _ sql.Row) (sql.RowIter, error) {
80-
rowIter := &SequenceTableFnRowIter{i: 0, n: s.len}
80+
rowIter := &SequenceTableFnRowIter{i: 0, n: s.Len}
8181
return rowIter, nil
8282
}
8383

@@ -125,6 +125,10 @@ type SequenceTableFnRowIter struct {
125125
i int
126126
}
127127

128+
func NewSequenceTableFnRowIter(n int) *SequenceTableFnRowIter {
129+
return &SequenceTableFnRowIter{i: 0, n: n}
130+
}
131+
128132
func (i *SequenceTableFnRowIter) Next(_ *sql.Context) (sql.Row, error) {
129133
if i.i >= i.n {
130134
return nil, io.EOF

memory/table.go

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ type Table struct {
7171
}
7272

7373
var _ sql.Table = (*Table)(nil)
74-
var _ sql.Table2 = (*Table)(nil)
7574
var _ sql.InsertableTable = (*Table)(nil)
7675
var _ sql.UpdatableTable = (*Table)(nil)
7776
var _ sql.DeletableTable = (*Table)(nil)
@@ -394,7 +393,6 @@ type tableIter struct {
394393
}
395394

396395
var _ sql.RowIter = (*tableIter)(nil)
397-
var _ sql.RowIter2 = (*tableIter)(nil)
398396

399397
func (i *tableIter) Next(ctx *sql.Context) (sql.Row, error) {
400398
row, err := i.getRow(ctx)
@@ -424,23 +422,6 @@ func (i *tableIter) Next(ctx *sql.Context) (sql.Row, error) {
424422
return row, nil
425423
}
426424

427-
func (i *tableIter) Next2(ctx *sql.Context, frame *sql.RowFrame) error {
428-
r, err := i.Next(ctx)
429-
if err != nil {
430-
return err
431-
}
432-
433-
for _, v := range r {
434-
x, err := sql.ConvertToValue(v)
435-
if err != nil {
436-
return err
437-
}
438-
frame.Append(x)
439-
}
440-
441-
return nil
442-
}
443-
444425
func (i *tableIter) colIsProjected(idx int) bool {
445426
for _, colIdx := range i.columns {
446427
if idx == colIdx {
@@ -508,7 +489,6 @@ type spatialTableIter struct {
508489
}
509490

510491
var _ sql.RowIter = (*spatialTableIter)(nil)
511-
var _ sql.RowIter2 = (*spatialTableIter)(nil)
512492

513493
func (i *spatialTableIter) Next(ctx *sql.Context) (sql.Row, error) {
514494
row, err := i.getRow(ctx)
@@ -548,23 +528,6 @@ func (i *spatialTableIter) Next(ctx *sql.Context) (sql.Row, error) {
548528
return resultRow, nil
549529
}
550530

551-
func (i *spatialTableIter) Next2(ctx *sql.Context, frame *sql.RowFrame) error {
552-
r, err := i.Next(ctx)
553-
if err != nil {
554-
return err
555-
}
556-
557-
for _, v := range r {
558-
x, err := sql.ConvertToValue(v)
559-
if err != nil {
560-
return err
561-
}
562-
frame.Append(x)
563-
}
564-
565-
return nil
566-
}
567-
568531
func (i *spatialTableIter) Close(ctx *sql.Context) error {
569532
return nil
570533
}
@@ -1855,15 +1818,6 @@ func (i *indexKeyValueIter) Close(ctx *sql.Context) error {
18551818
return i.iter.Close(ctx)
18561819
}
18571820

1858-
func (t *Table) PartitionRows2(ctx *sql.Context, partition sql.Partition) (sql.RowIter2, error) {
1859-
iter, err := t.PartitionRows(ctx, partition)
1860-
if err != nil {
1861-
return nil, err
1862-
}
1863-
1864-
return iter.(*tableIter), nil
1865-
}
1866-
18671821
func (t *Table) verifyRowTypes(row sql.Row) {
18681822
//TODO: only run this when in testing mode
18691823
if len(row) == len(t.schema.Schema) {

0 commit comments

Comments
 (0)