Skip to content

Commit 006ebc8

Browse files
authored
Split Iter.Next(), RowToSQL, and callback into separate threads (#3310)
1 parent 188256e commit 006ebc8

File tree

1 file changed

+156
-103
lines changed

1 file changed

+156
-103
lines changed

server/handler.go

Lines changed: 156 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -629,39 +629,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
629629
*err = goerrors.Join(*err, wrappedErr)
630630
}
631631
}
632-
wg := sync.WaitGroup{}
633-
wg.Add(2)
634-
635-
var r *sqltypes.Result
636-
var processedAtLeastOneBatch bool
637-
638-
// Read rows off the row iterator and send them to the row channel.
639-
iter, projs := GetDeferredProjections(iter)
640-
var rowChan = make(chan sql.Row, 512)
641-
eg.Go(func() (err error) {
642-
defer pan2err(&err)
643-
defer wg.Done()
644-
defer close(rowChan)
645-
for {
646-
select {
647-
case <-ctx.Done():
648-
return context.Cause(ctx)
649-
default:
650-
row, err := iter.Next(ctx)
651-
if err == io.EOF {
652-
return nil
653-
}
654-
if err != nil {
655-
return err
656-
}
657-
select {
658-
case rowChan <- row:
659-
case <-ctx.Done():
660-
return nil
661-
}
662-
}
663-
}
664-
})
665632

666633
// TODO: poll for closed connections should obviously also run even if
667634
// we're doing something with an OK result or a single row result, etc.
@@ -685,75 +652,133 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
685652

686653
// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
687654
// clean out rows that have already been spooled.
688-
resetCallback := func(r *sqltypes.Result, more bool) error {
689-
// A server-side cursor allows the caller to fetch results cached on the server-side,
690-
// so if a cursor exists, we can't release the buffer memory yet.
691-
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
655+
// A server-side cursor allows the caller to fetch results cached on the server-side,
656+
// so if a cursor exists, we can't release the buffer memory yet.
657+
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
658+
callback = func(r *sqltypes.Result, more bool) error {
692659
defer buf.Reset()
660+
return callback(r, more)
693661
}
694-
return callback(r, more)
695662
}
696663

697-
// Reads rows from the channel, converts them to wire format,
698-
// and calls |callback| to give them to vitess.
664+
iter, projs := GetDeferredProjections(iter)
665+
666+
wg := sync.WaitGroup{}
667+
wg.Add(3)
668+
669+
// Read rows off the row iterator and send them to the row channel.
670+
var rowChan = make(chan sql.Row, 512)
671+
eg.Go(func() (err error) {
672+
defer pan2err(&err)
673+
defer wg.Done()
674+
defer close(rowChan)
675+
for {
676+
select {
677+
case <-ctx.Done():
678+
return context.Cause(ctx)
679+
default:
680+
row, iErr := iter.Next(ctx)
681+
if iErr == io.EOF {
682+
return nil
683+
}
684+
if iErr != nil {
685+
return iErr
686+
}
687+
select {
688+
case rowChan <- row:
689+
case <-ctx.Done():
690+
return nil
691+
}
692+
}
693+
}
694+
})
695+
696+
// Drain rows from rowChan, convert to wire format, and send to resChan
697+
var resChan = make(chan *sqltypes.Result, 4)
698+
var res *sqltypes.Result
699699
eg.Go(func() (err error) {
700700
defer pan2err(&err)
701-
defer cancelF()
702701
defer wg.Done()
702+
defer close(resChan)
703+
703704
for {
704-
if r == nil {
705-
r = &sqltypes.Result{
705+
if res == nil {
706+
res = &sqltypes.Result{
706707
Fields: resultFields,
707708
Rows: make([][]sqltypes.Value, 0, rowsBatch),
708709
}
709710
}
710-
if r.RowsAffected == rowsBatch {
711-
if err := resetCallback(r, more); err != nil {
712-
return err
713-
}
714-
r = nil
715-
processedAtLeastOneBatch = true
716-
continue
717-
}
718711

719712
select {
720713
case <-ctx.Done():
721714
return context.Cause(ctx)
715+
716+
case <-timer.C:
717+
if h.readTimeout != 0 {
718+
// Cancel and return so Vitess can call the CloseConnection callback
719+
ctx.GetLogger().Warn("connection timeout")
720+
return ErrRowTimeout.New()
721+
}
722+
722723
case row, ok := <-rowChan:
723724
if !ok {
724725
return nil
725726
}
727+
726728
if types.IsOkResult(row) {
727-
if len(r.Rows) > 0 {
729+
if res.RowsAffected > 0 {
728730
panic("Got OkResult mixed with RowResult")
729731
}
730-
r = resultFromOkResult(row[0].(types.OkResult))
732+
res = resultFromOkResult(row[0].(types.OkResult))
731733
continue
732734
}
733735

734-
outputRow, err := RowToSQL(ctx, schema, row, projs, buf)
735-
if err != nil {
736-
return err
736+
outRow, sqlErr := RowToSQL(ctx, schema, row, projs, buf)
737+
if sqlErr != nil {
738+
return sqlErr
737739
}
738740

739-
ctx.GetLogger().Tracef("spooling result row %s", outputRow)
740-
r.Rows = append(r.Rows, outputRow)
741-
r.RowsAffected++
742-
if !timer.Stop() {
743-
<-timer.C
744-
}
745-
case <-timer.C:
746-
// TODO: timer should probably go in its own thread, as rowChan is blocking
747-
if h.readTimeout != 0 {
748-
// Cancel and return so Vitess can call the CloseConnection callback
749-
ctx.GetLogger().Tracef("connection timeout")
750-
return ErrRowTimeout.New()
741+
ctx.GetLogger().Tracef("spooling result row %s", outRow)
742+
res.Rows = append(res.Rows, outRow)
743+
res.RowsAffected++
744+
745+
if res.RowsAffected == rowsBatch {
746+
select {
747+
case <-ctx.Done():
748+
return context.Cause(ctx)
749+
case resChan <- res:
750+
res = nil
751+
}
751752
}
752753
}
754+
753755
timer.Reset(waitTime)
754756
}
755757
})
756758

759+
// Drain sqltypes.Result from resChan and call callback (send to client and potentially reset buffer)
760+
var processedAtLeastOneBatch bool
761+
eg.Go(func() (err error) {
762+
defer pan2err(&err)
763+
defer cancelF()
764+
defer wg.Done()
765+
for {
766+
select {
767+
case <-ctx.Done():
768+
return context.Cause(ctx)
769+
case r, ok := <-resChan:
770+
if !ok {
771+
return nil
772+
}
773+
processedAtLeastOneBatch = true
774+
err = callback(r, more)
775+
if err != nil {
776+
return err
777+
}
778+
}
779+
}
780+
})
781+
757782
// Close() kills this PID in the process list,
758783
// wait until all rows have be sent over the wire
759784
eg.Go(func() (err error) {
@@ -770,7 +795,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
770795
}
771796
return nil, false, err
772797
}
773-
return r, processedAtLeastOneBatch, nil
798+
return res, processedAtLeastOneBatch, nil
774799
}
775800

776801
func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) {
@@ -804,22 +829,21 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
804829
timer := time.NewTimer(waitTime)
805830
defer timer.Stop()
806831

807-
wg := sync.WaitGroup{}
808-
wg.Add(2)
809-
810832
// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
811833
// clean out rows that have already been spooled.
812-
resetCallback := func(r *sqltypes.Result, more bool) error {
813-
// A server-side cursor allows the caller to fetch results cached on the server-side,
814-
// so if a cursor exists, we can't release the buffer memory yet.
815-
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
834+
// A server-side cursor allows the caller to fetch results cached on the server-side,
835+
// so if a cursor exists, we can't release the buffer memory yet.
836+
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
837+
callback = func(r *sqltypes.Result, more bool) error {
816838
defer buf.Reset()
839+
return callback(r, more)
817840
}
818-
return callback(r, more)
819841
}
820842

821-
// TODO: send results instead of rows?
822-
// Read rows from iter and send them off
843+
wg := sync.WaitGroup{}
844+
wg.Add(3)
845+
846+
// Drain rows from iter and send to rowsChan
823847
var rowChan = make(chan sql.ValueRow, 512)
824848
eg.Go(func() (err error) {
825849
defer pan2err(&err)
@@ -830,12 +854,12 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
830854
case <-ctx.Done():
831855
return context.Cause(ctx)
832856
default:
833-
row, err := iter.NextValueRow(ctx)
834-
if err == io.EOF {
857+
row, iErr := iter.NextValueRow(ctx)
858+
if iErr == io.EOF {
835859
return nil
836860
}
837-
if err != nil {
838-
return err
861+
if iErr != nil {
862+
return iErr
839863
}
840864
select {
841865
case rowChan <- row:
@@ -846,56 +870,84 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
846870
}
847871
})
848872

873+
// Drain rows from rowChan, convert to wire format, and send to resChan
874+
var resChan = make(chan *sqltypes.Result, 4)
849875
var res *sqltypes.Result
850-
var processedAtLeastOneBatch bool
851876
eg.Go(func() (err error) {
852877
defer pan2err(&err)
853-
defer cancelF()
878+
defer close(resChan)
854879
defer wg.Done()
880+
855881
for {
856882
if res == nil {
857883
res = &sqltypes.Result{
858884
Fields: resultFields,
859-
Rows: make([][]sqltypes.Value, 0, rowsBatch),
860-
}
861-
}
862-
if res.RowsAffected == rowsBatch {
863-
if err := resetCallback(res, more); err != nil {
864-
return err
885+
Rows: make([][]sqltypes.Value, rowsBatch),
865886
}
866-
res = nil
867-
processedAtLeastOneBatch = true
868-
continue
869887
}
870888

871889
select {
872890
case <-ctx.Done():
873891
return context.Cause(ctx)
892+
874893
case <-timer.C:
875894
if h.readTimeout != 0 {
876895
// Cancel and return so Vitess can call the CloseConnection callback
877-
ctx.GetLogger().Tracef("connection timeout")
896+
ctx.GetLogger().Warn("connection timeout")
878897
return ErrRowTimeout.New()
879898
}
899+
880900
case row, ok := <-rowChan:
881901
if !ok {
882902
return nil
883903
}
884-
resRow, err := RowValueToSQLValues(ctx, schema, row, buf)
885-
if err != nil {
886-
return err
904+
905+
outRow, sqlErr := RowValueToSQLValues(ctx, schema, row, buf)
906+
if sqlErr != nil {
907+
return sqlErr
887908
}
888-
ctx.GetLogger().Tracef("spooling result row %s", resRow)
889-
res.Rows = append(res.Rows, resRow)
909+
910+
ctx.GetLogger().Tracef("spooling result row %s", outRow)
911+
res.Rows[res.RowsAffected] = outRow
890912
res.RowsAffected++
891-
if !timer.Stop() {
892-
<-timer.C
913+
914+
if res.RowsAffected == rowsBatch {
915+
select {
916+
case <-ctx.Done():
917+
return context.Cause(ctx)
918+
case resChan <- res:
919+
res = nil
920+
}
893921
}
894922
}
923+
895924
timer.Reset(waitTime)
896925
}
897926
})
898927

928+
// Drain sqltypes.Result from resChan and call callback (send to client and reset buffer)
929+
var processedAtLeastOneBatch bool
930+
eg.Go(func() (err error) {
931+
defer pan2err(&err)
932+
defer cancelF()
933+
defer wg.Done()
934+
for {
935+
select {
936+
case <-ctx.Done():
937+
return context.Cause(ctx)
938+
case r, ok := <-resChan:
939+
if !ok {
940+
return nil
941+
}
942+
processedAtLeastOneBatch = true
943+
err = callback(r, more)
944+
if err != nil {
945+
return err
946+
}
947+
}
948+
}
949+
})
950+
899951
// Close() kills this PID in the process list,
900952
// wait until all rows have be sent over the wire
901953
eg.Go(func() (err error) {
@@ -913,7 +965,8 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
913965
return nil, false, err
914966
}
915967

916-
return res, processedAtLeastOneBatch, nil
968+
res.Rows = res.Rows[:res.RowsAffected]
969+
return res, processedAtLeastOneBatch, err
917970
}
918971

919972
// See https://dev.mysql.com/doc/internals/en/status-flags.html

0 commit comments

Comments
 (0)