Skip to content

Commit 2d46231

Browse files
author
James Cor
committed
implement for value rows
1 parent c08ecfa commit 2d46231

File tree

1 file changed

+27
-23
lines changed

1 file changed

+27
-23
lines changed

server/handler.go

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ func (h *Handler) doQuery(
494494
} else if analyzer.FlagIsSet(qFlags, sql.QFlagMax1Row) {
495495
r, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, buf)
496496
} else if vr, ok := rowIter.(sql.ValueRowIter); ok && vr.IsValueRowIter(sqlCtx) {
497-
r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more)
497+
r, buf, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, callback, more)
498498
} else {
499499
r, buf, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more)
500500
}
@@ -768,11 +768,11 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
768768
if !ok {
769769
return nil
770770
}
771-
processedAtLeastOneBatch = true
772771
err = callback(bufRes.res, more)
773772
if err != nil {
774773
return err
775774
}
775+
processedAtLeastOneBatch = true
776776
// A server-side cursor allows the caller to fetch results cached on the server-side,
777777
// so if a cursor exists, we can't release the buffer memory yet.
778778
// TODO: In the case of a cursor, we are leaking memory
@@ -802,7 +802,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
802802
return res, buf, processedAtLeastOneBatch, nil
803803
}
804804

805-
func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) {
805+
func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, *sql.ByteBuffer, bool, error) {
806806
defer trace.StartRegion(ctx, "Handler.resultForValueRowIter").End()
807807

808808
eg, ctx := ctx.NewErrgroup()
@@ -833,18 +833,6 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
833833
timer := time.NewTimer(waitTime)
834834
defer timer.Stop()
835835

836-
// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
837-
// clean out rows that have already been spooled.
838-
// A server-side cursor allows the caller to fetch results cached on the server-side,
839-
// so if a cursor exists, we can't release the buffer memory yet.
840-
resetCallback := callback
841-
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
842-
resetCallback = func(r *sqltypes.Result, more bool) error {
843-
defer buf.Reset()
844-
return callback(r, more)
845-
}
846-
}
847-
848836
wg := sync.WaitGroup{}
849837
wg.Add(3)
850838

@@ -876,8 +864,13 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
876864
})
877865

878866
// Drain rows from rowChan, convert to wire format, and send to resChan
879-
var resChan = make(chan *sqltypes.Result, 4)
867+
type bufferedResult struct {
868+
res *sqltypes.Result
869+
buf *sql.ByteBuffer
870+
}
871+
var resChan = make(chan bufferedResult, 4)
880872
var res *sqltypes.Result
873+
var buf *sql.ByteBuffer
881874
eg.Go(func() (err error) {
882875
defer pan2err(&err)
883876
defer close(resChan)
@@ -889,6 +882,8 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
889882
Fields: resultFields,
890883
Rows: make([][]sqltypes.Value, rowsBatch),
891884
}
885+
buf = sql.ByteBufPool.Get().(*sql.ByteBuffer)
886+
buf.Reset()
892887
}
893888

894889
select {
@@ -920,8 +915,9 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
920915
select {
921916
case <-ctx.Done():
922917
return context.Cause(ctx)
923-
case resChan <- res:
918+
case resChan <- bufferedResult{res: res, buf: buf}:
924919
res = nil
920+
buf = nil
925921
}
926922
}
927923
}
@@ -940,15 +936,21 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
940936
select {
941937
case <-ctx.Done():
942938
return context.Cause(ctx)
943-
case r, ok := <-resChan:
939+
case bufRes, ok := <-resChan:
944940
if !ok {
945941
return nil
946942
}
947-
processedAtLeastOneBatch = true
948-
err = resetCallback(r, more)
943+
err = callback(bufRes.res, more)
949944
if err != nil {
950945
return err
951946
}
947+
processedAtLeastOneBatch = true
948+
// A server-side cursor allows the caller to fetch results cached on the server-side,
949+
// so if a cursor exists, we can't release the buffer memory yet.
950+
// TODO: In the case of a cursor, we are leaking memory
951+
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
952+
sql.ByteBufPool.Put(bufRes.buf)
953+
}
952954
}
953955
}
954956
})
@@ -967,11 +969,13 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
967969
if verboseErrorLogging {
968970
fmt.Printf("Err: %+v", err)
969971
}
970-
return nil, false, err
972+
return nil, nil, false, err
971973
}
972974

973-
res.Rows = res.Rows[:res.RowsAffected]
974-
return res, processedAtLeastOneBatch, err
975+
if res != nil {
976+
res.Rows = res.Rows[:res.RowsAffected]
977+
}
978+
return res, buf, processedAtLeastOneBatch, err
975979
}
976980

977981
// See https://dev.mysql.com/doc/internals/en/status-flags.html

0 commit comments

Comments
 (0)