From c3a31c1e6075164fdf2af14aba94cccac061d7ed Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 30 Oct 2024 13:49:44 -0700 Subject: [PATCH 1/2] rewrite blockIter to iterate during exec time --- sql/expression/procedurereference.go | 14 +++ sql/rowexec/other.go | 144 ++++++++------------------- sql/rowexec/other_iters.go | 76 +++++++++++++- sql/rowexec/proc.go | 2 +- 4 files changed, 131 insertions(+), 105 deletions(-) diff --git a/sql/expression/procedurereference.go b/sql/expression/procedurereference.go index bed05f478b..6a4eb6bac5 100644 --- a/sql/expression/procedurereference.go +++ b/sql/expression/procedurereference.go @@ -470,3 +470,17 @@ func (c *HandlerCondition) Matches(err error) bool { return c.Type == HandlerConditionSqlException } } + +type HandlerIters struct { + Iter sql.RowIter + Cond HandlerCondition + Action DeclareHandlerAction +} + +func NewHandlerIters(iter sql.RowIter, cond HandlerCondition, action DeclareHandlerAction) *HandlerIters { + return &HandlerIters{ + Iter: iter, + Cond: cond, + Action: action, + } +} \ No newline at end of file diff --git a/sql/rowexec/other.go b/sql/rowexec/other.go index 0c9326024d..715a5ff115 100644 --- a/sql/rowexec/other.go +++ b/sql/rowexec/other.go @@ -146,119 +146,63 @@ func (b *BaseBuilder) buildCachedResults(ctx *sql.Context, n *plan.CachedResults } func (b *BaseBuilder) buildBlock(ctx *sql.Context, n *plan.Block, row sql.Row) (sql.RowIter, error) { - var returnRows []sql.Row var returnNode sql.Node - var returnSch sql.Schema + var repIterIdx = len(n.Children()) - 1 + var subIters = make([]sql.RowIter, len(n.Children())) + var subIterHandlers = make([][]*expression.HandlerIters, len(n.Children())) + seenSelect := false + for ci, child := range n.Children() { + // build and reorganize handlers for subIter + hRefs := n.Pref.InnermostScope.Handlers + hIters := make([]*expression.HandlerIters, len(hRefs)) + for i, hRef := range hRefs { + hRowIter, hErr := b.buildNodeExec(ctx, hRef.Stmt, nil) + if hErr != nil { + return nil, hErr + } + hIters[len(hRefs) - i - 1] = expression.NewHandlerIters(hRowIter, hRef.Cond, hRef.Action) + } + subIterHandlers[ci] = hIters - selectSeen := false - for _, s := range n.Children() { - // TODO: this should happen at iteration time, but this call is where the actual iteration happens - err := startTransaction(ctx) + // build subIter and save for later + subIter, err := b.buildNodeExec(ctx, child, row) if err != nil { return nil, err } + subIters[ci] = subIter - handleError := func(err error) error { - scope := n.Pref.InnermostScope - for i := len(scope.Handlers) - 1; i >= 0; i-- { - if !scope.Handlers[i].Cond.Matches(err) { - continue - } - - handlerRefVal := scope.Handlers[i] - - handlerRowIter, err := b.buildNodeExec(ctx, handlerRefVal.Stmt, nil) - if err != nil { - return err - } - defer handlerRowIter.Close(ctx) - - for { - _, err := handlerRowIter.Next(ctx) - if err == io.EOF { - break - } else if err != nil { - return err - } - } - switch scope.Handlers[i].Action { - case expression.DeclareHandlerAction_Exit: - return exitBlockError - case expression.DeclareHandlerAction_Continue: - return nil - case expression.DeclareHandlerAction_Undo: - return fmt.Errorf("DECLARE UNDO HANDLER is not supported") - } - } - return err + // the representing node is the last select node in the block + // if there is no select node, the representing node is the last node in the block + subIterNode := child + if blockSubIter, ok := subIter.(plan.BlockRowIter); ok { + subIterNode = blockSubIter.RepresentingNode() } - - err = func() error { - rowCache, disposeFunc := ctx.Memory.NewRowsCache() - defer disposeFunc() - - var isSelect bool - subIter, err := b.buildNodeExec(ctx, s, row) - if err != nil { - newErr := handleError(err) - if newErr != nil { - return newErr - } - - return nil - } - subIterNode := s - subIterSch := s.Schema() - if blockSubIter, ok := subIter.(plan.BlockRowIter); ok { - subIterNode = blockSubIter.RepresentingNode() - subIterSch = blockSubIter.Schema() - } - if isSelect = plan.NodeRepresentsSelect(subIterNode); isSelect { - selectSeen = true - returnNode = subIterNode - returnSch = subIterSch - } else if !selectSeen { - returnNode = subIterNode - returnSch = subIterSch - } - - for { - newRow, err := subIter.Next(ctx) - if err == io.EOF { - err := subIter.Close(ctx) - if err != nil { - return err - } - if isSelect || !selectSeen { - returnRows = rowCache.Get() - } - break - } else if err != nil { - newErr := handleError(err) - if newErr != nil { - return newErr - } - } - - if isSelect || !selectSeen { - err = rowCache.Add(newRow) - if err != nil { - return err - } - } - } - return nil - }() - if err != nil { - return nil, err + if plan.NodeRepresentsSelect(subIterNode) { + repIterIdx = ci + seenSelect = true + returnNode = subIterNode + continue + } + if !seenSelect { + returnNode = subIterNode } } + if returnNode == nil { + return nil, fmt.Errorf("block does not contain any statements") + } + + returnSch := returnNode.Schema() n.SetSchema(returnSch) return &blockIter{ - internalIter: sql.RowsToRowIter(returnRows...), + internalIter: nil, repNode: returnNode, - sch: returnSch, + repSch: returnSch, + + repIterIdx: repIterIdx, + subIters: subIters, + + subIterHandlers: subIterHandlers, }, nil } diff --git a/sql/rowexec/other_iters.go b/sql/rowexec/other_iters.go index afe1b2b53d..c24cfa99f8 100644 --- a/sql/rowexec/other_iters.go +++ b/sql/rowexec/other_iters.go @@ -15,10 +15,12 @@ package rowexec import ( - "io" + "fmt" +"io" "sync" "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/go-mysql-server/sql/expression" "github.com/dolthub/go-mysql-server/sql/plan" ) @@ -115,19 +117,85 @@ func (itr *dropHistogramIter) Close(_ *sql.Context) error { type blockIter struct { internalIter sql.RowIter repNode sql.Node - sch sql.Schema + repSch sql.Schema + + repIterIdx int + + subIters []sql.RowIter + subIterHandlers [][]*expression.HandlerIters } var _ plan.BlockRowIter = (*blockIter)(nil) // Next implements the sql.RowIter interface. func (i *blockIter) Next(ctx *sql.Context) (sql.Row, error) { + // TODO: Stored Procedures are capable of returning multiple result sets. This is not implemented yet. + // Instead, we just return the last select result set or just the last OkResult, and silently discard the rest. + // On the first pass, we exhaust all subIters, and save the appropriate results to return. + if i.internalIter == nil { + // TODO: write helper methods maybe + var returnRows []sql.Row + for si, subIter := range i.subIters { + var err error + for row := sql.Row(nil); ; { + row, err = subIter.Next(ctx) + if err != nil { + break + } + if si == i.repIterIdx { + returnRows = append(returnRows, row) + } + } + if cErr := subIter.Close(ctx); cErr != nil { + return nil, cErr + } + if err == io.EOF { + continue + } + + handlers := i.subIterHandlers[si] + for _, handler := range handlers { + if !handler.Cond.Matches(err) { + continue + } + for { + _, err = handler.Iter.Next(ctx) + if err != nil { + break + } + } + if cErr := handler.Iter.Close(ctx); cErr != nil { + return nil, cErr + } + // unhandled error + if err != io.EOF { + return nil, err + } + switch handler.Action { + case expression.DeclareHandlerAction_Continue: + case expression.DeclareHandlerAction_Exit: + return nil, exitBlockError + case expression.DeclareHandlerAction_Undo: + return nil, fmt.Errorf("DECLARE UNDO HANDLER is not supported") + default: + return nil, fmt.Errorf("unknown handler action: %v", handler.Action) + } + break + } + } + i.internalIter = sql.RowsToRowIter(returnRows...) + } return i.internalIter.Next(ctx) } // Close implements the sql.RowIter interface. func (i *blockIter) Close(ctx *sql.Context) error { - return i.internalIter.Close(ctx) + for _, subIter := range i.subIters { + if err := subIter.Close(ctx); err != nil { + return err + } + } + return nil } // RepresentingNode implements the sql.BlockRowIter interface. @@ -137,7 +205,7 @@ func (i *blockIter) RepresentingNode() sql.Node { // Schema implements the sql.BlockRowIter interface. func (i *blockIter) Schema() sql.Schema { - return i.sch + return i.repSch } type prependRowIter struct { diff --git a/sql/rowexec/proc.go b/sql/rowexec/proc.go index c7e9d48807..6c87a0e035 100644 --- a/sql/rowexec/proc.go +++ b/sql/rowexec/proc.go @@ -336,7 +336,7 @@ func (b *BaseBuilder) buildLoop(ctx *sql.Context, n *plan.Loop, row sql.Row) (sq return &blockIter{ internalIter: sql.RowsToRowIter(returnRows...), repNode: returnNode, - sch: returnSch, + repSch: returnSch, }, nil } From d7afa51ec8403067c0f642d237d81b3f287f7c3d Mon Sep 17 00:00:00 2001 From: jycor Date: Wed, 30 Oct 2024 20:51:23 +0000 Subject: [PATCH 2/2] [ga-format-pr] Run ./format_repo.sh to fix formatting --- sql/expression/procedurereference.go | 2 +- sql/rowexec/other.go | 2 +- sql/rowexec/other_iters.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/expression/procedurereference.go b/sql/expression/procedurereference.go index 6a4eb6bac5..b68d528983 100644 --- a/sql/expression/procedurereference.go +++ b/sql/expression/procedurereference.go @@ -483,4 +483,4 @@ func NewHandlerIters(iter sql.RowIter, cond HandlerCondition, action DeclareHand Cond: cond, Action: action, } -} \ No newline at end of file +} diff --git a/sql/rowexec/other.go b/sql/rowexec/other.go index 715a5ff115..7b257694e7 100644 --- a/sql/rowexec/other.go +++ b/sql/rowexec/other.go @@ -160,7 +160,7 @@ func (b *BaseBuilder) buildBlock(ctx *sql.Context, n *plan.Block, row sql.Row) ( if hErr != nil { return nil, hErr } - hIters[len(hRefs) - i - 1] = expression.NewHandlerIters(hRowIter, hRef.Cond, hRef.Action) + hIters[len(hRefs)-i-1] = expression.NewHandlerIters(hRowIter, hRef.Cond, hRef.Action) } subIterHandlers[ci] = hIters diff --git a/sql/rowexec/other_iters.go b/sql/rowexec/other_iters.go index c24cfa99f8..b24be9de19 100644 --- a/sql/rowexec/other_iters.go +++ b/sql/rowexec/other_iters.go @@ -16,7 +16,7 @@ package rowexec import ( "fmt" -"io" + "io" "sync" "github.com/dolthub/go-mysql-server/sql"