Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sql/expression/procedurereference.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,3 +470,17 @@ func (c *HandlerCondition) Matches(err error) bool {
return c.Type == HandlerConditionSqlException
}
}

type HandlerIters struct {
Iter sql.RowIter
Cond HandlerCondition
Action DeclareHandlerAction
}

func NewHandlerIters(iter sql.RowIter, cond HandlerCondition, action DeclareHandlerAction) *HandlerIters {
return &HandlerIters{
Iter: iter,
Cond: cond,
Action: action,
}
}
144 changes: 44 additions & 100 deletions sql/rowexec/other.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,119 +146,63 @@ func (b *BaseBuilder) buildCachedResults(ctx *sql.Context, n *plan.CachedResults
}

func (b *BaseBuilder) buildBlock(ctx *sql.Context, n *plan.Block, row sql.Row) (sql.RowIter, error) {
var returnRows []sql.Row
var returnNode sql.Node
var returnSch sql.Schema
var repIterIdx = len(n.Children()) - 1
var subIters = make([]sql.RowIter, len(n.Children()))
var subIterHandlers = make([][]*expression.HandlerIters, len(n.Children()))
seenSelect := false
for ci, child := range n.Children() {
// build and reorganize handlers for subIter
hRefs := n.Pref.InnermostScope.Handlers
hIters := make([]*expression.HandlerIters, len(hRefs))
for i, hRef := range hRefs {
hRowIter, hErr := b.buildNodeExec(ctx, hRef.Stmt, nil)
if hErr != nil {
return nil, hErr
}
hIters[len(hRefs)-i-1] = expression.NewHandlerIters(hRowIter, hRef.Cond, hRef.Action)
}
subIterHandlers[ci] = hIters

selectSeen := false
for _, s := range n.Children() {
// TODO: this should happen at iteration time, but this call is where the actual iteration happens
err := startTransaction(ctx)
// build subIter and save for later
subIter, err := b.buildNodeExec(ctx, child, row)
if err != nil {
return nil, err
}
subIters[ci] = subIter

handleError := func(err error) error {
scope := n.Pref.InnermostScope
for i := len(scope.Handlers) - 1; i >= 0; i-- {
if !scope.Handlers[i].Cond.Matches(err) {
continue
}

handlerRefVal := scope.Handlers[i]

handlerRowIter, err := b.buildNodeExec(ctx, handlerRefVal.Stmt, nil)
if err != nil {
return err
}
defer handlerRowIter.Close(ctx)

for {
_, err := handlerRowIter.Next(ctx)
if err == io.EOF {
break
} else if err != nil {
return err
}
}
switch scope.Handlers[i].Action {
case expression.DeclareHandlerAction_Exit:
return exitBlockError
case expression.DeclareHandlerAction_Continue:
return nil
case expression.DeclareHandlerAction_Undo:
return fmt.Errorf("DECLARE UNDO HANDLER is not supported")
}
}
return err
// the representing node is the last select node in the block
// if there is no select node, the representing node is the last node in the block
subIterNode := child
if blockSubIter, ok := subIter.(plan.BlockRowIter); ok {
subIterNode = blockSubIter.RepresentingNode()
}

err = func() error {
rowCache, disposeFunc := ctx.Memory.NewRowsCache()
defer disposeFunc()

var isSelect bool
subIter, err := b.buildNodeExec(ctx, s, row)
if err != nil {
newErr := handleError(err)
if newErr != nil {
return newErr
}

return nil
}
subIterNode := s
subIterSch := s.Schema()
if blockSubIter, ok := subIter.(plan.BlockRowIter); ok {
subIterNode = blockSubIter.RepresentingNode()
subIterSch = blockSubIter.Schema()
}
if isSelect = plan.NodeRepresentsSelect(subIterNode); isSelect {
selectSeen = true
returnNode = subIterNode
returnSch = subIterSch
} else if !selectSeen {
returnNode = subIterNode
returnSch = subIterSch
}

for {
newRow, err := subIter.Next(ctx)
if err == io.EOF {
err := subIter.Close(ctx)
if err != nil {
return err
}
if isSelect || !selectSeen {
returnRows = rowCache.Get()
}
break
} else if err != nil {
newErr := handleError(err)
if newErr != nil {
return newErr
}
}

if isSelect || !selectSeen {
err = rowCache.Add(newRow)
if err != nil {
return err
}
}
}
return nil
}()
if err != nil {
return nil, err
if plan.NodeRepresentsSelect(subIterNode) {
repIterIdx = ci
seenSelect = true
returnNode = subIterNode
continue
}
if !seenSelect {
returnNode = subIterNode
}
}

if returnNode == nil {
return nil, fmt.Errorf("block does not contain any statements")
}

returnSch := returnNode.Schema()
n.SetSchema(returnSch)
return &blockIter{
internalIter: sql.RowsToRowIter(returnRows...),
internalIter: nil,
repNode: returnNode,
sch: returnSch,
repSch: returnSch,

repIterIdx: repIterIdx,
subIters: subIters,

subIterHandlers: subIterHandlers,
}, nil
}

Expand Down
74 changes: 71 additions & 3 deletions sql/rowexec/other_iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package rowexec

import (
"fmt"
"io"
"sync"

"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/expression"
"github.com/dolthub/go-mysql-server/sql/plan"
)

Expand Down Expand Up @@ -115,19 +117,85 @@ func (itr *dropHistogramIter) Close(_ *sql.Context) error {
type blockIter struct {
internalIter sql.RowIter
repNode sql.Node
sch sql.Schema
repSch sql.Schema

repIterIdx int

subIters []sql.RowIter
subIterHandlers [][]*expression.HandlerIters
}

var _ plan.BlockRowIter = (*blockIter)(nil)

// Next implements the sql.RowIter interface.
func (i *blockIter) Next(ctx *sql.Context) (sql.Row, error) {
// TODO: Stored Procedures are capable of returning multiple result sets. This is not implemented yet.
// Instead, we just return the last select result set or just the last OkResult, and silently discard the rest.
// On the first pass, we exhaust all subIters, and save the appropriate results to return.
if i.internalIter == nil {
// TODO: write helper methods maybe
var returnRows []sql.Row
for si, subIter := range i.subIters {
var err error
for row := sql.Row(nil); ; {
row, err = subIter.Next(ctx)
if err != nil {
break
}
if si == i.repIterIdx {
returnRows = append(returnRows, row)
}
}
if cErr := subIter.Close(ctx); cErr != nil {
return nil, cErr
}
if err == io.EOF {
continue
}

handlers := i.subIterHandlers[si]
for _, handler := range handlers {
if !handler.Cond.Matches(err) {
continue
}
for {
_, err = handler.Iter.Next(ctx)
if err != nil {
break
}
}
if cErr := handler.Iter.Close(ctx); cErr != nil {
return nil, cErr
}
// unhandled error
if err != io.EOF {
return nil, err
}
switch handler.Action {
case expression.DeclareHandlerAction_Continue:
case expression.DeclareHandlerAction_Exit:
return nil, exitBlockError
case expression.DeclareHandlerAction_Undo:
return nil, fmt.Errorf("DECLARE UNDO HANDLER is not supported")
default:
return nil, fmt.Errorf("unknown handler action: %v", handler.Action)
}
break
}
}
i.internalIter = sql.RowsToRowIter(returnRows...)
}
return i.internalIter.Next(ctx)
}

// Close implements the sql.RowIter interface.
func (i *blockIter) Close(ctx *sql.Context) error {
return i.internalIter.Close(ctx)
for _, subIter := range i.subIters {
if err := subIter.Close(ctx); err != nil {
return err
}
}
return nil
}

// RepresentingNode implements the sql.BlockRowIter interface.
Expand All @@ -137,7 +205,7 @@ func (i *blockIter) RepresentingNode() sql.Node {

// Schema implements the sql.BlockRowIter interface.
func (i *blockIter) Schema() sql.Schema {
return i.sch
return i.repSch
}

type prependRowIter struct {
Expand Down
2 changes: 1 addition & 1 deletion sql/rowexec/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (b *BaseBuilder) buildLoop(ctx *sql.Context, n *plan.Loop, row sql.Row) (sq
return &blockIter{
internalIter: sql.RowsToRowIter(returnRows...),
repNode: returnNode,
sch: returnSch,
repSch: returnSch,
}, nil
}

Expand Down
Loading