Skip to content

Commit d7f09bb

Browse files
author
James Cor
committed
sql row buffer for merge iters
1 parent a2ed59d commit d7f09bb

File tree

3 files changed

+47
-4
lines changed

3 files changed

+47
-4
lines changed

sql/expression/function/aggregation/window_partition.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (i *WindowPartitionIter) materializeInput(ctx *sql.Context) (sql.WindowBuff
170170
}
171171
return nil, nil, err
172172
}
173-
input = append(input, append(row, j))
173+
input = append(input, append(append(sql.Row(nil), row...), j))
174174
j++
175175
}
176176

sql/rowexec/merge_join.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,11 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode,
4646
return nil, err
4747
}
4848

49-
fullRow := make(sql.Row, len(row)+len(j.Left().Schema())+len(j.Right().Schema()))
50-
fullRow[0] = row
49+
//fullRow := make(sql.Row, len(row)+len(j.Left().Schema())+len(j.Right().Schema()))
50+
51+
rowBuf := sql.RowBufPool.Get().(*sql.RowBuffer)
52+
fullRow := rowBuf.Get(len(row) + len(j.Left().Schema()) + len(j.Right().Schema()))
53+
//fullRow[0] = row
5154
if len(row) > 0 {
5255
copy(fullRow[0:], row[:])
5356
}
@@ -83,6 +86,7 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode,
8386
leftRowLen: len(j.Left().Schema()),
8487
rightRowLen: len(j.Right().Schema()),
8588
isReversed: j.IsReversed,
89+
rowBuffer: rowBuf,
8690
}
8791
return iter, nil
8892
}
@@ -128,6 +132,8 @@ type mergeJoinIter struct {
128132
leftRowLen int
129133
rightRowLen int
130134
parentLen int
135+
136+
rowBuffer *sql.RowBuffer
131137
}
132138

133139
var _ sql.RowIter = (*mergeJoinIter)(nil)
@@ -314,7 +320,7 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) {
314320
}
315321

316322
func (i *mergeJoinIter) copyReturnRow() sql.Row {
317-
ret := make(sql.Row, len(i.fullRow))
323+
ret := i.rowBuffer.Get(len(i.fullRow))
318324
copy(ret, i.fullRow)
319325
return ret
320326
}

sql/rows.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"fmt"
1919
"io"
2020
"strings"
21+
"sync"
2122

2223
"github.com/dolthub/vitess/go/vt/proto/query"
2324

@@ -83,6 +84,42 @@ func FormatRow(row Row) string {
8384
return sb.String()
8485
}
8586

87+
const defaultRowBuffCap = 4096
88+
89+
type RowBuffer struct {
90+
i int
91+
buf Row
92+
}
93+
94+
func NewRowBuffer() *RowBuffer {
95+
return &RowBuffer{
96+
buf: make(Row, defaultRowBuffCap),
97+
}
98+
}
99+
100+
func (b *RowBuffer) Get(n int) (res Row) {
101+
newI := b.i + n
102+
if newI >= len(b.buf) {
103+
//b.buf = append(b.buf, b.buf) // TODO: not sure if this is correct, but it seems faster
104+
buf := make(Row, len(b.buf)*2)
105+
copy(b.buf, buf)
106+
b.buf = buf
107+
}
108+
res = b.buf[b.i:newI]
109+
b.i = newI
110+
return
111+
}
112+
113+
func (b *RowBuffer) Reset() {
114+
b.i = 0
115+
}
116+
117+
var RowBufPool = sync.Pool{
118+
New: func() any {
119+
return NewRowBuffer()
120+
},
121+
}
122+
86123
// RowIter is an iterator that produces rows.
87124
// TODO: most row iters need to be Disposable for CachedResult safety
88125
type RowIter interface {

0 commit comments

Comments
 (0)