Skip to content

Commit 31e6671

Browse files
author
James Cor
committed
add row buffer to all joins
1 parent 7c431e7 commit 31e6671

File tree

1 file changed

+70
-32
lines changed

1 file changed

+70
-32
lines changed

sql/rowexec/join_iters.go

Lines changed: 70 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ type joinIter struct {
4848
rowSize int
4949
scopeLen int
5050
parentLen int
51+
52+
rowBuffer *sql.RowBuffer
5153
}
5254

5355
func newJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) {
@@ -75,9 +77,10 @@ func newJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row
7577
return nil, err
7678
}
7779

78-
parentLen := len(row)
80+
rowBuffer := sql.RowBufPool.Get().(*sql.RowBuffer)
7981

80-
primaryRow := make(sql.Row, parentLen+len(j.Left().Schema()))
82+
parentLen := len(row)
83+
primaryRow := rowBuffer.Get(parentLen + len(j.Left().Schema()))
8184
copy(primaryRow, row)
8285

8386
return sql.NewSpanIter(span, &joinIter{
@@ -94,6 +97,8 @@ func newJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row
9497
rowSize: parentLen + len(j.Left().Schema()) + len(j.Right().Schema()),
9598
scopeLen: j.ScopeLen,
9699
parentLen: parentLen,
100+
101+
rowBuffer: rowBuffer,
97102
}), nil
98103
}
99104

@@ -200,13 +205,16 @@ func (i *joinIter) removeParentRow(r sql.Row) sql.Row {
200205

201206
// buildRow builds the result set row using the rows from the primary and secondary tables
202207
func (i *joinIter) buildRow(primary, secondary sql.Row) sql.Row {
203-
row := make(sql.Row, i.rowSize)
208+
row := i.rowBuffer.Get(i.rowSize)
204209
copy(row, primary)
205210
copy(row[len(primary):], secondary)
206211
return row
207212
}
208213

209214
func (i *joinIter) Close(ctx *sql.Context) (err error) {
215+
i.rowBuffer.Reset()
216+
sql.RowBufPool.Put(i.rowBuffer)
217+
210218
if i.primary != nil {
211219
if err = i.primary.Close(ctx); err != nil {
212220
if i.secondary != nil {
@@ -232,11 +240,13 @@ func newExistsIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, ro
232240

233241
parentLen := len(row)
234242

243+
rowBuffer := sql.RowBufPool.Get().(*sql.RowBuffer)
244+
235245
rowSize := parentLen + len(j.Left().Schema()) + len(j.Right().Schema())
236-
fullRow := make(sql.Row, rowSize)
246+
fullRow := rowBuffer.Get(rowSize)
237247
copy(fullRow, row)
238248

239-
primaryRow := make(sql.Row, parentLen+len(j.Left().Schema()))
249+
primaryRow := rowBuffer.Get(parentLen + len(j.Left().Schema()))
240250
copy(primaryRow, row)
241251

242252
return &existsIter{
@@ -251,6 +261,7 @@ func newExistsIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, ro
251261
scopeLen: j.ScopeLen,
252262
rowSize: rowSize,
253263
nullRej: !(j.Filter != nil && plan.IsNullRejecting(j.Filter)),
264+
rowBuffer: rowBuffer,
254265
}, nil
255266
}
256267

@@ -271,6 +282,8 @@ type existsIter struct {
271282

272283
nullRej bool
273284
rightIterNonEmpty bool
285+
286+
rowBuffer *sql.RowBuffer
274287
}
275288

276289
type existsState uint8
@@ -396,13 +409,16 @@ func (i *existsIter) removeParentRow(r sql.Row) sql.Row {
396409

397410
// buildRow builds the result set row using the rows from the primary and secondary tables
398411
func (i *existsIter) buildRow(primary, secondary sql.Row) sql.Row {
399-
row := make(sql.Row, i.rowSize)
412+
row := i.rowBuffer.Get(i.rowSize)
400413
copy(row, primary)
401414
copy(row[len(primary):], secondary)
402415
return row
403416
}
404417

405418
func (i *existsIter) Close(ctx *sql.Context) (err error) {
419+
i.rowBuffer.Reset()
420+
sql.RowBufPool.Put(i.rowBuffer)
421+
406422
if i.primary != nil {
407423
if err = i.primary.Close(ctx); err != nil {
408424
return err
@@ -411,26 +427,6 @@ func (i *existsIter) Close(ctx *sql.Context) (err error) {
411427
return err
412428
}
413429

414-
func newFullJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) {
415-
leftIter, err := b.Build(ctx, j.Left(), row)
416-
if err != nil {
417-
return nil, err
418-
}
419-
return &fullJoinIter{
420-
parentRow: row,
421-
l: leftIter,
422-
rp: j.Right(),
423-
cond: j.Filter,
424-
scopeLen: j.ScopeLen,
425-
rowSize: len(row) + len(j.Left().Schema()) + len(j.Right().Schema()),
426-
seenLeft: make(map[uint64]struct{}),
427-
seenRight: make(map[uint64]struct{}),
428-
leftLen: len(j.Left().Schema()),
429-
rightLen: len(j.Right().Schema()),
430-
b: b,
431-
}, nil
432-
}
433-
434430
// fullJoinIter implements full join as a union of left and right join:
435431
// FJ(A,B) => U(LJ(A,B), RJ(A,B)). The current algorithm will have a
436432
// runtime and memory complexity O(m+n).
@@ -451,6 +447,30 @@ type fullJoinIter struct {
451447
leftDone bool
452448
seenLeft map[uint64]struct{}
453449
seenRight map[uint64]struct{}
450+
451+
rowBuffer *sql.RowBuffer
452+
}
453+
454+
func newFullJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) {
455+
leftIter, err := b.Build(ctx, j.Left(), row)
456+
if err != nil {
457+
return nil, err
458+
}
459+
return &fullJoinIter{
460+
parentRow: row,
461+
l: leftIter,
462+
rp: j.Right(),
463+
cond: j.Filter,
464+
scopeLen: j.ScopeLen,
465+
rowSize: len(row) + len(j.Left().Schema()) + len(j.Right().Schema()),
466+
seenLeft: make(map[uint64]struct{}),
467+
seenRight: make(map[uint64]struct{}),
468+
leftLen: len(j.Left().Schema()),
469+
rightLen: len(j.Right().Schema()),
470+
b: b,
471+
472+
rowBuffer: sql.RowBufPool.Get().(*sql.RowBuffer),
473+
}, nil
454474
}
455475

456476
func (i *fullJoinIter) Next(ctx *sql.Context) (sql.Row, error) {
@@ -546,7 +566,7 @@ func (i *fullJoinIter) Next(ctx *sql.Context) (sql.Row, error) {
546566
continue
547567
}
548568
// (null, right) only if we haven't matched right
549-
ret := make(sql.Row, i.rowSize)
569+
ret := i.rowBuffer.Get(i.rowSize)
550570
copy(ret[i.leftLen:], rightRow)
551571
return i.removeParentRow(ret), nil
552572
}
@@ -560,13 +580,16 @@ func (i *fullJoinIter) removeParentRow(r sql.Row) sql.Row {
560580

561581
// buildRow builds the result set row using the rows from the primary and secondary tables
562582
func (i *fullJoinIter) buildRow(primary, secondary sql.Row) sql.Row {
563-
row := make(sql.Row, i.rowSize)
583+
row := i.rowBuffer.Get(i.rowSize)
564584
copy(row, primary)
565585
copy(row[len(primary):], secondary)
566586
return row
567587
}
568588

569589
func (i *fullJoinIter) Close(ctx *sql.Context) (err error) {
590+
i.rowBuffer.Reset()
591+
sql.RowBufPool.Put(i.rowBuffer)
592+
570593
if i.l != nil {
571594
err = i.l.Close(ctx)
572595
}
@@ -593,6 +616,8 @@ type crossJoinIterator struct {
593616
rowSize int
594617
scopeLen int
595618
parentLen int
619+
620+
rowBuffer *sql.RowBuffer
596621
}
597622

598623
func newCrossJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) {
@@ -620,9 +645,10 @@ func newCrossJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode,
620645
return nil, err
621646
}
622647

623-
parentLen := len(row)
648+
rowBuffer := sql.RowBufPool.Get().(*sql.RowBuffer)
624649

625-
primaryRow := make(sql.Row, parentLen+len(j.Left().Schema()))
650+
parentLen := len(row)
651+
primaryRow := rowBuffer.Get(parentLen + len(j.Left().Schema()))
626652
copy(primaryRow, row)
627653

628654
return sql.NewSpanIter(span, &crossJoinIterator{
@@ -635,6 +661,8 @@ func newCrossJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode,
635661
rowSize: len(row) + len(j.Left().Schema()) + len(j.Right().Schema()),
636662
scopeLen: j.ScopeLen,
637663
parentLen: parentLen,
664+
665+
rowBuffer: rowBuffer,
638666
}), nil
639667
}
640668

@@ -664,7 +692,7 @@ func (i *crossJoinIterator) Next(ctx *sql.Context) (sql.Row, error) {
664692
return nil, err
665693
}
666694

667-
row := make(sql.Row, i.rowSize)
695+
row := i.rowBuffer.Get(i.rowSize)
668696
copy(row, i.primaryRow)
669697
copy(row[len(i.primaryRow):], rightRow)
670698
return i.removeParentRow(row), nil
@@ -678,6 +706,9 @@ func (i *crossJoinIterator) removeParentRow(r sql.Row) sql.Row {
678706
}
679707

680708
func (i *crossJoinIterator) Close(ctx *sql.Context) (err error) {
709+
i.rowBuffer.Reset() // TODO: just set i.rowBuffer = nil?
710+
sql.RowBufPool.Put(i.rowBuffer)
711+
681712
if i.l != nil {
682713
err = i.l.Close(ctx)
683714
}
@@ -734,6 +765,8 @@ type lateralJoinIterator struct {
734765
foundMatch bool
735766

736767
b sql.NodeExecBuilder
768+
769+
rowBuffer *sql.RowBuffer
737770
}
738771

739772
func newLateralJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) {
@@ -769,6 +802,8 @@ func newLateralJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNod
769802
rowSize: len(row) + len(j.Left().Schema()) + len(j.Right().Schema()),
770803
scopeLen: j.ScopeLen,
771804
b: b,
805+
806+
rowBuffer: sql.RowBufPool.Get().(*sql.RowBuffer),
772807
}), nil
773808
}
774809

@@ -811,7 +846,7 @@ func (i *lateralJoinIterator) loadRight(ctx *sql.Context) error {
811846
}
812847

813848
func (i *lateralJoinIterator) buildRow(lRow, rRow sql.Row) sql.Row {
814-
row := make(sql.Row, i.rowSize)
849+
row := i.rowBuffer.Get(i.rowSize)
815850
copy(row, lRow)
816851
copy(row[len(lRow):], rRow)
817852
return row
@@ -874,6 +909,9 @@ func (i *lateralJoinIterator) Next(ctx *sql.Context) (sql.Row, error) {
874909
}
875910

876911
func (i *lateralJoinIterator) Close(ctx *sql.Context) error {
912+
i.rowBuffer.Reset()
913+
sql.RowBufPool.Put(i.rowBuffer)
914+
877915
var lerr, rerr error
878916
if i.lIter != nil {
879917
lerr = i.lIter.Close(ctx)

0 commit comments

Comments
 (0)