Skip to content

Commit 5e26462

Browse files
author
James Cor
committed
pair buffers with results
1 parent db5f96c commit 5e26462

File tree

1 file changed

+27
-23
lines changed

1 file changed

+27
-23
lines changed

server/handler.go

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -480,10 +480,8 @@ func (h *Handler) doQuery(
480480
// create result before goroutines to avoid |ctx| racing
481481
resultFields := schemaToFields(sqlCtx, schema)
482482
var r *sqltypes.Result
483+
var buf *sql.ByteBuffer
483484
var processedAtLeastOneBatch bool
484-
485-
buf := sql.ByteBufPool.Get().(*sql.ByteBuffer)
486-
buf.Reset()
487485
defer func() {
488486
sql.ByteBufPool.Put(buf)
489487
}()
@@ -498,7 +496,7 @@ func (h *Handler) doQuery(
498496
} else if vr, ok := rowIter.(sql.ValueRowIter); ok && vr.IsValueRowIter(sqlCtx) {
499497
r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more)
500498
} else {
501-
r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf)
499+
r, buf, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more)
502500
}
503501
if err != nil {
504502
return remainder, err
@@ -527,6 +525,8 @@ func (h *Handler) doQuery(
527525
return remainder, nil
528526
}
529527

528+
// TODO: the very last buffer needs to be released
529+
530530
return remainder, callback(r, more)
531531
}
532532

@@ -598,7 +598,8 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter,
598598
row, err := iter.Next(ctx)
599599
if err == io.EOF {
600600
return &sqltypes.Result{Fields: resultFields}, nil
601-
} else if err != nil {
601+
}
602+
if err != nil {
602603
return nil, err
603604
}
604605

@@ -618,7 +619,7 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter,
618619

619620
// resultForDefaultIter reads batches of rows from the iterator
620621
// and writes results into the callback function.
621-
func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool, buf *sql.ByteBuffer) (*sqltypes.Result, bool, error) {
622+
func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool) (*sqltypes.Result, *sql.ByteBuffer, bool, error) {
622623
defer trace.StartRegion(ctx, "Handler.resultForDefaultIter").End()
623624

624625
eg, ctx := ctx.NewErrgroup()
@@ -650,17 +651,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
650651
timer := time.NewTimer(waitTime)
651652
defer timer.Stop()
652653

653-
// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
654-
// clean out rows that have already been spooled.
655-
// A server-side cursor allows the caller to fetch results cached on the server-side,
656-
// so if a cursor exists, we can't release the buffer memory yet.
657-
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
658-
callback = func(r *sqltypes.Result, more bool) error {
659-
defer buf.Reset()
660-
return callback(r, more)
661-
}
662-
}
663-
664654
iter, projs := GetDeferredProjections(iter)
665655

666656
wg := sync.WaitGroup{}
@@ -694,8 +684,13 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
694684
})
695685

696686
// Drain rows from rowChan, convert to wire format, and send to resChan
697-
var resChan = make(chan *sqltypes.Result, 4)
687+
type bufferedResult struct {
688+
res *sqltypes.Result
689+
buf *sql.ByteBuffer
690+
}
691+
var resChan = make(chan bufferedResult, 4)
698692
var res *sqltypes.Result
693+
var buf *sql.ByteBuffer
699694
eg.Go(func() (err error) {
700695
defer pan2err(&err)
701696
defer wg.Done()
@@ -707,6 +702,8 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
707702
Fields: resultFields,
708703
Rows: make([][]sqltypes.Value, 0, rowsBatch),
709704
}
705+
buf = sql.ByteBufPool.Get().(*sql.ByteBuffer)
706+
buf.Reset()
710707
}
711708

712709
select {
@@ -746,8 +743,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
746743
select {
747744
case <-ctx.Done():
748745
return context.Cause(ctx)
749-
case resChan <- res:
746+
case resChan <- bufferedResult{res: res, buf: buf}:
750747
res = nil
748+
buf = nil
751749
}
752750
}
753751
}
@@ -766,15 +764,21 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
766764
select {
767765
case <-ctx.Done():
768766
return context.Cause(ctx)
769-
case r, ok := <-resChan:
767+
case bufRes, ok := <-resChan:
770768
if !ok {
771769
return nil
772770
}
773771
processedAtLeastOneBatch = true
774-
err = callback(r, more)
772+
err = callback(bufRes.res, more)
775773
if err != nil {
776774
return err
777775
}
776+
// A server-side cursor allows the caller to fetch results cached on the server-side,
777+
// so if a cursor exists, we can't release the buffer memory yet.
778+
// TODO: In the case of a cursor, we are leaking memory
779+
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
780+
sql.ByteBufPool.Put(bufRes.buf)
781+
}
778782
}
779783
}
780784
})
@@ -793,9 +797,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
793797
if verboseErrorLogging {
794798
fmt.Printf("Err: %+v", err)
795799
}
796-
return nil, false, err
800+
return nil, nil, false, err
797801
}
798-
return res, processedAtLeastOneBatch, nil
802+
return res, buf, processedAtLeastOneBatch, nil
799803
}
800804

801805
func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) {

0 commit comments

Comments
 (0)