Skip to content

Commit 541435e

Browse files
author
James Cor
committed
split send and receive
1 parent 8666dcd commit 541435e

File tree

1 file changed

+37
-9
lines changed

1 file changed

+37
-9
lines changed

server/handler.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
771771
}
772772

773773
func (h *Handler) resultForDefaultIter2(ctx *sql.Context, c *mysql.Conn, iter sql.RowIter2, resultFields []*querypb.Field, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) {
774-
defer trace.StartRegion(ctx, "Handler.resultForDefaultIter").End()
774+
defer trace.StartRegion(ctx, "Handler.resultForDefaultIter2").End()
775775

776776
eg, ctx := ctx.NewErrgroup()
777777
pan2err := func(err *error) {
@@ -803,7 +803,34 @@ func (h *Handler) resultForDefaultIter2(ctx *sql.Context, c *mysql.Conn, iter sq
803803
defer timer.Stop()
804804

805805
wg := sync.WaitGroup{}
806-
wg.Add(1)
806+
wg.Add(2)
807+
808+
// TODO: this should be merged below go func
809+
var rowChan = make(chan sql.Row2, 512)
810+
eg.Go(func() (err error) {
811+
defer pan2err(&err)
812+
defer wg.Done()
813+
defer close(rowChan)
814+
for {
815+
select {
816+
case <-ctx.Done():
817+
return context.Cause(ctx)
818+
default:
819+
row, err := iter.Next2(ctx)
820+
if err == io.EOF {
821+
return nil
822+
}
823+
if err != nil {
824+
return err
825+
}
826+
select {
827+
case rowChan <- row:
828+
case <-ctx.Done():
829+
return nil
830+
}
831+
}
832+
}
833+
})
807834

808835
var res *sqltypes.Result
809836
var processedAtLeastOneBatch bool
@@ -813,7 +840,10 @@ func (h *Handler) resultForDefaultIter2(ctx *sql.Context, c *mysql.Conn, iter sq
813840
defer wg.Done()
814841
for {
815842
if res == nil {
816-
res = &sqltypes.Result{Fields: resultFields}
843+
res = &sqltypes.Result{
844+
Fields: resultFields,
845+
Rows: make([][]sqltypes.Value, 0, rowsBatch),
846+
}
817847
}
818848
if res.RowsAffected == rowsBatch {
819849
if err := callback(res, more); err != nil {
@@ -834,14 +864,12 @@ func (h *Handler) resultForDefaultIter2(ctx *sql.Context, c *mysql.Conn, iter sq
834864
ctx.GetLogger().Tracef("connection timeout")
835865
return ErrRowTimeout.New()
836866
}
837-
default:
838-
row, err := iter.Next2(ctx)
839-
if err == io.EOF {
867+
case row, ok := <-rowChan:
868+
if !ok {
840869
return nil
841870
}
842-
if err != nil {
843-
return err
844-
}
871+
// TODO: we can avoid deep copy here by redefining sql.Row2
872+
ctx.GetLogger().Tracef("spooling result row %s", row)
845873
outRow := make([]sqltypes.Value, len(row))
846874
for i := range row {
847875
outRow[i] = sqltypes.MakeTrusted(row[i].Typ, row[i].Val)

0 commit comments

Comments
 (0)