Skip to content

Commit c08ecfa

Browse files
author
James Cor
committed
pair buffers with results
1 parent 47c756d commit c08ecfa

File tree

1 file changed

+27
-24
lines changed

1 file changed

+27
-24
lines changed

server/handler.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -480,10 +480,8 @@ func (h *Handler) doQuery(
480480
// create result before goroutines to avoid |ctx| racing
481481
resultFields := schemaToFields(sqlCtx, schema)
482482
var r *sqltypes.Result
483+
var buf *sql.ByteBuffer
483484
var processedAtLeastOneBatch bool
484-
485-
buf := sql.ByteBufPool.Get().(*sql.ByteBuffer)
486-
buf.Reset()
487485
defer func() {
488486
sql.ByteBufPool.Put(buf)
489487
}()
@@ -498,7 +496,7 @@ func (h *Handler) doQuery(
498496
} else if vr, ok := rowIter.(sql.ValueRowIter); ok && vr.IsValueRowIter(sqlCtx) {
499497
r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more)
500498
} else {
501-
r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf)
499+
r, buf, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more)
502500
}
503501
if err != nil {
504502
return remainder, err
@@ -527,6 +525,8 @@ func (h *Handler) doQuery(
527525
return remainder, nil
528526
}
529527

528+
// TODO: the very last buffer needs to be released
529+
530530
return remainder, callback(r, more)
531531
}
532532

@@ -598,7 +598,8 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter,
598598
row, err := iter.Next(ctx)
599599
if err == io.EOF {
600600
return &sqltypes.Result{Fields: resultFields}, nil
601-
} else if err != nil {
601+
}
602+
if err != nil {
602603
return nil, err
603604
}
604605

@@ -618,7 +619,7 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter,
618619

619620
// resultForDefaultIter reads batches of rows from the iterator
620621
// and writes results into the callback function.
621-
func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool, buf *sql.ByteBuffer) (*sqltypes.Result, bool, error) {
622+
func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool) (*sqltypes.Result, *sql.ByteBuffer, bool, error) {
622623
defer trace.StartRegion(ctx, "Handler.resultForDefaultIter").End()
623624

624625
eg, ctx := ctx.NewErrgroup()
@@ -650,18 +651,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
650651
timer := time.NewTimer(waitTime)
651652
defer timer.Stop()
652653

653-
// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
654-
// clean out rows that have already been spooled.
655-
// A server-side cursor allows the caller to fetch results cached on the server-side,
656-
// so if a cursor exists, we can't release the buffer memory yet.
657-
resetCallback := callback
658-
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
659-
resetCallback = func(r *sqltypes.Result, more bool) error {
660-
defer buf.Reset()
661-
return callback(r, more)
662-
}
663-
}
664-
665654
iter, projs := GetDeferredProjections(iter)
666655

667656
wg := sync.WaitGroup{}
@@ -695,8 +684,13 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
695684
})
696685

697686
// Drain rows from rowChan, convert to wire format, and send to resChan
698-
var resChan = make(chan *sqltypes.Result, 4)
687+
type bufferedResult struct {
688+
res *sqltypes.Result
689+
buf *sql.ByteBuffer
690+
}
691+
var resChan = make(chan bufferedResult, 4)
699692
var res *sqltypes.Result
693+
var buf *sql.ByteBuffer
700694
eg.Go(func() (err error) {
701695
defer pan2err(&err)
702696
defer wg.Done()
@@ -708,6 +702,8 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
708702
Fields: resultFields,
709703
Rows: make([][]sqltypes.Value, 0, rowsBatch),
710704
}
705+
buf = sql.ByteBufPool.Get().(*sql.ByteBuffer)
706+
buf.Reset()
711707
}
712708

713709
select {
@@ -747,8 +743,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
747743
select {
748744
case <-ctx.Done():
749745
return context.Cause(ctx)
750-
case resChan <- res:
746+
case resChan <- bufferedResult{res: res, buf: buf}:
751747
res = nil
748+
buf = nil
752749
}
753750
}
754751
}
@@ -767,15 +764,21 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
767764
select {
768765
case <-ctx.Done():
769766
return context.Cause(ctx)
770-
case r, ok := <-resChan:
767+
case bufRes, ok := <-resChan:
771768
if !ok {
772769
return nil
773770
}
774771
processedAtLeastOneBatch = true
775-
err = resetCallback(r, more)
772+
err = callback(bufRes.res, more)
776773
if err != nil {
777774
return err
778775
}
776+
// A server-side cursor allows the caller to fetch results cached on the server-side,
777+
// so if a cursor exists, we can't release the buffer memory yet.
778+
// TODO: In the case of a cursor, we are leaking memory
779+
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
780+
sql.ByteBufPool.Put(bufRes.buf)
781+
}
779782
}
780783
}
781784
})
@@ -794,9 +797,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
794797
if verboseErrorLogging {
795798
fmt.Printf("Err: %+v", err)
796799
}
797-
return nil, false, err
800+
return nil, nil, false, err
798801
}
799-
return res, processedAtLeastOneBatch, nil
802+
return res, buf, processedAtLeastOneBatch, nil
800803
}
801804

802805
func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) {

0 commit comments

Comments
 (0)