diff --git a/server/handler.go b/server/handler.go index 9f15da4678..53647a8fcf 100644 --- a/server/handler.go +++ b/server/handler.go @@ -701,7 +701,10 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s defer wg.Done() for { if r == nil { - r = &sqltypes.Result{Fields: resultFields} + r = &sqltypes.Result{ + Fields: resultFields, + Rows: make([][]sqltypes.Value, 0, rowsBatch), + } } if r.RowsAffected == rowsBatch { if err := resetCallback(r, more); err != nil { diff --git a/sql/expression/function/aggregation/window_partition.go b/sql/expression/function/aggregation/window_partition.go index 9bf15e8dd9..88ba356b25 100644 --- a/sql/expression/function/aggregation/window_partition.go +++ b/sql/expression/function/aggregation/window_partition.go @@ -170,7 +170,7 @@ func (i *WindowPartitionIter) materializeInput(ctx *sql.Context) (sql.WindowBuff } return nil, nil, err } - input = append(input, append(row, j)) + input = append(input, append(append(sql.Row(nil), row...), j)) j++ } diff --git a/sql/rowexec/join_iters.go b/sql/rowexec/join_iters.go index 30d7de4100..9170e21775 100644 --- a/sql/rowexec/join_iters.go +++ b/sql/rowexec/join_iters.go @@ -48,6 +48,8 @@ type joinIter struct { rowSize int scopeLen int parentLen int + + rowBuffer *sql.RowBuffer } func newJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) { @@ -75,9 +77,10 @@ func newJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row return nil, err } - parentLen := len(row) + rowBuffer := sql.RowBufPool.Get().(*sql.RowBuffer) - primaryRow := make(sql.Row, parentLen+len(j.Left().Schema())) + parentLen := len(row) + primaryRow := rowBuffer.Get(parentLen + len(j.Left().Schema())) copy(primaryRow, row) return sql.NewSpanIter(span, &joinIter{ @@ -94,6 +97,8 @@ func newJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row rowSize: parentLen + len(j.Left().Schema()) + len(j.Right().Schema()), scopeLen: j.ScopeLen, parentLen: parentLen, + + rowBuffer: rowBuffer, }), nil } @@ -200,13 +205,17 @@ func (i *joinIter) removeParentRow(r sql.Row) sql.Row { // buildRow builds the result set row using the rows from the primary and secondary tables func (i *joinIter) buildRow(primary, secondary sql.Row) sql.Row { - row := make(sql.Row, i.rowSize) + row := i.rowBuffer.Get(i.rowSize) copy(row, primary) copy(row[len(primary):], secondary) return row } func (i *joinIter) Close(ctx *sql.Context) (err error) { + //i.rowBuffer.Reset() + //sql.RowBufPool.Put(i.rowBuffer) + i.rowBuffer = nil + if i.primary != nil { if err = i.primary.Close(ctx); err != nil { if i.secondary != nil { @@ -232,11 +241,13 @@ func newExistsIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, ro parentLen := len(row) + rowBuffer := sql.RowBufPool.Get().(*sql.RowBuffer) + rowSize := parentLen + len(j.Left().Schema()) + len(j.Right().Schema()) - fullRow := make(sql.Row, rowSize) + fullRow := rowBuffer.Get(rowSize) copy(fullRow, row) - primaryRow := make(sql.Row, parentLen+len(j.Left().Schema())) + primaryRow := rowBuffer.Get(parentLen + len(j.Left().Schema())) copy(primaryRow, row) return &existsIter{ @@ -251,6 +262,7 @@ func newExistsIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, ro scopeLen: j.ScopeLen, rowSize: rowSize, nullRej: !(j.Filter != nil && plan.IsNullRejecting(j.Filter)), + rowBuffer: rowBuffer, }, nil } @@ -271,6 +283,8 @@ type existsIter struct { nullRej bool rightIterNonEmpty bool + + rowBuffer *sql.RowBuffer } type existsState uint8 @@ -396,13 +410,17 @@ func (i *existsIter) removeParentRow(r sql.Row) sql.Row { // buildRow builds the result set row using the rows from the primary and secondary tables func (i *existsIter) buildRow(primary, secondary sql.Row) sql.Row { - row := make(sql.Row, i.rowSize) + row := i.rowBuffer.Get(i.rowSize) copy(row, primary) copy(row[len(primary):], secondary) return row } func (i *existsIter) Close(ctx *sql.Context) (err error) { + i.rowBuffer = nil + //i.rowBuffer.Reset() + //sql.RowBufPool.Put(i.rowBuffer) + if i.primary != nil { if err = i.primary.Close(ctx); err != nil { return err @@ -411,26 +429,6 @@ func (i *existsIter) Close(ctx *sql.Context) (err error) { return err } -func newFullJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) { - leftIter, err := b.Build(ctx, j.Left(), row) - if err != nil { - return nil, err - } - return &fullJoinIter{ - parentRow: row, - l: leftIter, - rp: j.Right(), - cond: j.Filter, - scopeLen: j.ScopeLen, - rowSize: len(row) + len(j.Left().Schema()) + len(j.Right().Schema()), - seenLeft: make(map[uint64]struct{}), - seenRight: make(map[uint64]struct{}), - leftLen: len(j.Left().Schema()), - rightLen: len(j.Right().Schema()), - b: b, - }, nil -} - // fullJoinIter implements full join as a union of left and right join: // FJ(A,B) => U(LJ(A,B), RJ(A,B)). The current algorithm will have a // runtime and memory complexity O(m+n). @@ -451,6 +449,30 @@ type fullJoinIter struct { leftDone bool seenLeft map[uint64]struct{} seenRight map[uint64]struct{} + + rowBuffer *sql.RowBuffer +} + +func newFullJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) { + leftIter, err := b.Build(ctx, j.Left(), row) + if err != nil { + return nil, err + } + return &fullJoinIter{ + parentRow: row, + l: leftIter, + rp: j.Right(), + cond: j.Filter, + scopeLen: j.ScopeLen, + rowSize: len(row) + len(j.Left().Schema()) + len(j.Right().Schema()), + seenLeft: make(map[uint64]struct{}), + seenRight: make(map[uint64]struct{}), + leftLen: len(j.Left().Schema()), + rightLen: len(j.Right().Schema()), + b: b, + + rowBuffer: sql.RowBufPool.Get().(*sql.RowBuffer), + }, nil } func (i *fullJoinIter) Next(ctx *sql.Context) (sql.Row, error) { @@ -546,7 +568,7 @@ func (i *fullJoinIter) Next(ctx *sql.Context) (sql.Row, error) { continue } // (null, right) only if we haven't matched right - ret := make(sql.Row, i.rowSize) + ret := i.rowBuffer.Get(i.rowSize) copy(ret[i.leftLen:], rightRow) return i.removeParentRow(ret), nil } @@ -560,13 +582,17 @@ func (i *fullJoinIter) removeParentRow(r sql.Row) sql.Row { // buildRow builds the result set row using the rows from the primary and secondary tables func (i *fullJoinIter) buildRow(primary, secondary sql.Row) sql.Row { - row := make(sql.Row, i.rowSize) + row := i.rowBuffer.Get(i.rowSize) copy(row, primary) copy(row[len(primary):], secondary) return row } func (i *fullJoinIter) Close(ctx *sql.Context) (err error) { + i.rowBuffer = nil + //i.rowBuffer.Reset() + //sql.RowBufPool.Put(i.rowBuffer) + if i.l != nil { err = i.l.Close(ctx) } @@ -593,6 +619,8 @@ type crossJoinIterator struct { rowSize int scopeLen int parentLen int + + rowBuffer *sql.RowBuffer } func newCrossJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) { @@ -620,9 +648,10 @@ func newCrossJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, return nil, err } - parentLen := len(row) + rowBuffer := sql.RowBufPool.Get().(*sql.RowBuffer) - primaryRow := make(sql.Row, parentLen+len(j.Left().Schema())) + parentLen := len(row) + primaryRow := rowBuffer.Get(parentLen + len(j.Left().Schema())) copy(primaryRow, row) return sql.NewSpanIter(span, &crossJoinIterator{ @@ -635,6 +664,8 @@ func newCrossJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, rowSize: len(row) + len(j.Left().Schema()) + len(j.Right().Schema()), scopeLen: j.ScopeLen, parentLen: parentLen, + + rowBuffer: rowBuffer, }), nil } @@ -664,7 +695,7 @@ func (i *crossJoinIterator) Next(ctx *sql.Context) (sql.Row, error) { return nil, err } - row := make(sql.Row, i.rowSize) + row := i.rowBuffer.Get(i.rowSize) copy(row, i.primaryRow) copy(row[len(i.primaryRow):], rightRow) return i.removeParentRow(row), nil @@ -678,6 +709,10 @@ func (i *crossJoinIterator) removeParentRow(r sql.Row) sql.Row { } func (i *crossJoinIterator) Close(ctx *sql.Context) (err error) { + i.rowBuffer = nil + //i.rowBuffer.Reset() + //sql.RowBufPool.Put(i.rowBuffer) + if i.l != nil { err = i.l.Close(ctx) } @@ -734,6 +769,8 @@ type lateralJoinIterator struct { foundMatch bool b sql.NodeExecBuilder + + rowBuffer *sql.RowBuffer } func newLateralJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) { @@ -769,6 +806,8 @@ func newLateralJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNod rowSize: len(row) + len(j.Left().Schema()) + len(j.Right().Schema()), scopeLen: j.ScopeLen, b: b, + + rowBuffer: sql.RowBufPool.Get().(*sql.RowBuffer), }), nil } @@ -811,7 +850,7 @@ func (i *lateralJoinIterator) loadRight(ctx *sql.Context) error { } func (i *lateralJoinIterator) buildRow(lRow, rRow sql.Row) sql.Row { - row := make(sql.Row, i.rowSize) + row := i.rowBuffer.Get(i.rowSize) copy(row, lRow) copy(row[len(lRow):], rRow) return row @@ -874,6 +913,10 @@ func (i *lateralJoinIterator) Next(ctx *sql.Context) (sql.Row, error) { } func (i *lateralJoinIterator) Close(ctx *sql.Context) error { + i.rowBuffer = nil + //i.rowBuffer.Reset() + //sql.RowBufPool.Put(i.rowBuffer) + var lerr, rerr error if i.lIter != nil { lerr = i.lIter.Close(ctx) diff --git a/sql/rowexec/merge_join.go b/sql/rowexec/merge_join.go index 5a92bcd9f2..41d753f5e0 100644 --- a/sql/rowexec/merge_join.go +++ b/sql/rowexec/merge_join.go @@ -46,8 +46,11 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, return nil, err } - fullRow := make(sql.Row, len(row)+len(j.Left().Schema())+len(j.Right().Schema())) - fullRow[0] = row + //fullRow := make(sql.Row, len(row)+len(j.Left().Schema())+len(j.Right().Schema())) + + rowBuf := sql.RowBufPool.Get().(*sql.RowBuffer) + fullRow := rowBuf.Get(len(row) + len(j.Left().Schema()) + len(j.Right().Schema())) + //fullRow[0] = row if len(row) > 0 { copy(fullRow[0:], row[:]) } @@ -83,6 +86,7 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, leftRowLen: len(j.Left().Schema()), rightRowLen: len(j.Right().Schema()), isReversed: j.IsReversed, + rowBuffer: rowBuf, } return iter, nil } @@ -128,6 +132,8 @@ type mergeJoinIter struct { leftRowLen int rightRowLen int parentLen int + + rowBuffer *sql.RowBuffer } var _ sql.RowIter = (*mergeJoinIter)(nil) @@ -314,7 +320,7 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { } func (i *mergeJoinIter) copyReturnRow() sql.Row { - ret := make(sql.Row, len(i.fullRow)) + ret := i.rowBuffer.Get(len(i.fullRow)) copy(ret, i.fullRow) return ret } diff --git a/sql/rowexec/range_heap_iter.go b/sql/rowexec/range_heap_iter.go index 12182bbfcf..737c20917b 100644 --- a/sql/rowexec/range_heap_iter.go +++ b/sql/rowexec/range_heap_iter.go @@ -36,6 +36,8 @@ type rangeHeapJoinIter struct { childRowIter sql.RowIter pendingRow sql.Row activeRanges []sql.Row + + rowBuffer *sql.RowBuffer } func newRangeHeapJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) { @@ -68,9 +70,11 @@ func newRangeHeapJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinN return nil, errors.New("right side of join must be a range heap") } + rowBuffer := sql.RowBufPool.Get().(*sql.RowBuffer) + parentLen := len(row) - primaryRow := make(sql.Row, parentLen+len(j.Left().Schema())) + primaryRow := rowBuffer.Get(parentLen + len(j.Left().Schema())) copy(primaryRow, row) return sql.NewSpanIter(span, &rangeHeapJoinIter{ @@ -88,6 +92,8 @@ func newRangeHeapJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinN parentLen: parentLen, rangeHeapPlan: rhp, + + rowBuffer: rowBuffer, }), nil } @@ -202,13 +208,16 @@ func (iter *rangeHeapJoinIter) removeParentRow(r sql.Row) sql.Row { // buildRow builds the result set row using the rows from the primary and secondary tables func (iter *rangeHeapJoinIter) buildRow(primary, secondary sql.Row) sql.Row { - row := make(sql.Row, iter.rowSize) + row := iter.rowBuffer.Get(iter.rowSize) copy(row, primary) copy(row[len(primary):], secondary) return row } func (iter *rangeHeapJoinIter) Close(ctx *sql.Context) (err error) { + iter.rowBuffer.Reset() + sql.RowBufPool.Put(iter.rowBuffer) + if iter.primary != nil { if err = iter.primary.Close(ctx); err != nil { if iter.secondary != nil { diff --git a/sql/rows.go b/sql/rows.go index bd89fd7cd4..e7868ee318 100644 --- a/sql/rows.go +++ b/sql/rows.go @@ -18,6 +18,7 @@ import ( "fmt" "io" "strings" + "sync" "github.com/dolthub/vitess/go/vt/proto/query" @@ -83,6 +84,42 @@ func FormatRow(row Row) string { return sb.String() } +const defaultRowBuffCap = 4096 + +type RowBuffer struct { + i int + buf Row +} + +func NewRowBuffer() *RowBuffer { + return &RowBuffer{ + buf: make(Row, defaultRowBuffCap), + } +} + +func (b *RowBuffer) Get(n int) (res Row) { + newI := b.i + n + if newI >= len(b.buf) { + //b.buf = append(b.buf, b.buf) // TODO: not sure if this is correct, but it seems faster + buf := make(Row, len(b.buf)*2) + copy(b.buf, buf) + b.buf = buf + } + res = b.buf[b.i:newI] + b.i = newI + return +} + +func (b *RowBuffer) Reset() { + b.i = 0 +} + +var RowBufPool = sync.Pool{ + New: func() any { + return NewRowBuffer() + }, +} + // RowIter is an iterator that produces rows. // TODO: most row iters need to be Disposable for CachedResult safety type RowIter interface { @@ -106,7 +143,7 @@ func RowIterToRows(ctx *Context, i RowIter) ([]Row, error) { return nil, err } - rows = append(rows, row) + rows = append(rows, row.Copy()) } return rows, i.Close(ctx)