Skip to content

Commit c3a31c1

Browse files
author
James Cor
committed
rewrite blockIter to iterate during exec time
1 parent 84d576a commit c3a31c1

File tree

4 files changed

+131
-105
lines changed

4 files changed

+131
-105
lines changed

sql/expression/procedurereference.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,3 +470,17 @@ func (c *HandlerCondition) Matches(err error) bool {
470470
return c.Type == HandlerConditionSqlException
471471
}
472472
}
473+
474+
type HandlerIters struct {
475+
Iter sql.RowIter
476+
Cond HandlerCondition
477+
Action DeclareHandlerAction
478+
}
479+
480+
func NewHandlerIters(iter sql.RowIter, cond HandlerCondition, action DeclareHandlerAction) *HandlerIters {
481+
return &HandlerIters{
482+
Iter: iter,
483+
Cond: cond,
484+
Action: action,
485+
}
486+
}

sql/rowexec/other.go

Lines changed: 44 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -146,119 +146,63 @@ func (b *BaseBuilder) buildCachedResults(ctx *sql.Context, n *plan.CachedResults
146146
}
147147

148148
func (b *BaseBuilder) buildBlock(ctx *sql.Context, n *plan.Block, row sql.Row) (sql.RowIter, error) {
149-
var returnRows []sql.Row
150149
var returnNode sql.Node
151-
var returnSch sql.Schema
150+
var repIterIdx = len(n.Children()) - 1
151+
var subIters = make([]sql.RowIter, len(n.Children()))
152+
var subIterHandlers = make([][]*expression.HandlerIters, len(n.Children()))
153+
seenSelect := false
154+
for ci, child := range n.Children() {
155+
// build and reorganize handlers for subIter
156+
hRefs := n.Pref.InnermostScope.Handlers
157+
hIters := make([]*expression.HandlerIters, len(hRefs))
158+
for i, hRef := range hRefs {
159+
hRowIter, hErr := b.buildNodeExec(ctx, hRef.Stmt, nil)
160+
if hErr != nil {
161+
return nil, hErr
162+
}
163+
hIters[len(hRefs) - i - 1] = expression.NewHandlerIters(hRowIter, hRef.Cond, hRef.Action)
164+
}
165+
subIterHandlers[ci] = hIters
152166

153-
selectSeen := false
154-
for _, s := range n.Children() {
155-
// TODO: this should happen at iteration time, but this call is where the actual iteration happens
156-
err := startTransaction(ctx)
167+
// build subIter and save for later
168+
subIter, err := b.buildNodeExec(ctx, child, row)
157169
if err != nil {
158170
return nil, err
159171
}
172+
subIters[ci] = subIter
160173

161-
handleError := func(err error) error {
162-
scope := n.Pref.InnermostScope
163-
for i := len(scope.Handlers) - 1; i >= 0; i-- {
164-
if !scope.Handlers[i].Cond.Matches(err) {
165-
continue
166-
}
167-
168-
handlerRefVal := scope.Handlers[i]
169-
170-
handlerRowIter, err := b.buildNodeExec(ctx, handlerRefVal.Stmt, nil)
171-
if err != nil {
172-
return err
173-
}
174-
defer handlerRowIter.Close(ctx)
175-
176-
for {
177-
_, err := handlerRowIter.Next(ctx)
178-
if err == io.EOF {
179-
break
180-
} else if err != nil {
181-
return err
182-
}
183-
}
184-
switch scope.Handlers[i].Action {
185-
case expression.DeclareHandlerAction_Exit:
186-
return exitBlockError
187-
case expression.DeclareHandlerAction_Continue:
188-
return nil
189-
case expression.DeclareHandlerAction_Undo:
190-
return fmt.Errorf("DECLARE UNDO HANDLER is not supported")
191-
}
192-
}
193-
return err
174+
// the representing node is the last select node in the block
175+
// if there is no select node, the representing node is the last node in the block
176+
subIterNode := child
177+
if blockSubIter, ok := subIter.(plan.BlockRowIter); ok {
178+
subIterNode = blockSubIter.RepresentingNode()
194179
}
195-
196-
err = func() error {
197-
rowCache, disposeFunc := ctx.Memory.NewRowsCache()
198-
defer disposeFunc()
199-
200-
var isSelect bool
201-
subIter, err := b.buildNodeExec(ctx, s, row)
202-
if err != nil {
203-
newErr := handleError(err)
204-
if newErr != nil {
205-
return newErr
206-
}
207-
208-
return nil
209-
}
210-
subIterNode := s
211-
subIterSch := s.Schema()
212-
if blockSubIter, ok := subIter.(plan.BlockRowIter); ok {
213-
subIterNode = blockSubIter.RepresentingNode()
214-
subIterSch = blockSubIter.Schema()
215-
}
216-
if isSelect = plan.NodeRepresentsSelect(subIterNode); isSelect {
217-
selectSeen = true
218-
returnNode = subIterNode
219-
returnSch = subIterSch
220-
} else if !selectSeen {
221-
returnNode = subIterNode
222-
returnSch = subIterSch
223-
}
224-
225-
for {
226-
newRow, err := subIter.Next(ctx)
227-
if err == io.EOF {
228-
err := subIter.Close(ctx)
229-
if err != nil {
230-
return err
231-
}
232-
if isSelect || !selectSeen {
233-
returnRows = rowCache.Get()
234-
}
235-
break
236-
} else if err != nil {
237-
newErr := handleError(err)
238-
if newErr != nil {
239-
return newErr
240-
}
241-
}
242-
243-
if isSelect || !selectSeen {
244-
err = rowCache.Add(newRow)
245-
if err != nil {
246-
return err
247-
}
248-
}
249-
}
250-
return nil
251-
}()
252-
if err != nil {
253-
return nil, err
180+
if plan.NodeRepresentsSelect(subIterNode) {
181+
repIterIdx = ci
182+
seenSelect = true
183+
returnNode = subIterNode
184+
continue
185+
}
186+
if !seenSelect {
187+
returnNode = subIterNode
254188
}
255189
}
256190

191+
if returnNode == nil {
192+
return nil, fmt.Errorf("block does not contain any statements")
193+
}
194+
195+
returnSch := returnNode.Schema()
257196
n.SetSchema(returnSch)
258197
return &blockIter{
259-
internalIter: sql.RowsToRowIter(returnRows...),
198+
internalIter: nil,
260199
repNode: returnNode,
261-
sch: returnSch,
200+
repSch: returnSch,
201+
202+
repIterIdx: repIterIdx,
203+
subIters: subIters,
204+
205+
subIterHandlers: subIterHandlers,
262206
}, nil
263207
}
264208

sql/rowexec/other_iters.go

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
package rowexec
1616

1717
import (
18-
"io"
18+
"fmt"
19+
"io"
1920
"sync"
2021

2122
"github.com/dolthub/go-mysql-server/sql"
23+
"github.com/dolthub/go-mysql-server/sql/expression"
2224
"github.com/dolthub/go-mysql-server/sql/plan"
2325
)
2426

@@ -115,19 +117,85 @@ func (itr *dropHistogramIter) Close(_ *sql.Context) error {
115117
type blockIter struct {
116118
internalIter sql.RowIter
117119
repNode sql.Node
118-
sch sql.Schema
120+
repSch sql.Schema
121+
122+
repIterIdx int
123+
124+
subIters []sql.RowIter
125+
subIterHandlers [][]*expression.HandlerIters
119126
}
120127

121128
var _ plan.BlockRowIter = (*blockIter)(nil)
122129

123130
// Next implements the sql.RowIter interface.
124131
func (i *blockIter) Next(ctx *sql.Context) (sql.Row, error) {
132+
// TODO: Stored Procedures are capable of returning multiple result sets. This is not implemented yet.
133+
// Instead, we just return the last select result set or just the last OkResult, and silently discard the rest.
134+
// On the first pass, we exhaust all subIters, and save the appropriate results to return.
135+
if i.internalIter == nil {
136+
// TODO: write helper methods maybe
137+
var returnRows []sql.Row
138+
for si, subIter := range i.subIters {
139+
var err error
140+
for row := sql.Row(nil); ; {
141+
row, err = subIter.Next(ctx)
142+
if err != nil {
143+
break
144+
}
145+
if si == i.repIterIdx {
146+
returnRows = append(returnRows, row)
147+
}
148+
}
149+
if cErr := subIter.Close(ctx); cErr != nil {
150+
return nil, cErr
151+
}
152+
if err == io.EOF {
153+
continue
154+
}
155+
156+
handlers := i.subIterHandlers[si]
157+
for _, handler := range handlers {
158+
if !handler.Cond.Matches(err) {
159+
continue
160+
}
161+
for {
162+
_, err = handler.Iter.Next(ctx)
163+
if err != nil {
164+
break
165+
}
166+
}
167+
if cErr := handler.Iter.Close(ctx); cErr != nil {
168+
return nil, cErr
169+
}
170+
// unhandled error
171+
if err != io.EOF {
172+
return nil, err
173+
}
174+
switch handler.Action {
175+
case expression.DeclareHandlerAction_Continue:
176+
case expression.DeclareHandlerAction_Exit:
177+
return nil, exitBlockError
178+
case expression.DeclareHandlerAction_Undo:
179+
return nil, fmt.Errorf("DECLARE UNDO HANDLER is not supported")
180+
default:
181+
return nil, fmt.Errorf("unknown handler action: %v", handler.Action)
182+
}
183+
break
184+
}
185+
}
186+
i.internalIter = sql.RowsToRowIter(returnRows...)
187+
}
125188
return i.internalIter.Next(ctx)
126189
}
127190

128191
// Close implements the sql.RowIter interface.
129192
func (i *blockIter) Close(ctx *sql.Context) error {
130-
return i.internalIter.Close(ctx)
193+
for _, subIter := range i.subIters {
194+
if err := subIter.Close(ctx); err != nil {
195+
return err
196+
}
197+
}
198+
return nil
131199
}
132200

133201
// RepresentingNode implements the sql.BlockRowIter interface.
@@ -137,7 +205,7 @@ func (i *blockIter) RepresentingNode() sql.Node {
137205

138206
// Schema implements the sql.BlockRowIter interface.
139207
func (i *blockIter) Schema() sql.Schema {
140-
return i.sch
208+
return i.repSch
141209
}
142210

143211
type prependRowIter struct {

sql/rowexec/proc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ func (b *BaseBuilder) buildLoop(ctx *sql.Context, n *plan.Loop, row sql.Row) (sq
336336
return &blockIter{
337337
internalIter: sql.RowsToRowIter(returnRows...),
338338
repNode: returnNode,
339-
sch: returnSch,
339+
repSch: returnSch,
340340
}, nil
341341
}
342342

0 commit comments

Comments
 (0)