Skip to content

Commit 160b0f5

Browse files
author
James Cor
committed
implement row2
1 parent da991de commit 160b0f5

File tree

6 files changed

+144
-2
lines changed

6 files changed

+144
-2
lines changed

server/handler.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,8 @@ func (h *Handler) doQuery(
495495
r, err = resultForEmptyIter(sqlCtx, rowIter, resultFields)
496496
} else if analyzer.FlagIsSet(qFlags, sql.QFlagMax1Row) {
497497
r, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, buf)
498+
} else if ri2, ok := rowIter.(sql.RowIter2); ok && ri2.IsRowIter2(sqlCtx) {
499+
r, err = h.resultForDefaultIter2(sqlCtx, ri2, resultFields, callback, more)
498500
} else {
499501
r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf)
500502
}
@@ -768,6 +770,32 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
768770
return r, processedAtLeastOneBatch, nil
769771
}
770772

773+
func (h *Handler) resultForDefaultIter2(ctx *sql.Context, iter sql.RowIter2, resultFields []*querypb.Field, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, error) {
774+
res := &sqltypes.Result{Fields: resultFields}
775+
for {
776+
if res.RowsAffected == rowsBatch {
777+
if err := callback(res, more); err != nil {
778+
return nil, err
779+
}
780+
res = nil
781+
}
782+
row, err := iter.Next2(ctx)
783+
if err == io.EOF {
784+
return res, nil
785+
}
786+
if err != nil {
787+
return nil, err
788+
}
789+
790+
outRow := make([]sqltypes.Value, len(res.Rows))
791+
for i := range row {
792+
outRow[i] = sqltypes.MakeTrusted(row[i].Typ, row[i].Val)
793+
}
794+
res.Rows = append(res.Rows, outRow)
795+
res.RowsAffected++
796+
}
797+
}
798+
771799
// See https://dev.mysql.com/doc/internals/en/status-flags.html
772800
func setConnStatusFlags(ctx *sql.Context, c *mysql.Conn) error {
773801
ok, err := isSessionAutocommit(ctx)

sql/plan/process.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,29 @@ func (i *TrackedRowIter) Next(ctx *sql.Context) (sql.Row, error) {
317317
return row, nil
318318
}
319319

320+
func (i *TrackedRowIter) Next2(ctx *sql.Context) (sql.Row2, error) {
321+
ri2, ok := i.iter.(sql.RowIter2)
322+
if !ok {
323+
panic(fmt.Sprintf("%T does not implement sql.RowIter2 interface", i.iter))
324+
}
325+
row, err := ri2.Next2(ctx)
326+
if err != nil {
327+
return nil, err
328+
}
329+
i.numRows++
330+
if i.onNext != nil {
331+
i.onNext()
332+
}
333+
return row, nil
334+
}
335+
336+
func (i *TrackedRowIter) IsRowIter2(ctx *sql.Context) bool {
337+
if ri2, ok := i.iter.(sql.RowIter2); ok {
338+
return ri2.IsRowIter2(ctx)
339+
}
340+
return false
341+
}
342+
320343
func (i *TrackedRowIter) Close(ctx *sql.Context) error {
321344
err := i.iter.Close(ctx)
322345

sql/rowexec/transaction_iters.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package rowexec
1616

1717
import (
18+
"fmt"
1819
"io"
1920

2021
"gopkg.in/src-d/go-errors.v1"
@@ -99,6 +100,21 @@ func (t *TransactionCommittingIter) Next(ctx *sql.Context) (sql.Row, error) {
99100
return t.childIter.Next(ctx)
100101
}
101102

103+
func (t *TransactionCommittingIter) Next2(ctx *sql.Context) (sql.Row2, error) {
104+
ri2, ok := t.childIter.(sql.RowIter2)
105+
if !ok {
106+
panic(fmt.Sprintf("%T does not implement sql.RowIter2 interface", t.childIter))
107+
}
108+
return ri2.Next2(ctx)
109+
}
110+
111+
func (t *TransactionCommittingIter) IsRowIter2(ctx *sql.Context) bool {
112+
if ri2, ok := t.childIter.(sql.RowIter2); ok {
113+
return ri2.IsRowIter2(ctx)
114+
}
115+
return false
116+
}
117+
102118
func (t *TransactionCommittingIter) Close(ctx *sql.Context) error {
103119
var err error
104120
if t.childIter != nil {

sql/rows.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ type RowIter interface {
9292
Closer
9393
}
9494

95+
type RowIter2 interface {
96+
RowIter
97+
Next2(ctx *Context) (Row2, error)
98+
IsRowIter2(ctx *Context) bool
99+
}
100+
95101
// RowIterToRows converts a row iterator to a slice of rows.
96102
func RowIterToRows(ctx *Context, i RowIter) ([]Row, error) {
97103
var rows []Row
@@ -112,7 +118,7 @@ func RowIterToRows(ctx *Context, i RowIter) ([]Row, error) {
112118
return rows, i.Close(ctx)
113119
}
114120

115-
func rowFromRow2(sch Schema, r Row2) Row {
121+
func RowFromRow2(sch Schema, r Row2) Row {
116122
row := make(Row, len(sch))
117123
for i, col := range sch {
118124
switch col.Type.Type() {

sql/table_iter.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package sql
1616

1717
import (
18+
"fmt"
1819
"io"
1920
)
2021

@@ -24,6 +25,8 @@ type TableRowIter struct {
2425
partitions PartitionIter
2526
partition Partition
2627
rows RowIter
28+
29+
rows2 RowIter2
2730
}
2831

2932
var _ RowIter = (*TableRowIter)(nil)
@@ -76,6 +79,73 @@ func (i *TableRowIter) Next(ctx *Context) (Row, error) {
7679
return row, err
7780
}
7881

82+
func (i *TableRowIter) Next2(ctx *Context) (Row2, error) {
83+
select {
84+
case <-ctx.Done():
85+
return nil, ctx.Err()
86+
default:
87+
}
88+
if i.partition == nil {
89+
partition, err := i.partitions.Next(ctx)
90+
if err != nil {
91+
if err == io.EOF {
92+
if e := i.partitions.Close(ctx); e != nil {
93+
return nil, e
94+
}
95+
}
96+
97+
return nil, err
98+
}
99+
100+
i.partition = partition
101+
}
102+
103+
if i.rows2 == nil {
104+
rows, err := i.table.PartitionRows(ctx, i.partition)
105+
if err != nil {
106+
return nil, err
107+
}
108+
ri2, ok := rows.(RowIter2)
109+
if !ok {
110+
panic(fmt.Sprintf("%T does not implement RowIter2", rows))
111+
}
112+
i.rows2 = ri2
113+
}
114+
115+
row, err := i.rows2.Next2(ctx)
116+
if err != nil && err == io.EOF {
117+
if err = i.rows2.Close(ctx); err != nil {
118+
return nil, err
119+
}
120+
i.partition = nil
121+
i.rows2 = nil
122+
row, err = i.Next2(ctx)
123+
}
124+
return row, err
125+
}
126+
127+
func (i *TableRowIter) IsRowIter2(ctx *Context) bool {
128+
if i.partition == nil {
129+
partition, err := i.partitions.Next(ctx)
130+
if err != nil {
131+
return false
132+
}
133+
i.partition = partition
134+
}
135+
if i.rows2 == nil {
136+
rows, err := i.table.PartitionRows(ctx, i.partition)
137+
if err != nil {
138+
return false
139+
}
140+
ri2, ok := rows.(RowIter2)
141+
if !ok {
142+
return false
143+
}
144+
i.rows2 = ri2
145+
}
146+
return i.rows2.IsRowIter2(ctx)
147+
}
148+
79149
func (i *TableRowIter) Close(ctx *Context) error {
80150
if i.rows != nil {
81151
if err := i.rows.Close(ctx); err != nil {

sql/type.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,6 @@ func IsDecimalType(t Type) bool {
294294

295295
type Type2 interface {
296296
Type
297-
298297
// Compare2 returns an integer comparing two Values.
299298
Compare2(Value, Value) (int, error)
300299
// Convert2 converts a value of a compatible type.

0 commit comments

Comments
 (0)