Skip to content

Commit 3743f95

Browse files
author
James Cor
committed
implement for value rows
1 parent 5e26462 commit 3743f95

File tree

1 file changed

+25
-21
lines changed

1 file changed

+25
-21
lines changed

server/handler.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ func (h *Handler) doQuery(
494494
} else if analyzer.FlagIsSet(qFlags, sql.QFlagMax1Row) {
495495
r, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, buf)
496496
} else if vr, ok := rowIter.(sql.ValueRowIter); ok && vr.IsValueRowIter(sqlCtx) {
497-
r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more)
497+
r, buf, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, callback, more)
498498
} else {
499499
r, buf, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more)
500500
}
@@ -768,11 +768,11 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
768768
if !ok {
769769
return nil
770770
}
771-
processedAtLeastOneBatch = true
772771
err = callback(bufRes.res, more)
773772
if err != nil {
774773
return err
775774
}
775+
processedAtLeastOneBatch = true
776776
// A server-side cursor allows the caller to fetch results cached on the server-side,
777777
// so if a cursor exists, we can't release the buffer memory yet.
778778
// TODO: In the case of a cursor, we are leaking memory
@@ -802,7 +802,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
802802
return res, buf, processedAtLeastOneBatch, nil
803803
}
804804

805-
func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) {
805+
func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, *sql.ByteBuffer, bool, error) {
806806
defer trace.StartRegion(ctx, "Handler.resultForValueRowIter").End()
807807

808808
eg, ctx := ctx.NewErrgroup()
@@ -833,17 +833,6 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
833833
timer := time.NewTimer(waitTime)
834834
defer timer.Stop()
835835

836-
// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
837-
// clean out rows that have already been spooled.
838-
// A server-side cursor allows the caller to fetch results cached on the server-side,
839-
// so if a cursor exists, we can't release the buffer memory yet.
840-
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
841-
callback = func(r *sqltypes.Result, more bool) error {
842-
defer buf.Reset()
843-
return callback(r, more)
844-
}
845-
}
846-
847836
wg := sync.WaitGroup{}
848837
wg.Add(3)
849838

@@ -875,8 +864,13 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
875864
})
876865

877866
// Drain rows from rowChan, convert to wire format, and send to resChan
878-
var resChan = make(chan *sqltypes.Result, 4)
867+
type bufferedResult struct {
868+
res *sqltypes.Result
869+
buf *sql.ByteBuffer
870+
}
871+
var resChan = make(chan bufferedResult, 4)
879872
var res *sqltypes.Result
873+
var buf *sql.ByteBuffer
880874
eg.Go(func() (err error) {
881875
defer pan2err(&err)
882876
defer close(resChan)
@@ -888,6 +882,8 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
888882
Fields: resultFields,
889883
Rows: make([][]sqltypes.Value, rowsBatch),
890884
}
885+
buf = sql.ByteBufPool.Get().(*sql.ByteBuffer)
886+
buf.Reset()
891887
}
892888

893889
select {
@@ -919,8 +915,9 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
919915
select {
920916
case <-ctx.Done():
921917
return context.Cause(ctx)
922-
case resChan <- res:
918+
case resChan <- bufferedResult{res: res, buf: buf}:
923919
res = nil
920+
buf = nil
924921
}
925922
}
926923
}
@@ -939,15 +936,21 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
939936
select {
940937
case <-ctx.Done():
941938
return context.Cause(ctx)
942-
case r, ok := <-resChan:
939+
case bufRes, ok := <-resChan:
943940
if !ok {
944941
return nil
945942
}
946-
processedAtLeastOneBatch = true
947-
err = resetCallback(r, more)
943+
err = callback(bufRes.res, more)
948944
if err != nil {
949945
return err
950946
}
947+
processedAtLeastOneBatch = true
948+
// A server-side cursor allows the caller to fetch results cached on the server-side,
949+
// so if a cursor exists, we can't release the buffer memory yet.
950+
// TODO: In the case of a cursor, we are leaking memory
951+
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
952+
sql.ByteBufPool.Put(bufRes.buf)
953+
}
951954
}
952955
}
953956
})
@@ -966,13 +969,14 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
966969
if verboseErrorLogging {
967970
fmt.Printf("Err: %+v", err)
968971
}
969-
return nil, false, err
972+
return nil, nil, false, err
970973
}
971974

972975
if res != nil {
973976
res.Rows = res.Rows[:res.RowsAffected]
974977
}
975-
return res, processedAtLeastOneBatch, err
978+
}
979+
return res, buf, processedAtLeastOneBatch, err
976980
}
977981

978982
// See https://dev.mysql.com/doc/internals/en/status-flags.html

0 commit comments

Comments
 (0)