Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit 2308dd3

Browse files
committed
add experimental optional feature to use in memory joins
Signed-off-by: Miguel Molina <[email protected]>
1 parent 5a90754 commit 2308dd3

File tree

2 files changed

+154
-6
lines changed

2 files changed

+154
-6
lines changed

sql/plan/innerjoin.go

Lines changed: 104 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@ package plan
22

33
import (
44
"io"
5+
"os"
56
"reflect"
67

78
opentracing "github.com/opentracing/opentracing-go"
89
"gopkg.in/src-d/go-mysql-server.v0/sql"
910
)
1011

12+
const experimentalInMemoryJoinKey = "EXPERIMENTAL_IN_MEMORY_JOIN"
13+
14+
var useInMemoryJoins = os.Getenv(experimentalInMemoryJoinKey) != ""
15+
1116
// InnerJoin is an inner join between two tables.
1217
type InnerJoin struct {
1318
BinaryNode
@@ -61,12 +66,30 @@ func (j *InnerJoin) RowIter(ctx *sql.Context) (sql.RowIter, error) {
6166
return nil, err
6267
}
6368

64-
return sql.NewSpanIter(span, &innerJoinIter{
65-
l: l,
66-
rp: j.Right,
67-
ctx: ctx,
68-
cond: j.Cond,
69-
}), nil
69+
var iter sql.RowIter
70+
if useInMemoryJoins {
71+
r, err := j.Right.RowIter(ctx)
72+
if err != nil {
73+
span.Finish()
74+
return nil, err
75+
}
76+
77+
iter = &innerJoinMemoryIter{
78+
l: l,
79+
r: r,
80+
ctx: ctx,
81+
cond: j.Cond,
82+
}
83+
} else {
84+
iter = &innerJoinIter{
85+
l: l,
86+
rp: j.Right,
87+
ctx: ctx,
88+
cond: j.Cond,
89+
}
90+
}
91+
92+
return sql.NewSpanIter(span, iter), nil
7093
}
7194

7295
// TransformUp implements the Transformable interface.
@@ -196,3 +219,78 @@ func (i *innerJoinIter) Close() error {
196219

197220
return nil
198221
}
222+
223+
type innerJoinMemoryIter struct {
224+
l sql.RowIter
225+
r sql.RowIter
226+
ctx *sql.Context
227+
cond sql.Expression
228+
pos int
229+
leftRow sql.Row
230+
right []sql.Row
231+
}
232+
233+
func (i *innerJoinMemoryIter) Next() (sql.Row, error) {
234+
for {
235+
if i.leftRow == nil {
236+
r, err := i.l.Next()
237+
if err != nil {
238+
return nil, err
239+
}
240+
241+
i.leftRow = r
242+
}
243+
244+
if i.r != nil {
245+
for {
246+
row, err := i.r.Next()
247+
if err != nil {
248+
if err == io.EOF {
249+
break
250+
}
251+
return nil, err
252+
}
253+
254+
i.right = append(i.right, row)
255+
}
256+
i.r = nil
257+
}
258+
259+
if i.pos >= len(i.right) {
260+
i.pos = 0
261+
i.leftRow = nil
262+
continue
263+
}
264+
265+
rightRow := i.right[i.pos]
266+
var row = make(sql.Row, len(i.leftRow)+len(rightRow))
267+
copy(row, i.leftRow)
268+
copy(row[len(i.leftRow):], rightRow)
269+
270+
i.pos++
271+
272+
v, err := i.cond.Eval(i.ctx, row)
273+
if err != nil {
274+
return nil, err
275+
}
276+
277+
if v == true {
278+
return row, nil
279+
}
280+
}
281+
}
282+
283+
func (i *innerJoinMemoryIter) Close() error {
284+
if err := i.l.Close(); err != nil {
285+
if i.r != nil {
286+
_ = i.r.Close()
287+
}
288+
return err
289+
}
290+
291+
if i.r != nil {
292+
return i.r.Close()
293+
}
294+
295+
return nil
296+
}

sql/plan/innerjoin_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,39 @@ func TestInnerJoin(t *testing.T) {
3838
}, rows)
3939
}
4040

41+
func TestInMemoryInnerJoin(t *testing.T) {
42+
useInMemoryJoins = true
43+
defer func() {
44+
useInMemoryJoins = false
45+
}()
46+
47+
require := require.New(t)
48+
finalSchema := append(lSchema, rSchema...)
49+
50+
ltable := mem.NewTable("left", lSchema)
51+
rtable := mem.NewTable("right", rSchema)
52+
insertData(t, ltable)
53+
insertData(t, rtable)
54+
55+
j := NewInnerJoin(
56+
NewResolvedTable(ltable),
57+
NewResolvedTable(rtable),
58+
expression.NewEquals(
59+
expression.NewGetField(0, sql.Text, "lcol1", false),
60+
expression.NewGetField(4, sql.Text, "rcol1", false),
61+
))
62+
63+
require.Equal(finalSchema, j.Schema())
64+
65+
rows := collectRows(t, j)
66+
require.Len(rows, 2)
67+
68+
require.Equal([]sql.Row{
69+
{"col1_1", "col2_1", int32(1111), int64(2222), "col1_1", "col2_1", int32(1111), int64(2222)},
70+
{"col1_2", "col2_2", int32(3333), int64(4444), "col1_2", "col2_2", int32(3333), int64(4444)},
71+
}, rows)
72+
}
73+
4174
func TestInnerJoinEmpty(t *testing.T) {
4275
require := require.New(t)
4376
ctx := sql.NewEmptyContext()
@@ -118,6 +151,23 @@ func BenchmarkInnerJoin(b *testing.B) {
118151
}
119152
})
120153

154+
b.Run("in memory inner join", func(b *testing.B) {
155+
useInMemoryJoins = true
156+
require := require.New(b)
157+
158+
for i := 0; i < b.N; i++ {
159+
iter, err := n1.RowIter(ctx)
160+
require.NoError(err)
161+
162+
rows, err := sql.RowIterToRows(iter)
163+
require.NoError(err)
164+
165+
require.Equal(expected, rows)
166+
}
167+
168+
useInMemoryJoins = false
169+
})
170+
121171
b.Run("cross join with filter", func(b *testing.B) {
122172
require := require.New(b)
123173

0 commit comments

Comments
 (0)