From a48dfebc662fc1c5daf058b89ad74d2ed4bcda9f Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 00:05:52 -0800 Subject: [PATCH 01/23] more threads --- server/handler.go | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/server/handler.go b/server/handler.go index 827fe3cb53..e2cc147543 100644 --- a/server/handler.go +++ b/server/handler.go @@ -650,6 +650,17 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s timer := time.NewTimer(waitTime) defer timer.Stop() + // Wait for signal on the timer.C channel, and error accordingly + eg.Go(func() (err error) { + <-timer.C + if h.readTimeout != 0 { + // Cancel and return so Vitess can call the CloseConnection callback + ctx.GetLogger().Tracef("connection timeout") + return ErrRowTimeout.New() + } + return nil + }) + // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to // clean out rows that have already been spooled. // A server-side cursor allows the caller to fetch results cached on the server-side, @@ -779,6 +790,29 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } }) + // Read sqltypes.Result from resChan and send to client + eg.Go(func() (err error) { + defer pan2err(&err) + defer cancelF() + defer wg.Done() + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + + case r, ok := <-resChan: + if !ok { + return nil + } + processedAtLeastOneBatch = true + err = callback(r, more) + if err != nil { + return err + } + } + } + }) + // Close() kills this PID in the process list, // wait until all rows have be sent over the wire eg.Go(func() (err error) { @@ -965,7 +999,9 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema return nil, false, err } - res.Rows = res.Rows[:res.RowsAffected] + if res != nil { + res.Rows = res.Rows[:res.RowsAffected] + } return res, processedAtLeastOneBatch, err } From fb704892db9cca2b65dbe5001b2a5d037bacd34a Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 01:53:00 -0800 Subject: [PATCH 02/23] fix timer code --- server/handler.go | 51 ++++++++++++----------------------------------- 1 file changed, 13 insertions(+), 38 deletions(-) diff --git a/server/handler.go b/server/handler.go index e2cc147543..6a2a9554a8 100644 --- a/server/handler.go +++ b/server/handler.go @@ -650,17 +650,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s timer := time.NewTimer(waitTime) defer timer.Stop() - // Wait for signal on the timer.C channel, and error accordingly - eg.Go(func() (err error) { - <-timer.C - if h.readTimeout != 0 { - // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Tracef("connection timeout") - return ErrRowTimeout.New() - } - return nil - }) - // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to // clean out rows that have already been spooled. // A server-side cursor allows the caller to fetch results cached on the server-side, @@ -677,6 +666,8 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s wg := sync.WaitGroup{} wg.Add(3) + iter, projs := GetDeferredProjections(iter) + // Read rows off the row iterator and send them to the row channel. var rowChan = make(chan sql.Row, 512) eg.Go(func() (err error) { @@ -767,7 +758,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } }) - // Drain sqltypes.Result from resChan and call callback (send to client and potentially reset buffer) + // Drain sqltypes.Result from resChan and call callback (send to client and reset buffer) var processedAtLeastOneBatch bool eg.Go(func() (err error) { defer pan2err(&err) @@ -790,29 +781,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } }) - // Read sqltypes.Result from resChan and send to client - eg.Go(func() (err error) { - defer pan2err(&err) - defer cancelF() - defer wg.Done() - for { - select { - case <-ctx.Done(): - return context.Cause(ctx) - - case r, ok := <-resChan: - if !ok { - return nil - } - processedAtLeastOneBatch = true - err = callback(r, more) - if err != nil { - return err - } - } - } - }) - // Close() kills this PID in the process list, // wait until all rows have be sent over the wire eg.Go(func() (err error) { @@ -955,7 +923,14 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema } } - timer.Reset(waitTime) + // timer has gone off + if !timer.Reset(waitTime) { + if h.readTimeout != 0 { + // Cancel and return so Vitess can call the CloseConnection callback + ctx.GetLogger().Warn("connection timeout") + return ErrRowTimeout.New() + } + } } }) @@ -974,7 +949,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema return nil } processedAtLeastOneBatch = true - err = callback(r, more) + err = resetCallback(r, more) if err != nil { return err } @@ -1002,7 +977,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema if res != nil { res.Rows = res.Rows[:res.RowsAffected] } - return res, processedAtLeastOneBatch, err + return res, processedAtLeastOneBatch, nil } // See https://dev.mysql.com/doc/internals/en/status-flags.html From 1e598323770717e6b37f601887a8cbf3e1bbf45a Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 02:25:54 -0800 Subject: [PATCH 03/23] fix timer again --- server/handler.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/handler.go b/server/handler.go index 6a2a9554a8..eeeac9e4bd 100644 --- a/server/handler.go +++ b/server/handler.go @@ -499,6 +499,9 @@ func (h *Handler) doQuery( r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more) } else { r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf) + if err != nil { + return remainder, err + } } if err != nil { return remainder, err @@ -977,7 +980,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema if res != nil { res.Rows = res.Rows[:res.RowsAffected] } - return res, processedAtLeastOneBatch, nil + return res, processedAtLeastOneBatch, err } // See https://dev.mysql.com/doc/internals/en/status-flags.html From d7396b4c6fc421bc8a3adf8bf825d8cabce2c6c3 Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 10:26:02 -0800 Subject: [PATCH 04/23] cleanup --- server/handler.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/server/handler.go b/server/handler.go index eeeac9e4bd..518a882c68 100644 --- a/server/handler.go +++ b/server/handler.go @@ -926,14 +926,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema } } - // timer has gone off - if !timer.Reset(waitTime) { - if h.readTimeout != 0 { - // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Warn("connection timeout") - return ErrRowTimeout.New() - } - } + timer.Reset(waitTime) } }) From eefed5e066592d5772e628404a44859c6c39d811 Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 10:43:56 -0800 Subject: [PATCH 05/23] tidy --- server/handler.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/handler.go b/server/handler.go index 518a882c68..24ef456182 100644 --- a/server/handler.go +++ b/server/handler.go @@ -499,9 +499,6 @@ func (h *Handler) doQuery( r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more) } else { r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf) - if err != nil { - return remainder, err - } } if err != nil { return remainder, err @@ -669,8 +666,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s wg := sync.WaitGroup{} wg.Add(3) - iter, projs := GetDeferredProjections(iter) - // Read rows off the row iterator and send them to the row channel. var rowChan = make(chan sql.Row, 512) eg.Go(func() (err error) { From 1c8cc8c9708bae9886ec80aa277c19f71ae21365 Mon Sep 17 00:00:00 2001 From: James Cor Date: Tue, 18 Nov 2025 17:37:31 -0800 Subject: [PATCH 06/23] todos --- server/handler.go | 6 +++++- sql/byte_buffer.go | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/server/handler.go b/server/handler.go index 24ef456182..1819e4da8a 100644 --- a/server/handler.go +++ b/server/handler.go @@ -483,8 +483,8 @@ func (h *Handler) doQuery( var processedAtLeastOneBatch bool buf := sql.ByteBufPool.Get().(*sql.ByteBuffer) + buf.Reset() defer func() { - buf.Reset() sql.ByteBufPool.Put(buf) }() @@ -1196,6 +1196,10 @@ func toSqlHelper(ctx *sql.Context, typ sql.Type, buf *sql.ByteBuffer, val interf if buf == nil { return typ.SQL(ctx, nil, val) } + // TODO: possible to predict max amount of space needed in backing array. + // Only number types are written to byte buffer due to strconv.Append... + // String types already create a new []byte, so it's better to not copy to backing array. + ret, err := typ.SQL(ctx, buf.Get(), val) buf.Grow(ret.Len()) // TODO: shouldn't we check capacity beforehand? return ret, err diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index f2ccfc53d5..672937329b 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -41,11 +41,10 @@ func NewByteBuffer(initCap int) *ByteBuffer { // they expect to be protected. func (b *ByteBuffer) Grow(n int) { newI := b.i - if b.i+n <= len(b.buf) { + if b.i+n <= cap(b.buf) { // Increment |b.i| if no alloc newI += n - } - if b.i+n >= len(b.buf) { + } else if b.i+n >= cap(b.buf) { // No more space, double. // An external allocation doubled the cap using the size of // the override object, which if used could lead to overall @@ -59,6 +58,7 @@ func (b *ByteBuffer) Grow(n int) { // here because the runtime only doubles based on slice // length. func (b *ByteBuffer) Double() { + // TODO: This wastes memory. The first half of b.buf won't be referenced by anything. buf := make([]byte, len(b.buf)*2) copy(buf, b.buf) b.buf = buf From db5f96cd1cf03c61ae42ef52c1339b20d52666b2 Mon Sep 17 00:00:00 2001 From: James Cor Date: Tue, 18 Nov 2025 22:37:26 -0800 Subject: [PATCH 07/23] test bad byte buffering --- sql/byte_buffer_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/sql/byte_buffer_test.go b/sql/byte_buffer_test.go index afe67aa1b7..29110afeaf 100644 --- a/sql/byte_buffer_test.go +++ b/sql/byte_buffer_test.go @@ -15,6 +15,8 @@ package sql import ( + "fmt" + "strconv" "testing" "github.com/stretchr/testify/require" @@ -70,3 +72,24 @@ func TestGrowByteBuffer(t *testing.T) { require.Equal(t, 40, len(b.buf)) require.Equal(t, 0, b.i) } + +func TestByteBufferDoubling(t *testing.T) { + bb := NewByteBuffer(5) + fmt.Printf("bb.buf: %v, cap: %d\n", bb.buf, cap(bb.buf)) + fmt.Printf("bb.i: %v\n", bb.i) + + i0 := bb.Get() + i0 = strconv.AppendInt(i0, 12345, 10) + bb.Grow(len(i0)) + fmt.Printf("i0: %v, cap: %d\n", i0, cap(i0)) + fmt.Printf("bb.buf: %v, cap: %d\n", bb.buf, cap(bb.buf)) + fmt.Printf("bb.i: %v\n", bb.i) + + i5 := bb.Get() + i5 = strconv.AppendInt(i5, 678901, 10) + bb.Grow(len(i5)) + fmt.Printf("i0: %v, cap: %d\n", i0, cap(i0)) + fmt.Printf("i5: %v, cap: %d\n", i5, cap(i5)) + fmt.Printf("bb.buf: %v, cap: %d\n", bb.buf, cap(bb.buf)) + fmt.Printf("bb.i: %v\n", bb.i) +} From 5e2646205d7b3d12d99f6c7da50f3e437567b703 Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 12:25:59 -0800 Subject: [PATCH 08/23] pair buffers with results --- server/handler.go | 50 +++++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/server/handler.go b/server/handler.go index 1819e4da8a..454f407ff8 100644 --- a/server/handler.go +++ b/server/handler.go @@ -480,10 +480,8 @@ func (h *Handler) doQuery( // create result before goroutines to avoid |ctx| racing resultFields := schemaToFields(sqlCtx, schema) var r *sqltypes.Result + var buf *sql.ByteBuffer var processedAtLeastOneBatch bool - - buf := sql.ByteBufPool.Get().(*sql.ByteBuffer) - buf.Reset() defer func() { sql.ByteBufPool.Put(buf) }() @@ -498,7 +496,7 @@ func (h *Handler) doQuery( } else if vr, ok := rowIter.(sql.ValueRowIter); ok && vr.IsValueRowIter(sqlCtx) { r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more) } else { - r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf) + r, buf, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more) } if err != nil { return remainder, err @@ -527,6 +525,8 @@ func (h *Handler) doQuery( return remainder, nil } + // TODO: the very last buffer needs to be released + return remainder, callback(r, more) } @@ -598,7 +598,8 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, row, err := iter.Next(ctx) if err == io.EOF { return &sqltypes.Result{Fields: resultFields}, nil - } else if err != nil { + } + if err != nil { return nil, err } @@ -618,7 +619,7 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, // resultForDefaultIter reads batches of rows from the iterator // and writes results into the callback function. -func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool, buf *sql.ByteBuffer) (*sqltypes.Result, bool, error) { +func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool) (*sqltypes.Result, *sql.ByteBuffer, bool, error) { defer trace.StartRegion(ctx, "Handler.resultForDefaultIter").End() eg, ctx := ctx.NewErrgroup() @@ -650,17 +651,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s timer := time.NewTimer(waitTime) defer timer.Stop() - // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to - // clean out rows that have already been spooled. - // A server-side cursor allows the caller to fetch results cached on the server-side, - // so if a cursor exists, we can't release the buffer memory yet. - if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - callback = func(r *sqltypes.Result, more bool) error { - defer buf.Reset() - return callback(r, more) - } - } - iter, projs := GetDeferredProjections(iter) wg := sync.WaitGroup{} @@ -694,8 +684,13 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s }) // Drain rows from rowChan, convert to wire format, and send to resChan - var resChan = make(chan *sqltypes.Result, 4) + type bufferedResult struct { + res *sqltypes.Result + buf *sql.ByteBuffer + } + var resChan = make(chan bufferedResult, 4) var res *sqltypes.Result + var buf *sql.ByteBuffer eg.Go(func() (err error) { defer pan2err(&err) defer wg.Done() @@ -707,6 +702,8 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s Fields: resultFields, Rows: make([][]sqltypes.Value, 0, rowsBatch), } + buf = sql.ByteBufPool.Get().(*sql.ByteBuffer) + buf.Reset() } select { @@ -746,8 +743,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s select { case <-ctx.Done(): return context.Cause(ctx) - case resChan <- res: + case resChan <- bufferedResult{res: res, buf: buf}: res = nil + buf = nil } } } @@ -766,15 +764,21 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s select { case <-ctx.Done(): return context.Cause(ctx) - case r, ok := <-resChan: + case bufRes, ok := <-resChan: if !ok { return nil } processedAtLeastOneBatch = true - err = callback(r, more) + err = callback(bufRes.res, more) if err != nil { return err } + // A server-side cursor allows the caller to fetch results cached on the server-side, + // so if a cursor exists, we can't release the buffer memory yet. + // TODO: In the case of a cursor, we are leaking memory + if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { + sql.ByteBufPool.Put(bufRes.buf) + } } } }) @@ -793,9 +797,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s if verboseErrorLogging { fmt.Printf("Err: %+v", err) } - return nil, false, err + return nil, nil, false, err } - return res, processedAtLeastOneBatch, nil + return res, buf, processedAtLeastOneBatch, nil } func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) { From 3743f95c3f6ed1ec4a9dcf4de8954ba877a9fbae Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 12:33:04 -0800 Subject: [PATCH 09/23] implement for value rows --- server/handler.go | 46 +++++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/server/handler.go b/server/handler.go index 454f407ff8..9eb2493222 100644 --- a/server/handler.go +++ b/server/handler.go @@ -494,7 +494,7 @@ func (h *Handler) doQuery( } else if analyzer.FlagIsSet(qFlags, sql.QFlagMax1Row) { r, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, buf) } else if vr, ok := rowIter.(sql.ValueRowIter); ok && vr.IsValueRowIter(sqlCtx) { - r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more) + r, buf, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, callback, more) } else { r, buf, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more) } @@ -768,11 +768,11 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s if !ok { return nil } - processedAtLeastOneBatch = true err = callback(bufRes.res, more) if err != nil { return err } + processedAtLeastOneBatch = true // A server-side cursor allows the caller to fetch results cached on the server-side, // so if a cursor exists, we can't release the buffer memory yet. // TODO: In the case of a cursor, we are leaking memory @@ -802,7 +802,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s return res, buf, processedAtLeastOneBatch, nil } -func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) { +func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, *sql.ByteBuffer, bool, error) { defer trace.StartRegion(ctx, "Handler.resultForValueRowIter").End() eg, ctx := ctx.NewErrgroup() @@ -833,17 +833,6 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema timer := time.NewTimer(waitTime) defer timer.Stop() - // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to - // clean out rows that have already been spooled. - // A server-side cursor allows the caller to fetch results cached on the server-side, - // so if a cursor exists, we can't release the buffer memory yet. - if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - callback = func(r *sqltypes.Result, more bool) error { - defer buf.Reset() - return callback(r, more) - } - } - wg := sync.WaitGroup{} wg.Add(3) @@ -875,8 +864,13 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema }) // Drain rows from rowChan, convert to wire format, and send to resChan - var resChan = make(chan *sqltypes.Result, 4) + type bufferedResult struct { + res *sqltypes.Result + buf *sql.ByteBuffer + } + var resChan = make(chan bufferedResult, 4) var res *sqltypes.Result + var buf *sql.ByteBuffer eg.Go(func() (err error) { defer pan2err(&err) defer close(resChan) @@ -888,6 +882,8 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema Fields: resultFields, Rows: make([][]sqltypes.Value, rowsBatch), } + buf = sql.ByteBufPool.Get().(*sql.ByteBuffer) + buf.Reset() } select { @@ -919,8 +915,9 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema select { case <-ctx.Done(): return context.Cause(ctx) - case resChan <- res: + case resChan <- bufferedResult{res: res, buf: buf}: res = nil + buf = nil } } } @@ -939,15 +936,21 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema select { case <-ctx.Done(): return context.Cause(ctx) - case r, ok := <-resChan: + case bufRes, ok := <-resChan: if !ok { return nil } - processedAtLeastOneBatch = true - err = resetCallback(r, more) + err = callback(bufRes.res, more) if err != nil { return err } + processedAtLeastOneBatch = true + // A server-side cursor allows the caller to fetch results cached on the server-side, + // so if a cursor exists, we can't release the buffer memory yet. + // TODO: In the case of a cursor, we are leaking memory + if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { + sql.ByteBufPool.Put(bufRes.buf) + } } } }) @@ -966,13 +969,14 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema if verboseErrorLogging { fmt.Printf("Err: %+v", err) } - return nil, false, err + return nil, nil, false, err } if res != nil { res.Rows = res.Rows[:res.RowsAffected] } - return res, processedAtLeastOneBatch, err + } + return res, buf, processedAtLeastOneBatch, err } // See https://dev.mysql.com/doc/internals/en/status-flags.html From 8738d0c0791c3728b57d5adc2f6ac9780b247b15 Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 14:21:43 -0800 Subject: [PATCH 10/23] rebase --- server/handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/handler.go b/server/handler.go index 9eb2493222..3d1ea28c52 100644 --- a/server/handler.go +++ b/server/handler.go @@ -975,7 +975,6 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema if res != nil { res.Rows = res.Rows[:res.RowsAffected] } - } return res, buf, processedAtLeastOneBatch, err } From a9101fd1bad29a3dadabb0b2ba471cead72f4b8e Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 14:25:50 -0800 Subject: [PATCH 11/23] use cap --- sql/byte_buffer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index 672937329b..278fe59aff 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -41,10 +41,10 @@ func NewByteBuffer(initCap int) *ByteBuffer { // they expect to be protected. func (b *ByteBuffer) Grow(n int) { newI := b.i - if b.i+n <= cap(b.buf) { + if b.i+n < cap(b.buf) { // Increment |b.i| if no alloc newI += n - } else if b.i+n >= cap(b.buf) { + } else { // No more space, double. // An external allocation doubled the cap using the size of // the override object, which if used could lead to overall From f3e6ed8287e98f7eb2c36b1495a8e462236c075a Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 14:26:01 -0800 Subject: [PATCH 12/23] use cap --- sql/byte_buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index 278fe59aff..70c01340ec 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -59,7 +59,7 @@ func (b *ByteBuffer) Grow(n int) { // length. func (b *ByteBuffer) Double() { // TODO: This wastes memory. The first half of b.buf won't be referenced by anything. - buf := make([]byte, len(b.buf)*2) + buf := make([]byte, cap(b.buf)*2) copy(buf, b.buf) b.buf = buf } From 718fcdcc57c5695f4bc85997454638514c4ee05c Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 15:43:16 -0800 Subject: [PATCH 13/23] fix panic --- server/handler.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/handler.go b/server/handler.go index 3d1ea28c52..295abb3b52 100644 --- a/server/handler.go +++ b/server/handler.go @@ -483,7 +483,9 @@ func (h *Handler) doQuery( var buf *sql.ByteBuffer var processedAtLeastOneBatch bool defer func() { - sql.ByteBufPool.Put(buf) + if buf != nil { + sql.ByteBufPool.Put(buf) + } }() // zero/single return schema use spooling shortcut @@ -745,7 +747,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s return context.Cause(ctx) case resChan <- bufferedResult{res: res, buf: buf}: res = nil - buf = nil + buf = nil // TODO: not sure if this is necessary to prevent double Put() } } } From 4e4c21f3ab73813442040236bc9fa965fa0a885a Mon Sep 17 00:00:00 2001 From: James Cor Date: Thu, 20 Nov 2025 12:46:57 -0800 Subject: [PATCH 14/23] review byte buffer --- server/handler.go | 226 +++++++++++++++++++++++++++++----------- sql/byte_buffer.go | 48 +++------ sql/byte_buffer_test.go | 72 +------------ sql/types/bit.go | 2 + sql/types/time.go | 3 +- 5 files changed, 186 insertions(+), 165 deletions(-) diff --git a/server/handler.go b/server/handler.go index 295abb3b52..5b7b1b212b 100644 --- a/server/handler.go +++ b/server/handler.go @@ -480,10 +480,12 @@ func (h *Handler) doQuery( // create result before goroutines to avoid |ctx| racing resultFields := schemaToFields(sqlCtx, schema) var r *sqltypes.Result - var buf *sql.ByteBuffer + var bufs []*sql.ByteBuffer var processedAtLeastOneBatch bool defer func() { - if buf != nil { + // TODO: possible that errors leak memory? + for _, buf := range bufs { + // TODO: nil check? sql.ByteBufPool.Put(buf) } }() @@ -494,11 +496,11 @@ func (h *Handler) doQuery( } else if schema == nil { r, err = resultForEmptyIter(sqlCtx, rowIter, resultFields) } else if analyzer.FlagIsSet(qFlags, sql.QFlagMax1Row) { - r, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, buf) + r, bufs, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, bufs) } else if vr, ok := rowIter.(sql.ValueRowIter); ok && vr.IsValueRowIter(sqlCtx) { - r, buf, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, callback, more) + r, bufs, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, callback, more) } else { - r, buf, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more) + r, bufs, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more) } if err != nil { return remainder, err @@ -592,36 +594,36 @@ func GetDeferredProjections(iter sql.RowIter) (sql.RowIter, []sql.Expression) { } // resultForMax1RowIter ensures that an empty iterator returns at most one row -func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer) (*sqltypes.Result, error) { +func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, resultFields []*querypb.Field, bufs []*sql.ByteBuffer) (*sqltypes.Result, []*sql.ByteBuffer, error) { defer trace.StartRegion(ctx, "Handler.resultForMax1RowIter").End() defer iter.Close(ctx) row, err := iter.Next(ctx) if err == io.EOF { - return &sqltypes.Result{Fields: resultFields}, nil + return &sqltypes.Result{Fields: resultFields}, bufs, nil } if err != nil { - return nil, err + return nil, nil, err } if _, err = iter.Next(ctx); err != io.EOF { - return nil, fmt.Errorf("result max1Row iterator returned more than one row") + return nil, nil, fmt.Errorf("result max1Row iterator returned more than one row") } - outputRow, err := RowToSQL(ctx, schema, row, nil, buf) + outputRow, bufs, err := RowToSQL(ctx, schema, row, nil, bufs) if err != nil { - return nil, err + return nil, bufs, err } ctx.GetLogger().Tracef("spooling result row %s", outputRow) - return &sqltypes.Result{Fields: resultFields, Rows: [][]sqltypes.Value{outputRow}, RowsAffected: 1}, nil + return &sqltypes.Result{Fields: resultFields, Rows: [][]sqltypes.Value{outputRow}, RowsAffected: 1}, bufs, nil } // resultForDefaultIter reads batches of rows from the iterator // and writes results into the callback function. -func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool) (*sqltypes.Result, *sql.ByteBuffer, bool, error) { +func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool) (*sqltypes.Result, []*sql.ByteBuffer, bool, error) { defer trace.StartRegion(ctx, "Handler.resultForDefaultIter").End() eg, ctx := ctx.NewErrgroup() @@ -687,12 +689,12 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s // Drain rows from rowChan, convert to wire format, and send to resChan type bufferedResult struct { - res *sqltypes.Result - buf *sql.ByteBuffer + res *sqltypes.Result + bufs []*sql.ByteBuffer } var resChan = make(chan bufferedResult, 4) var res *sqltypes.Result - var buf *sql.ByteBuffer + var bufs []*sql.ByteBuffer eg.Go(func() (err error) { defer pan2err(&err) defer wg.Done() @@ -704,8 +706,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s Fields: resultFields, Rows: make([][]sqltypes.Value, 0, rowsBatch), } - buf = sql.ByteBufPool.Get().(*sql.ByteBuffer) + buf := sql.ByteBufPool.Get().(*sql.ByteBuffer) buf.Reset() + bufs = append(bufs, buf) } select { @@ -732,9 +735,10 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s continue } - outRow, sqlErr := RowToSQL(ctx, schema, row, projs, buf) - if sqlErr != nil { - return sqlErr + var outRow []sqltypes.Value + outRow, bufs, err = RowToSQL(ctx, schema, row, projs, bufs) + if err != nil { + return err } ctx.GetLogger().Tracef("spooling result row %s", outRow) @@ -745,9 +749,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s select { case <-ctx.Done(): return context.Cause(ctx) - case resChan <- bufferedResult{res: res, buf: buf}: + case resChan <- bufferedResult{res: res, bufs: bufs}: res = nil - buf = nil // TODO: not sure if this is necessary to prevent double Put() + bufs = nil } } } @@ -779,7 +783,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s // so if a cursor exists, we can't release the buffer memory yet. // TODO: In the case of a cursor, we are leaking memory if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - sql.ByteBufPool.Put(bufRes.buf) + for _, buf := range bufRes.bufs { + sql.ByteBufPool.Put(buf) + } } } } @@ -801,10 +807,10 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } return nil, nil, false, err } - return res, buf, processedAtLeastOneBatch, nil + return res, bufs, processedAtLeastOneBatch, nil } -func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, *sql.ByteBuffer, bool, error) { +func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, []*sql.ByteBuffer, bool, error) { defer trace.StartRegion(ctx, "Handler.resultForValueRowIter").End() eg, ctx := ctx.NewErrgroup() @@ -867,12 +873,12 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema // Drain rows from rowChan, convert to wire format, and send to resChan type bufferedResult struct { - res *sqltypes.Result - buf *sql.ByteBuffer + res *sqltypes.Result + bufs []*sql.ByteBuffer } var resChan = make(chan bufferedResult, 4) var res *sqltypes.Result - var buf *sql.ByteBuffer + var bufs []*sql.ByteBuffer eg.Go(func() (err error) { defer pan2err(&err) defer close(resChan) @@ -884,8 +890,9 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema Fields: resultFields, Rows: make([][]sqltypes.Value, rowsBatch), } - buf = sql.ByteBufPool.Get().(*sql.ByteBuffer) + buf := sql.ByteBufPool.Get().(*sql.ByteBuffer) buf.Reset() + bufs = append(bufs, buf) } select { @@ -904,9 +911,10 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema return nil } - outRow, sqlErr := RowValueToSQLValues(ctx, schema, row, buf) - if sqlErr != nil { - return sqlErr + var outRow []sqltypes.Value + outRow, bufs, err = RowValueToSQLValues(ctx, schema, row, bufs) + if err != nil { + return err } ctx.GetLogger().Tracef("spooling result row %s", outRow) @@ -917,9 +925,9 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema select { case <-ctx.Done(): return context.Cause(ctx) - case resChan <- bufferedResult{res: res, buf: buf}: + case resChan <- bufferedResult{res: res, bufs: bufs}: res = nil - buf = nil + bufs = nil } } } @@ -951,7 +959,9 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema // so if a cursor exists, we can't release the buffer memory yet. // TODO: In the case of a cursor, we are leaking memory if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - sql.ByteBufPool.Put(bufRes.buf) + for _, buf := range bufRes.bufs { + sql.ByteBufPool.Put(buf) + } } } } @@ -977,7 +987,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema if res != nil { res.Rows = res.Rows[:res.RowsAffected] } - return res, buf, processedAtLeastOneBatch, err + return res, bufs, processedAtLeastOneBatch, err } // See https://dev.mysql.com/doc/internals/en/status-flags.html @@ -1201,25 +1211,91 @@ func updateMaxUsedConnectionsStatusVariable() { }() } -func toSqlHelper(ctx *sql.Context, typ sql.Type, buf *sql.ByteBuffer, val interface{}) (sqltypes.Value, error) { - if buf == nil { - return typ.SQL(ctx, nil, val) +func getMaxTypeCapacity(ctx *sql.Context, typ sql.Type) (res int) { + switch typ.Type() { + case sqltypes.Int8: + // Longest possible int8 is len("-128") = 4 + res = 4 + case sqltypes.Int16: + // Longest possible int16 is len("-32768") = 6 + res = 6 + case sqltypes.Int24: + // Longest possible int24 is len("-8388608") = 8 + res = 8 + case sqltypes.Int32: + // Longest possible int32 is len("-2147483648") = 11 + res = 11 + case sqltypes.Int64: + // Longest possible int64 is len("-9223372036854775808") = 20 + res = 20 + case sqltypes.Uint8: + // Longest possible uint8 is len("255") = 3 + res = 3 + case sqltypes.Uint16: + // Longest possible uint16 is len("65535") = 5 + res = 5 + case sqltypes.Uint24: + // Longest possible uint24 is len("16777215") = 8 + res = 8 + case sqltypes.Uint32: + // Longest possible uint32 is len("4294967295") = 10 + res = 10 + case sqltypes.Uint64: + // Longest possible uint64 is len("18446744073709551615") = 20 + res = 20 + case sqltypes.Float32: + // Longest possible 'g' format float32 is len("-3.4028235e+38") = 14 + res = 14 + case sqltypes.Float64: + // Longest possible 'g' format float64 is len("-1.7976931348623157e+308") = 24 + res = 24 + case sqltypes.Time: + // Longest possible Time format is len("-00:00:00.000000") = 16 + res = 16 + case sqltypes.Date: + // Longest possible Date format is len("0000-00-00") = 10s + res = 10 + case sqltypes.Datetime, sqltypes.Timestamp: + // Longest possible Datetime format is len("2006-01-02 15:04:05.999999") = 26 + res = 26 + case sqltypes.Bit: + res = int(typ.MaxTextResponseByteLength(ctx)) + default: + // These types do not use sql.ByteBuffer + res = 0 + } + return +} + +func toSqlHelper(ctx *sql.Context, typ sql.Type, buf *sql.ByteBuffer, val interface{}) (sqltypes.Value, *sql.ByteBuffer, error) { + // Determine the maximum required capacity + // Only numeric types are written to byte buffer through strconv.Append... + // String types already have []byte or string allocated, so they should not use a backing array. + maxCap := getMaxTypeCapacity(ctx, typ) + if maxCap == 0 { + ret, err := typ.SQL(ctx, nil, val) + return ret, buf, err + } + + if !buf.HasCapacity(maxCap) { + buf = sql.ByteBufPool.Get().(*sql.ByteBuffer) + buf.Reset() } - // TODO: possible to predict max amount of space needed in backing array. - // Only number types are written to byte buffer due to strconv.Append... - // String types already create a new []byte, so it's better to not copy to backing array. ret, err := typ.SQL(ctx, buf.Get(), val) - buf.Grow(ret.Len()) // TODO: shouldn't we check capacity beforehand? - return ret, err + buf.Grow(ret.Len()) + return ret, buf, err } -func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression, buf *sql.ByteBuffer) ([]sqltypes.Value, error) { +func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression, bufs []*sql.ByteBuffer) ([]sqltypes.Value, []*sql.ByteBuffer, error) { // need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock) if len(sch) == 0 { - return []sqltypes.Value{}, nil + return []sqltypes.Value{}, nil, nil } + // TODO: is it possible for this to utilize more than 1 buffer? + buf := bufs[len(bufs)-1] + outVals := make([]sqltypes.Value, len(sch)) var err error if len(projs) == 0 { @@ -1228,34 +1304,50 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express outVals[i] = sqltypes.NULL continue } - outVals[i], err = toSqlHelper(ctx, col.Type, buf, row[i]) + + var newBuf *sql.ByteBuffer + outVals[i], newBuf, err = toSqlHelper(ctx, col.Type, buf, row[i]) if err != nil { - return nil, err + return nil, nil, err + } + + // allocated new backing array + if newBuf != nil && newBuf != buf { + bufs = append(bufs, newBuf) } } - return outVals, nil + return outVals, bufs, nil } for i, col := range sch { - field, err := projs[i].Eval(ctx, row) + var field any + field, err = projs[i].Eval(ctx, row) if err != nil { - return nil, err + return nil, nil, err } if field == nil { outVals[i] = sqltypes.NULL continue } - outVals[i], err = toSqlHelper(ctx, col.Type, buf, field) + + var newBuf *sql.ByteBuffer + outVals[i], newBuf, err = toSqlHelper(ctx, col.Type, buf, field) if err != nil { - return nil, err + return nil, nil, err + } + + // allocated new backing array + if newBuf != nil && newBuf != buf { + bufs = append(bufs, newBuf) } } - return outVals, nil + return outVals, bufs, nil } -func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, buf *sql.ByteBuffer) ([]sqltypes.Value, error) { +func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, bufs []*sql.ByteBuffer) ([]sqltypes.Value, []*sql.ByteBuffer, error) { + // TODO: we check for empty schema in doQuery, shouldn't need to check here if len(sch) == 0 { - return []sqltypes.Value{}, nil + return []sqltypes.Value{}, nil, nil } var err error outVals := make([]sqltypes.Value, len(sch)) @@ -1270,20 +1362,32 @@ func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, buf outVals[i] = sqltypes.MakeTrusted(row[i].Typ, row[i].Val) continue } - if buf == nil { + + // TODO: schema remains constant throughout this query, so no need to recalc this every time + maxCap := getMaxTypeCapacity(ctx, valType) + if maxCap == 0 { outVals[i], err = valType.SQLValue(ctx, row[i], nil) if err != nil { - return nil, err + return nil, nil, err } - continue + return outVals, bufs, nil } + + buf := bufs[len(bufs)-1] + if !buf.HasCapacity(maxCap) { + buf = sql.ByteBufPool.Get().(*sql.ByteBuffer) + buf.Reset() + bufs = append(bufs, buf) + } + outVals[i], err = valType.SQLValue(ctx, row[i], buf.Get()) if err != nil { - return nil, err + // TODO: might be important to return bufs even during err to prevent memory leaks + return nil, nil, err } buf.Grow(outVals[i].Len()) } - return outVals, nil + return outVals, bufs, nil } func schemaToFields(ctx *sql.Context, s sql.Schema) []*querypb.Field { diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index 70c01340ec..148992891b 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -18,58 +18,42 @@ import ( "sync" ) -const defaultByteBuffCap = 4096 +// TODO: find optimal size +const buffCap = 4096 // 4KB var ByteBufPool = sync.Pool{ New: func() any { - return NewByteBuffer(defaultByteBuffCap) + return NewByteBuffer() }, } type ByteBuffer struct { - buf []byte - i int + pos uint16 + buf [buffCap]byte } -func NewByteBuffer(initCap int) *ByteBuffer { - buf := make([]byte, initCap) - return &ByteBuffer{buf: buf} +func NewByteBuffer() *ByteBuffer { + return &ByteBuffer{} +} + +// HasCapacity indicates if this buffer has `n` bytes worth of capacity left +func (b *ByteBuffer) HasCapacity(n int) bool { + return int(b.pos)+n < buffCap } // Grow records the latest used byte position. Callers // are responsible for accurately reporting which bytes // they expect to be protected. func (b *ByteBuffer) Grow(n int) { - newI := b.i - if b.i+n < cap(b.buf) { - // Increment |b.i| if no alloc - newI += n - } else { - // No more space, double. - // An external allocation doubled the cap using the size of - // the override object, which if used could lead to overall - // shrinking behavior. - b.Double() - } - b.i = newI -} - -// Double expands the backing array by 2x. We do this -// here because the runtime only doubles based on slice -// length. -func (b *ByteBuffer) Double() { - // TODO: This wastes memory. The first half of b.buf won't be referenced by anything. - buf := make([]byte, cap(b.buf)*2) - copy(buf, b.buf) - b.buf = buf + b.pos += uint16(n) } -// Get returns a zero length slice beginning at a safe +// Get returns a zero-length slice beginning at a safe // write position. func (b *ByteBuffer) Get() []byte { - return b.buf[b.i:b.i] + return b.buf[b.pos:b.pos] } func (b *ByteBuffer) Reset() { - b.i = 0 + b.pos = 0 } diff --git a/sql/byte_buffer_test.go b/sql/byte_buffer_test.go index 29110afeaf..22cb199cfa 100644 --- a/sql/byte_buffer_test.go +++ b/sql/byte_buffer_test.go @@ -15,81 +15,13 @@ package sql import ( - "fmt" - "strconv" "testing" - - "github.com/stretchr/testify/require" ) func TestGrowByteBuffer(t *testing.T) { - b := NewByteBuffer(10) - - // grow less than boundary - src1 := []byte{1, 1, 1} - obj1 := append(b.Get(), src1...) - b.Grow(len(src1)) - - require.Equal(t, 10, len(b.buf)) - require.Equal(t, 3, b.i) - require.Equal(t, 10, cap(obj1)) - - // grow to boundary - src2 := []byte{0, 0, 0, 0, 0, 0, 0} - obj2 := append(b.Get(), src2...) - b.Grow(len(src2)) - - require.Equal(t, 20, len(b.buf)) - require.Equal(t, 10, b.i) - require.Equal(t, 7, cap(obj2)) - - src3 := []byte{2, 2, 2, 2, 2} - obj3 := append(b.Get(), src3...) - b.Grow(len(src3)) - - require.Equal(t, 20, len(b.buf)) - require.Equal(t, 15, b.i) - require.Equal(t, 10, cap(obj3)) - - // grow exceeds boundary - - src4 := []byte{3, 3, 3, 3, 3, 3, 3, 3} - obj4 := append(b.Get(), src4...) - b.Grow(len(src4)) - - require.Equal(t, 40, len(b.buf)) - require.Equal(t, 15, b.i) - require.Equal(t, 16, cap(obj4)) - - // objects are all valid after doubling - require.Equal(t, src1, obj1) - require.Equal(t, src2, obj2) - require.Equal(t, src3, obj3) - require.Equal(t, src4, obj4) - - // reset - b.Reset() - require.Equal(t, 40, len(b.buf)) - require.Equal(t, 0, b.i) + // TODO } func TestByteBufferDoubling(t *testing.T) { - bb := NewByteBuffer(5) - fmt.Printf("bb.buf: %v, cap: %d\n", bb.buf, cap(bb.buf)) - fmt.Printf("bb.i: %v\n", bb.i) - - i0 := bb.Get() - i0 = strconv.AppendInt(i0, 12345, 10) - bb.Grow(len(i0)) - fmt.Printf("i0: %v, cap: %d\n", i0, cap(i0)) - fmt.Printf("bb.buf: %v, cap: %d\n", bb.buf, cap(bb.buf)) - fmt.Printf("bb.i: %v\n", bb.i) - - i5 := bb.Get() - i5 = strconv.AppendInt(i5, 678901, 10) - bb.Grow(len(i5)) - fmt.Printf("i0: %v, cap: %d\n", i0, cap(i0)) - fmt.Printf("i5: %v, cap: %d\n", i5, cap(i5)) - fmt.Printf("bb.buf: %v, cap: %d\n", bb.buf, cap(bb.buf)) - fmt.Printf("bb.i: %v\n", bb.i) + // TODO } diff --git a/sql/types/bit.go b/sql/types/bit.go index 50a5b71cc3..b23d503b31 100644 --- a/sql/types/bit.go +++ b/sql/types/bit.go @@ -224,6 +224,7 @@ func (t BitType_) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqltypes.Va } bitVal := value.(uint64) + // TODO: this should append to dest var data []byte for i := uint64(0); i < uint64(t.numOfBits); i += 8 { data = append(data, byte(bitVal>>i)) @@ -252,6 +253,7 @@ func (t BitType_) SQLValue(ctx *sql.Context, v sql.Value, dest []byte) (sqltypes } v.Val = v.Val[:numBytes] + // TODO: can cut out a loop here by just appending backwards from v.Val // want the results in big endian dest = append(dest, v.Val...) for i, j := 0, len(dest)-1; i < j; i, j = i+1, j-1 { diff --git a/sql/types/time.go b/sql/types/time.go index 29ac916948..dd9dbe3ae4 100644 --- a/sql/types/time.go +++ b/sql/types/time.go @@ -268,8 +268,7 @@ func (t TimespanType_) SQL(_ *sql.Context, dest []byte, v interface{}) (sqltypes if err != nil { return sqltypes.Value{}, err } - - val := ti.Bytes() + val := ti.AppendBytes(dest) return sqltypes.MakeTrusted(sqltypes.Time, val), nil } From ac17187f84b0e4e18b6eb1262f447ba300f491bd Mon Sep 17 00:00:00 2001 From: James Cor Date: Thu, 20 Nov 2025 14:35:55 -0800 Subject: [PATCH 15/23] tidy --- enginetest/enginetests.go | 3 +-- server/handler.go | 1 + sql/types/sql_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/enginetest/enginetests.go b/enginetest/enginetests.go index 7997aff395..4779a866e9 100644 --- a/enginetest/enginetests.go +++ b/enginetest/enginetests.go @@ -5834,7 +5834,6 @@ func TestTypesOverWire(t *testing.T, harness ClientHarness, sessionBuilder serve require.NoError(t, err) expectedRowSet := script.Results[queryIdx] expectedRowIdx := 0 - buf := sql.NewByteBuffer(1000) var engineRow sql.Row for engineRow, err = engineIter.Next(ctx); err == nil; engineRow, err = engineIter.Next(ctx) { if !assert.True(t, r.Next()) { @@ -5852,7 +5851,7 @@ func TestTypesOverWire(t *testing.T, harness ClientHarness, sessionBuilder serve break } expectedEngineRow := make([]*string, len(engineRow)) - row, err := server.RowToSQL(ctx, sch, engineRow, nil, buf) + row, _, err := server.RowToSQL(ctx, sch, engineRow, nil, nil) if !assert.NoError(t, err) { break } diff --git a/server/handler.go b/server/handler.go index 5b7b1b212b..369d36bede 100644 --- a/server/handler.go +++ b/server/handler.go @@ -1261,6 +1261,7 @@ func getMaxTypeCapacity(ctx *sql.Context, typ sql.Type) (res int) { case sqltypes.Bit: res = int(typ.MaxTextResponseByteLength(ctx)) default: + // TODO: StringType can use backing array depending on the type of // These types do not use sql.ByteBuffer res = 0 } diff --git a/sql/types/sql_test.go b/sql/types/sql_test.go index 5c90f4ddf0..3e1b3701ef 100644 --- a/sql/types/sql_test.go +++ b/sql/types/sql_test.go @@ -24,7 +24,7 @@ func BenchmarkNumI64SQL(b *testing.B) { func BenchmarkVarchar10SQL(b *testing.B) { var res sqltypes.Value t := MustCreateStringWithDefaults(sqltypes.VarChar, 10) - buf := sql.NewByteBuffer(1000) + buf := sql.NewByteBuffer() ctx := sql.NewEmptyContext() for i := 0; i < b.N; i++ { res, _ = t.SQL(ctx, buf.Get(), "char") From 666568b259e99d36b1f0b0883e934d4d29af80d7 Mon Sep 17 00:00:00 2001 From: James Cor Date: Thu, 20 Nov 2025 16:27:35 -0800 Subject: [PATCH 16/23] buffer manager implementation --- server/handler.go | 192 +++++++++++++++++++++------------------------ sql/byte_buffer.go | 79 +++++++++++++------ 2 files changed, 145 insertions(+), 126 deletions(-) diff --git a/server/handler.go b/server/handler.go index 369d36bede..34b712165f 100644 --- a/server/handler.go +++ b/server/handler.go @@ -480,13 +480,11 @@ func (h *Handler) doQuery( // create result before goroutines to avoid |ctx| racing resultFields := schemaToFields(sqlCtx, schema) var r *sqltypes.Result - var bufs []*sql.ByteBuffer + var bm *sql.ByteBufferManager var processedAtLeastOneBatch bool defer func() { - // TODO: possible that errors leak memory? - for _, buf := range bufs { - // TODO: nil check? - sql.ByteBufPool.Put(buf) + if bm != nil { + bm.PutAll() } }() @@ -496,11 +494,11 @@ func (h *Handler) doQuery( } else if schema == nil { r, err = resultForEmptyIter(sqlCtx, rowIter, resultFields) } else if analyzer.FlagIsSet(qFlags, sql.QFlagMax1Row) { - r, bufs, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, bufs) + r, bm, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, bm) } else if vr, ok := rowIter.(sql.ValueRowIter); ok && vr.IsValueRowIter(sqlCtx) { - r, bufs, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, callback, more) + r, bm, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, callback, more) } else { - r, bufs, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more) + r, bm, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more) } if err != nil { return remainder, err @@ -594,14 +592,19 @@ func GetDeferredProjections(iter sql.RowIter) (sql.RowIter, []sql.Expression) { } // resultForMax1RowIter ensures that an empty iterator returns at most one row -func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, resultFields []*querypb.Field, bufs []*sql.ByteBuffer) (*sqltypes.Result, []*sql.ByteBuffer, error) { +func resultForMax1RowIter( + ctx *sql.Context, + schema sql.Schema, + iter sql.RowIter, + resultFields []*querypb.Field, + bm *sql.ByteBufferManager, +) (*sqltypes.Result, *sql.ByteBufferManager, error) { defer trace.StartRegion(ctx, "Handler.resultForMax1RowIter").End() - defer iter.Close(ctx) row, err := iter.Next(ctx) if err == io.EOF { - return &sqltypes.Result{Fields: resultFields}, bufs, nil + return &sqltypes.Result{Fields: resultFields}, bm, nil } if err != nil { return nil, nil, err @@ -611,19 +614,29 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, return nil, nil, fmt.Errorf("result max1Row iterator returned more than one row") } - outputRow, bufs, err := RowToSQL(ctx, schema, row, nil, bufs) + bm = sql.NewByteBufferManager() + outputRow, err := RowToSQL(ctx, schema, row, nil, bm) if err != nil { - return nil, bufs, err + // Important to return ByteBufferManager even in error, as we still need to release any allocated memory. + return nil, bm, err } ctx.GetLogger().Tracef("spooling result row %s", outputRow) - return &sqltypes.Result{Fields: resultFields, Rows: [][]sqltypes.Value{outputRow}, RowsAffected: 1}, bufs, nil + return &sqltypes.Result{Fields: resultFields, Rows: [][]sqltypes.Value{outputRow}, RowsAffected: 1}, bm, nil } // resultForDefaultIter reads batches of rows from the iterator // and writes results into the callback function. -func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool) (*sqltypes.Result, []*sql.ByteBuffer, bool, error) { +func (h *Handler) resultForDefaultIter( + ctx *sql.Context, + c *mysql.Conn, + schema sql.Schema, + iter sql.RowIter, + callback func(*sqltypes.Result, bool) error, + resultFields []*querypb.Field, + more bool, +) (*sqltypes.Result, *sql.ByteBufferManager, bool, error) { defer trace.StartRegion(ctx, "Handler.resultForDefaultIter").End() eg, ctx := ctx.NewErrgroup() @@ -688,13 +701,13 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s }) // Drain rows from rowChan, convert to wire format, and send to resChan - type bufferedResult struct { - res *sqltypes.Result - bufs []*sql.ByteBuffer + type managedResult struct { + res *sqltypes.Result + bm *sql.ByteBufferManager } - var resChan = make(chan bufferedResult, 4) + var resChan = make(chan managedResult, 4) var res *sqltypes.Result - var bufs []*sql.ByteBuffer + var bm *sql.ByteBufferManager eg.Go(func() (err error) { defer pan2err(&err) defer wg.Done() @@ -706,9 +719,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s Fields: resultFields, Rows: make([][]sqltypes.Value, 0, rowsBatch), } - buf := sql.ByteBufPool.Get().(*sql.ByteBuffer) - buf.Reset() - bufs = append(bufs, buf) + bm = sql.NewByteBufferManager() } select { @@ -736,7 +747,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } var outRow []sqltypes.Value - outRow, bufs, err = RowToSQL(ctx, schema, row, projs, bufs) + outRow, err = RowToSQL(ctx, schema, row, projs, bm) if err != nil { return err } @@ -749,9 +760,9 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s select { case <-ctx.Done(): return context.Cause(ctx) - case resChan <- bufferedResult{res: res, bufs: bufs}: + case resChan <- managedResult{res: res, bm: bm}: res = nil - bufs = nil + bm = nil } } } @@ -783,9 +794,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s // so if a cursor exists, we can't release the buffer memory yet. // TODO: In the case of a cursor, we are leaking memory if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - for _, buf := range bufRes.bufs { - sql.ByteBufPool.Put(buf) - } + bufRes.bm.PutAll() // TODO: recycle buffer manager? } } } @@ -807,10 +816,17 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } return nil, nil, false, err } - return res, bufs, processedAtLeastOneBatch, nil + return res, bm, processedAtLeastOneBatch, nil } -func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, []*sql.ByteBuffer, bool, error) { +func (h *Handler) resultForValueRowIter( + ctx *sql.Context, + c *mysql.Conn, + schema sql.Schema, + iter sql.ValueRowIter, + resultFields []*querypb.Field, + callback func(*sqltypes.Result, bool) error, more bool, +) (*sqltypes.Result, *sql.ByteBufferManager, bool, error) { defer trace.StartRegion(ctx, "Handler.resultForValueRowIter").End() eg, ctx := ctx.NewErrgroup() @@ -873,12 +889,12 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema // Drain rows from rowChan, convert to wire format, and send to resChan type bufferedResult struct { - res *sqltypes.Result - bufs []*sql.ByteBuffer + res *sqltypes.Result + bm *sql.ByteBufferManager } var resChan = make(chan bufferedResult, 4) var res *sqltypes.Result - var bufs []*sql.ByteBuffer + var bm *sql.ByteBufferManager eg.Go(func() (err error) { defer pan2err(&err) defer close(resChan) @@ -890,9 +906,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema Fields: resultFields, Rows: make([][]sqltypes.Value, rowsBatch), } - buf := sql.ByteBufPool.Get().(*sql.ByteBuffer) - buf.Reset() - bufs = append(bufs, buf) + bm = sql.NewByteBufferManager() } select { @@ -912,7 +926,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema } var outRow []sqltypes.Value - outRow, bufs, err = RowValueToSQLValues(ctx, schema, row, bufs) + outRow, err = RowValueToSQLValues(ctx, schema, row, bm) if err != nil { return err } @@ -925,9 +939,9 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema select { case <-ctx.Done(): return context.Cause(ctx) - case resChan <- bufferedResult{res: res, bufs: bufs}: + case resChan <- bufferedResult{res: res, bm: bm}: res = nil - bufs = nil + bm = nil } } } @@ -959,9 +973,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema // so if a cursor exists, we can't release the buffer memory yet. // TODO: In the case of a cursor, we are leaking memory if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - for _, buf := range bufRes.bufs { - sql.ByteBufPool.Put(buf) - } + bm.PutAll() } } } @@ -987,7 +999,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema if res != nil { res.Rows = res.Rows[:res.RowsAffected] } - return res, bufs, processedAtLeastOneBatch, err + return res, bm, processedAtLeastOneBatch, err } // See https://dev.mysql.com/doc/internals/en/status-flags.html @@ -1211,7 +1223,10 @@ func updateMaxUsedConnectionsStatusVariable() { }() } +// getMaxTypeCapacity determines the maximum required capacity for sql.Type `typ`. func getMaxTypeCapacity(ctx *sql.Context, typ sql.Type) (res int) { + // Only numeric types are written to byte buffer through strconv.Append... + // String types already have []byte or string allocated, so they should not use a backing array. switch typ.Type() { case sqltypes.Int8: // Longest possible int8 is len("-128") = 4 @@ -1261,42 +1276,32 @@ func getMaxTypeCapacity(ctx *sql.Context, typ sql.Type) (res int) { case sqltypes.Bit: res = int(typ.MaxTextResponseByteLength(ctx)) default: - // TODO: StringType can use backing array depending on the type of - // These types do not use sql.ByteBuffer + // TODO: StringType can use backing array depending on the built-in type of the value. + // These types do not use sql.byteBuffer res = 0 } return } -func toSqlHelper(ctx *sql.Context, typ sql.Type, buf *sql.ByteBuffer, val interface{}) (sqltypes.Value, *sql.ByteBuffer, error) { - // Determine the maximum required capacity - // Only numeric types are written to byte buffer through strconv.Append... - // String types already have []byte or string allocated, so they should not use a backing array. +func toSQL(ctx *sql.Context, typ sql.Type, bm *sql.ByteBufferManager, val any) (sqltypes.Value, error) { maxCap := getMaxTypeCapacity(ctx, typ) if maxCap == 0 { - ret, err := typ.SQL(ctx, nil, val) - return ret, buf, err + return typ.SQL(ctx, nil, val) } - - if !buf.HasCapacity(maxCap) { - buf = sql.ByteBufPool.Get().(*sql.ByteBuffer) - buf.Reset() + ret, err := typ.SQL(ctx, bm.Get(maxCap), val) + if err != nil { + return sqltypes.Value{}, err } - - ret, err := typ.SQL(ctx, buf.Get(), val) - buf.Grow(ret.Len()) - return ret, buf, err + bm.Grow(ret.Len()) + return ret, nil } -func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression, bufs []*sql.ByteBuffer) ([]sqltypes.Value, []*sql.ByteBuffer, error) { - // need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock) +func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression, bm *sql.ByteBufferManager) ([]sqltypes.Value, error) { + // need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock) // TODO: do we really? if len(sch) == 0 { - return []sqltypes.Value{}, nil, nil + return []sqltypes.Value{}, nil } - // TODO: is it possible for this to utilize more than 1 buffer? - buf := bufs[len(bufs)-1] - outVals := make([]sqltypes.Value, len(sch)) var err error if len(projs) == 0 { @@ -1305,55 +1310,42 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express outVals[i] = sqltypes.NULL continue } - - var newBuf *sql.ByteBuffer - outVals[i], newBuf, err = toSqlHelper(ctx, col.Type, buf, row[i]) + outVals[i], err = toSQL(ctx, col.Type, bm, row[i]) if err != nil { - return nil, nil, err - } - - // allocated new backing array - if newBuf != nil && newBuf != buf { - bufs = append(bufs, newBuf) + return nil, err } } - return outVals, bufs, nil + return outVals, nil } for i, col := range sch { var field any field, err = projs[i].Eval(ctx, row) if err != nil { - return nil, nil, err + return nil, err } if field == nil { outVals[i] = sqltypes.NULL continue } - - var newBuf *sql.ByteBuffer - outVals[i], newBuf, err = toSqlHelper(ctx, col.Type, buf, field) + outVals[i], err = toSQL(ctx, col.Type, bm, row[i]) if err != nil { - return nil, nil, err - } - - // allocated new backing array - if newBuf != nil && newBuf != buf { - bufs = append(bufs, newBuf) + return nil, err } } - return outVals, bufs, nil + return outVals, nil } -func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, bufs []*sql.ByteBuffer) ([]sqltypes.Value, []*sql.ByteBuffer, error) { +func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, bm *sql.ByteBufferManager) ([]sqltypes.Value, error) { // TODO: we check for empty schema in doQuery, shouldn't need to check here if len(sch) == 0 { - return []sqltypes.Value{}, nil, nil + return []sqltypes.Value{}, nil } + var err error outVals := make([]sqltypes.Value, len(sch)) for i, col := range sch { - // TODO: remove this check once all Types implement this + // TODO: remove this check once all Types implement sql.ValueType valType, ok := col.Type.(sql.ValueType) if !ok { if row[i].IsNull() { @@ -1369,26 +1361,18 @@ func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, buf if maxCap == 0 { outVals[i], err = valType.SQLValue(ctx, row[i], nil) if err != nil { - return nil, nil, err + return nil, err } - return outVals, bufs, nil - } - - buf := bufs[len(bufs)-1] - if !buf.HasCapacity(maxCap) { - buf = sql.ByteBufPool.Get().(*sql.ByteBuffer) - buf.Reset() - bufs = append(bufs, buf) + continue } - outVals[i], err = valType.SQLValue(ctx, row[i], buf.Get()) + outVals[i], err = valType.SQLValue(ctx, row[i], bm.Get(maxCap)) if err != nil { - // TODO: might be important to return bufs even during err to prevent memory leaks - return nil, nil, err + return nil, err } - buf.Grow(outVals[i].Len()) + bm.Grow(outVals[i].Len()) } - return outVals, bufs, nil + return outVals, nil } func schemaToFields(ctx *sql.Context, s sql.Schema) []*querypb.Field { diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index 148992891b..3a091474f3 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -19,41 +19,76 @@ import ( ) // TODO: find optimal size -const buffCap = 4096 // 4KB +const bufCap = 4096 // 4KB -var ByteBufPool = sync.Pool{ - New: func() any { - return NewByteBuffer() - }, -} - -type ByteBuffer struct { +// byteBuffer serves as a statically sized backing array used to the wire methods (types.SQL() and types.SQLValue()) +type byteBuffer struct { pos uint16 - buf [buffCap]byte + buf [bufCap]byte } -func NewByteBuffer() *ByteBuffer { - return &ByteBuffer{} +var bufferPool = sync.Pool{ + New: func() any { + return &byteBuffer{} + }, } -// HasCapacity indicates if this buffer has `n` bytes worth of capacity left -func (b *ByteBuffer) HasCapacity(n int) bool { - return int(b.pos)+n < buffCap +// hasCapacity indicates if this buffer has `cap` bytes worth of capacity left +func (b *byteBuffer) hasCapacity(cap int) bool { + return int(b.pos)+cap < bufCap } -// Grow records the latest used byte position. Callers -// are responsible for accurately reporting which bytes -// they expect to be protected. -func (b *ByteBuffer) Grow(n int) { +// Grow records the latest used byte position. +// Callers are responsible for accurately reporting which bytes they expect to be protected. +func (b *byteBuffer) grow(n int) { b.pos += uint16(n) } -// Get returns a zero-length slice beginning at a safe -// write position. -func (b *ByteBuffer) Get() []byte { +// Get returns a zero-length slice beginning at a safe writing position. +func (b *byteBuffer) get() []byte { return b.buf[b.pos:b.pos] } -func (b *ByteBuffer) Reset() { +func (b *byteBuffer) reset() { b.pos = 0 } + +// ByteBufferManager is responsible for handling all byteBuffers retrieved from byteBufferPool. +type ByteBufferManager struct { + bufs []*byteBuffer + cur *byteBuffer +} + +// NewByteBufferManager returns a ByteBufferManager with one byteBuffer already allocated. +func NewByteBufferManager() *ByteBufferManager { + cur := bufferPool.Get().(*byteBuffer) + cur.reset() + return &ByteBufferManager{ + cur: cur, + bufs: make([]*byteBuffer, 0), + } +} + +// Get returns a zero-length slice guaranteed to have capacity for `cap` bytes. +// This function will retrieve any necessary byteBuffers from bufferPool. +func (b *ByteBufferManager) Get(cap int) []byte { + if !b.cur.hasCapacity(cap) { + b.bufs = append(b.bufs, b.cur) + b.cur = bufferPool.Get().(*byteBuffer) + b.cur.reset() + } + return b.cur.get() +} + +// Grow shifts the safe writing position of the current byteBuffer. +func (b *ByteBufferManager) Grow(n int) { + b.cur.grow(n) +} + +// PutAll releases all allocated byteBuffers back into bufferPool. +func (b *ByteBufferManager) PutAll() { + for _, buf := range b.bufs { + bufferPool.Put(buf) + } + bufferPool.Put(b.cur) +} From 176ac6918540b81ff80dd41b7f26485ff0db4f4a Mon Sep 17 00:00:00 2001 From: James Cor Date: Thu, 20 Nov 2025 17:18:59 -0800 Subject: [PATCH 17/23] use slice instead --- sql/byte_buffer.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index 3a091474f3..0d66c50252 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -24,12 +24,14 @@ const bufCap = 4096 // 4KB // byteBuffer serves as a statically sized backing array used to the wire methods (types.SQL() and types.SQLValue()) type byteBuffer struct { pos uint16 - buf [bufCap]byte + buf []byte } var bufferPool = sync.Pool{ New: func() any { - return &byteBuffer{} + return &byteBuffer{ + buf: make([]byte, bufCap), + } }, } From 9e60588d518d35d6a0d1cd7b3eab226e2d97eab4 Mon Sep 17 00:00:00 2001 From: James Cor Date: Fri, 21 Nov 2025 00:01:46 -0800 Subject: [PATCH 18/23] fixes --- server/handler.go | 5 ++++- sql/types/sql_test.go | 20 ++++++++++---------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/server/handler.go b/server/handler.go index 34b712165f..216bd8670d 100644 --- a/server/handler.go +++ b/server/handler.go @@ -1264,6 +1264,9 @@ func getMaxTypeCapacity(ctx *sql.Context, typ sql.Type) (res int) { case sqltypes.Float64: // Longest possible 'g' format float64 is len("-1.7976931348623157e+308") = 24 res = 24 + case sqltypes.Year: + // Longest possible Year is len("2155") = 4 + res = 4 case sqltypes.Time: // Longest possible Time format is len("-00:00:00.000000") = 16 res = 16 @@ -1328,7 +1331,7 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express outVals[i] = sqltypes.NULL continue } - outVals[i], err = toSQL(ctx, col.Type, bm, row[i]) + outVals[i], err = toSQL(ctx, col.Type, bm, field) if err != nil { return nil, err } diff --git a/sql/types/sql_test.go b/sql/types/sql_test.go index 3e1b3701ef..817bf7985a 100644 --- a/sql/types/sql_test.go +++ b/sql/types/sql_test.go @@ -22,16 +22,16 @@ func BenchmarkNumI64SQL(b *testing.B) { } func BenchmarkVarchar10SQL(b *testing.B) { - var res sqltypes.Value - t := MustCreateStringWithDefaults(sqltypes.VarChar, 10) - buf := sql.NewByteBuffer() - ctx := sql.NewEmptyContext() - for i := 0; i < b.N; i++ { - res, _ = t.SQL(ctx, buf.Get(), "char") - buf.Grow(res.Len()) - buf.Reset() - } - result_ = res + //var res sqltypes.Value + //t := MustCreateStringWithDefaults(sqltypes.VarChar, 10) + //buf := sql.NewByteBuffer() + //ctx := sql.NewEmptyContext() + //for i := 0; i < b.N; i++ { + // res, _ = t.SQL(ctx, buf.Get(), "char") + // buf.Grow(res.Len()) + // buf.Reset() + //} + //result_ = res } func BenchmarkTimespanSQL(b *testing.B) { From f3068cfc76d586108166d45752a69d6955da1b4a Mon Sep 17 00:00:00 2001 From: James Cor Date: Fri, 21 Nov 2025 00:35:38 -0800 Subject: [PATCH 19/23] fixes again --- enginetest/enginetests.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enginetest/enginetests.go b/enginetest/enginetests.go index 4779a866e9..c18e6da318 100644 --- a/enginetest/enginetests.go +++ b/enginetest/enginetests.go @@ -5851,7 +5851,7 @@ func TestTypesOverWire(t *testing.T, harness ClientHarness, sessionBuilder serve break } expectedEngineRow := make([]*string, len(engineRow)) - row, _, err := server.RowToSQL(ctx, sch, engineRow, nil, nil) + row, err := server.RowToSQL(ctx, sch, engineRow, nil, nil) if !assert.NoError(t, err) { break } From 8d2ac20876ed13d4967735b4f0e373c3e40aaeb1 Mon Sep 17 00:00:00 2001 From: James Cor Date: Fri, 21 Nov 2025 00:55:16 -0800 Subject: [PATCH 20/23] opt --- enginetest/enginetests.go | 2 +- server/handler.go | 41 +++++++++++++++++++++------------------ 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/enginetest/enginetests.go b/enginetest/enginetests.go index c18e6da318..2e5f539b0a 100644 --- a/enginetest/enginetests.go +++ b/enginetest/enginetests.go @@ -5851,7 +5851,7 @@ func TestTypesOverWire(t *testing.T, harness ClientHarness, sessionBuilder serve break } expectedEngineRow := make([]*string, len(engineRow)) - row, err := server.RowToSQL(ctx, sch, engineRow, nil, nil) + row, err := server.RowToSQL(ctx, sch, engineRow, nil, nil, nil) if !assert.NoError(t, err) { break } diff --git a/server/handler.go b/server/handler.go index 216bd8670d..4ca08721af 100644 --- a/server/handler.go +++ b/server/handler.go @@ -491,7 +491,7 @@ func (h *Handler) doQuery( // zero/single return schema use spooling shortcut if types.IsOkResultSchema(schema) { r, err = resultForOkIter(sqlCtx, rowIter) - } else if schema == nil { + } else if len(schema) == 0 { r, err = resultForEmptyIter(sqlCtx, rowIter, resultFields) } else if analyzer.FlagIsSet(qFlags, sql.QFlagMax1Row) { r, bm, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, bm) @@ -615,7 +615,11 @@ func resultForMax1RowIter( } bm = sql.NewByteBufferManager() - outputRow, err := RowToSQL(ctx, schema, row, nil, bm) + maxCaps := make([]int, len(schema)) + for i, col := range schema { + maxCaps[i] = getMaxTypeCapacity(ctx, col.Type) + } + outputRow, err := RowToSQL(ctx, schema, row, nil, maxCaps, bm) if err != nil { // Important to return ByteBufferManager even in error, as we still need to release any allocated memory. return nil, bm, err @@ -708,6 +712,12 @@ func (h *Handler) resultForDefaultIter( var resChan = make(chan managedResult, 4) var res *sqltypes.Result var bm *sql.ByteBufferManager + + // TODO: find good place to put this + maxCaps := make([]int, len(schema)) + for i, col := range schema { + maxCaps[i] = getMaxTypeCapacity(ctx, col.Type) + } eg.Go(func() (err error) { defer pan2err(&err) defer wg.Done() @@ -747,7 +757,7 @@ func (h *Handler) resultForDefaultIter( } var outRow []sqltypes.Value - outRow, err = RowToSQL(ctx, schema, row, projs, bm) + outRow, err = RowToSQL(ctx, schema, row, projs, maxCaps, bm) if err != nil { return err } @@ -895,6 +905,7 @@ func (h *Handler) resultForValueRowIter( var resChan = make(chan bufferedResult, 4) var res *sqltypes.Result var bm *sql.ByteBufferManager + maxCaps := make([]int, len(schema)) eg.Go(func() (err error) { defer pan2err(&err) defer close(resChan) @@ -926,7 +937,7 @@ func (h *Handler) resultForValueRowIter( } var outRow []sqltypes.Value - outRow, err = RowValueToSQLValues(ctx, schema, row, bm) + outRow, err = RowValueToSQLValues(ctx, schema, row, maxCaps, bm) if err != nil { return err } @@ -1286,8 +1297,7 @@ func getMaxTypeCapacity(ctx *sql.Context, typ sql.Type) (res int) { return } -func toSQL(ctx *sql.Context, typ sql.Type, bm *sql.ByteBufferManager, val any) (sqltypes.Value, error) { - maxCap := getMaxTypeCapacity(ctx, typ) +func toSQL(ctx *sql.Context, typ sql.Type, maxCap int, bm *sql.ByteBufferManager, val any) (sqltypes.Value, error) { if maxCap == 0 { return typ.SQL(ctx, nil, val) } @@ -1299,7 +1309,7 @@ func toSQL(ctx *sql.Context, typ sql.Type, bm *sql.ByteBufferManager, val any) ( return ret, nil } -func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression, bm *sql.ByteBufferManager) ([]sqltypes.Value, error) { +func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression, maxCaps []int, bm *sql.ByteBufferManager) ([]sqltypes.Value, error) { // need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock) // TODO: do we really? if len(sch) == 0 { return []sqltypes.Value{}, nil @@ -1313,7 +1323,7 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express outVals[i] = sqltypes.NULL continue } - outVals[i], err = toSQL(ctx, col.Type, bm, row[i]) + outVals[i], err = toSQL(ctx, col.Type, maxCaps[i], bm, row[i]) if err != nil { return nil, err } @@ -1331,7 +1341,7 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express outVals[i] = sqltypes.NULL continue } - outVals[i], err = toSQL(ctx, col.Type, bm, field) + outVals[i], err = toSQL(ctx, col.Type, maxCaps[i], bm, field) if err != nil { return nil, err } @@ -1339,12 +1349,7 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express return outVals, nil } -func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, bm *sql.ByteBufferManager) ([]sqltypes.Value, error) { - // TODO: we check for empty schema in doQuery, shouldn't need to check here - if len(sch) == 0 { - return []sqltypes.Value{}, nil - } - +func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, maxCaps []int, bm *sql.ByteBufferManager) ([]sqltypes.Value, error) { var err error outVals := make([]sqltypes.Value, len(sch)) for i, col := range sch { @@ -1359,9 +1364,7 @@ func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, bm continue } - // TODO: schema remains constant throughout this query, so no need to recalc this every time - maxCap := getMaxTypeCapacity(ctx, valType) - if maxCap == 0 { + if maxCaps[i] == 0 { outVals[i], err = valType.SQLValue(ctx, row[i], nil) if err != nil { return nil, err @@ -1369,7 +1372,7 @@ func RowValueToSQLValues(ctx *sql.Context, sch sql.Schema, row sql.ValueRow, bm continue } - outVals[i], err = valType.SQLValue(ctx, row[i], bm.Get(maxCap)) + outVals[i], err = valType.SQLValue(ctx, row[i], bm.Get(maxCaps[i])) if err != nil { return nil, err } From 2b22455b3e5aa78595171c8fd20f832ce6c6b36e Mon Sep 17 00:00:00 2001 From: James Cor Date: Fri, 21 Nov 2025 01:00:14 -0800 Subject: [PATCH 21/23] aaaa --- server/handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/handler.go b/server/handler.go index 4ca08721af..7bc0963020 100644 --- a/server/handler.go +++ b/server/handler.go @@ -906,6 +906,9 @@ func (h *Handler) resultForValueRowIter( var res *sqltypes.Result var bm *sql.ByteBufferManager maxCaps := make([]int, len(schema)) + for i, col := range schema { + maxCaps[i] = getMaxTypeCapacity(ctx, col.Type) + } eg.Go(func() (err error) { defer pan2err(&err) defer close(resChan) From 0df519a35d0d9a7e4dc6d90d866dfff52b18d01e Mon Sep 17 00:00:00 2001 From: James Cor Date: Fri, 21 Nov 2025 12:19:12 -0800 Subject: [PATCH 22/23] test --- sql/byte_buffer.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index 0d66c50252..f7f10d03f7 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -30,7 +30,7 @@ type byteBuffer struct { var bufferPool = sync.Pool{ New: func() any { return &byteBuffer{ - buf: make([]byte, bufCap), + buf: make([]byte, 4096*128*64), } }, } @@ -66,8 +66,7 @@ func NewByteBufferManager() *ByteBufferManager { cur := bufferPool.Get().(*byteBuffer) cur.reset() return &ByteBufferManager{ - cur: cur, - bufs: make([]*byteBuffer, 0), + cur: cur, } } From 56a9431eb976f37d15019b5d761e0c79f0daf149 Mon Sep 17 00:00:00 2001 From: James Cor Date: Fri, 21 Nov 2025 12:28:02 -0800 Subject: [PATCH 23/23] ok don't do that --- sql/byte_buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index f7f10d03f7..11b87b954c 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -30,7 +30,7 @@ type byteBuffer struct { var bufferPool = sync.Pool{ New: func() any { return &byteBuffer{ - buf: make([]byte, 4096*128*64), + buf: make([]byte, bufCap), } }, }