From bafac99786d528d3135852a6f902518a90753bab Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Wed, 18 Dec 2024 13:51:22 -0800 Subject: [PATCH 01/22] rough cut buf --- enginetest/enginetests.go | 2 +- server/handler.go | 30 ++++++++++++++---------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/enginetest/enginetests.go b/enginetest/enginetests.go index 1857872043..feee1256a4 100644 --- a/enginetest/enginetests.go +++ b/enginetest/enginetests.go @@ -5760,7 +5760,7 @@ func TestTypesOverWire(t *testing.T, harness ClientHarness, sessionBuilder serve break } expectedEngineRow := make([]*string, len(engineRow)) - row, err := server.RowToSQL(ctx, sch, engineRow, nil) + row, err := server.RowToSQL(ctx, sch, engineRow, nil, nil) if !assert.NoError(t, err) { break } diff --git a/server/handler.go b/server/handler.go index 6dc7625f4f..d3a73ae3e7 100644 --- a/server/handler.go +++ b/server/handler.go @@ -442,15 +442,20 @@ func (h *Handler) doQuery( var r *sqltypes.Result var processedAtLeastOneBatch bool + buf := sql.SingletonBuf + defer func() { + sql.SingletonBuf.Reset() + }() + // zero/single return schema use spooling shortcut if types.IsOkResultSchema(schema) { r, err = resultForOkIter(sqlCtx, rowIter) } else if schema == nil { r, err = resultForEmptyIter(sqlCtx, rowIter, resultFields) } else if analyzer.FlagIsSet(qFlags, sql.QFlagMax1Row) { - r, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields) + r, err = resultForMax1RowIter(sqlCtx, schema, rowIter, resultFields, buf) } else { - r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more) + r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf) } if err != nil { return remainder, err @@ -542,7 +547,7 @@ func GetDeferredProjections(iter sql.RowIter) (sql.RowIter, []sql.Expression) { } // resultForMax1RowIter ensures that an empty iterator returns at most one row -func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, resultFields []*querypb.Field) (*sqltypes.Result, error) { +func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer) (*sqltypes.Result, error) { defer trace.StartRegion(ctx, "Handler.resultForMax1RowIter").End() row, err := iter.Next(ctx) if err == io.EOF { @@ -557,7 +562,7 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, if err := iter.Close(ctx); err != nil { return nil, err } - outputRow, err := RowToSQL(ctx, schema, row, nil) + outputRow, err := RowToSQL(ctx, schema, row, nil, buf) if err != nil { return nil, err } @@ -569,14 +574,7 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter, // resultForDefaultIter reads batches of rows from the iterator // and writes results into the callback function. -func (h *Handler) resultForDefaultIter( - ctx *sql.Context, - c *mysql.Conn, - schema sql.Schema, - iter sql.RowIter, - callback func(*sqltypes.Result, bool) error, - resultFields []*querypb.Field, - more bool) (r *sqltypes.Result, processedAtLeastOneBatch bool, returnErr error) { +func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.RowIter, callback func(*sqltypes.Result, bool) error, resultFields []*querypb.Field, more bool, buf *sql.ByteBuffer) (r *sqltypes.Result, processedAtLeastOneBatch bool, returnErr error) { defer trace.StartRegion(ctx, "Handler.resultForDefaultIter").End() eg, ctx := ctx.NewErrgroup() @@ -669,7 +667,7 @@ func (h *Handler) resultForDefaultIter( continue } - outputRow, err := RowToSQL(ctx, schema, row, projs) + outputRow, err := RowToSQL(ctx, schema, row, projs, buf) if err != nil { return err } @@ -932,7 +930,7 @@ func updateMaxUsedConnectionsStatusVariable() { }() } -func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression) ([]sqltypes.Value, error) { +func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression, buf *sql.ByteBuffer) ([]sqltypes.Value, error) { // need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock) if len(sch) == 0 { return []sqltypes.Value{}, nil @@ -946,7 +944,7 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express continue } var err error - outVals[i], err = col.Type.SQL(ctx, nil, row[i]) + outVals[i], err = col.Type.SQL(ctx, buf.Get(100), row[i]) if err != nil { return nil, err } @@ -963,7 +961,7 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express outVals[i] = sqltypes.NULL continue } - outVals[i], err = col.Type.SQL(ctx, nil, field) + outVals[i], err = col.Type.SQL(ctx, buf.Get(100), field) if err != nil { return nil, err } From 6f2abf1df636e225d0fac2676917e8685dc7d5ca Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Wed, 18 Dec 2024 15:16:30 -0800 Subject: [PATCH 02/22] edits --- server/handler.go | 17 ++++++++++++-- sql/byte_buffer.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 sql/byte_buffer.go diff --git a/server/handler.go b/server/handler.go index d3a73ae3e7..8a338fd447 100644 --- a/server/handler.go +++ b/server/handler.go @@ -944,7 +944,14 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express continue } var err error - outVals[i], err = col.Type.SQL(ctx, buf.Get(100), row[i]) + spare := buf.Spare() + outVals[i], err = col.Type.SQL(ctx, buf.Get(), row[i]) + if outVals[i].Len() > spare { + buf.Double() + } else { + buf.Advance(outVals[i].Len()) + } + if err != nil { return nil, err } @@ -961,7 +968,13 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express outVals[i] = sqltypes.NULL continue } - outVals[i], err = col.Type.SQL(ctx, buf.Get(100), field) + spare := buf.Spare() + outVals[i], err = col.Type.SQL(ctx, buf.Get(), row[i]) + if outVals[i].Len() > spare { + buf.Double() + } else { + buf.Advance(outVals[i].Len()) + } if err != nil { return nil, err } diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go new file mode 100644 index 0000000000..a44b5b6832 --- /dev/null +++ b/sql/byte_buffer.go @@ -0,0 +1,57 @@ +package sql + +var SingletonBuf = NewByteBuffer(1000) + +type ByteBuffer struct { + buf []byte + i int +} + +func NewByteBuffer(initCap int) *ByteBuffer { + return &ByteBuffer{buf: make([]byte, initCap)} +} + +func (b *ByteBuffer) Bytes() []byte { + return b.buf +} + +func (b *ByteBuffer) GetFull(i int) []byte { + start := b.i + b.i = start + i + if b.i > len(b.buf) { + newBuf := make([]byte, len(b.buf)*2) + copy(newBuf, b.buf[:]) + b.buf = newBuf + } + return b.buf[start:b.i] +} + +func (b *ByteBuffer) Double() { + newBuf := make([]byte, len(b.buf)*2) + copy(newBuf, b.buf[:]) + b.buf = newBuf +} + +func (b *ByteBuffer) Advance(i int) { + b.i += i +} + +func (b *ByteBuffer) Spare() int { + return len(b.buf) - b.i +} + +func (b *ByteBuffer) Get() []byte { + //start := b.i + //b.i = start + i + //if b.i > len(b.buf) { + // newBuf := make([]byte, len(b.buf)*2) + // copy(newBuf, b.buf[:]) + // b.buf = newBuf + //} + //return b.buf[start:b.i][:0] + return b.buf[b.i:b.i] +} + +func (b *ByteBuffer) Reset() { + b.i = 0 +} From ba35075f4e6d462a1e90e59e2f183f053e41360d Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Wed, 18 Dec 2024 19:52:15 -0800 Subject: [PATCH 03/22] edits --- sql/byte_buffer.go | 2 +- sql/types/strings.go | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index a44b5b6832..66d6046b4b 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -1,6 +1,6 @@ package sql -var SingletonBuf = NewByteBuffer(1000) +var SingletonBuf = NewByteBuffer(16000) type ByteBuffer struct { buf []byte diff --git a/sql/types/strings.go b/sql/types/strings.go index d51b7e47da..38be021754 100644 --- a/sql/types/strings.go +++ b/sql/types/strings.go @@ -540,7 +540,8 @@ func (t StringType) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqltypes. case []byte: valueBytes = v case string: - valueBytes = []byte(v) + dest = append(dest, v...) + valueBytes = dest[start:] case int, int8, int16, int32, int64: num, _, err := convertToInt64(Int64.(NumberTypeImpl_), v) if err != nil { @@ -555,10 +556,11 @@ func (t StringType) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqltypes. valueBytes = strconv.AppendUint(dest, num, 10) case bool: if v { - valueBytes = append(dest, '1') + dest = append(dest, '1') } else { - valueBytes = append(dest, '0') + dest = append(dest, '0') } + valueBytes = dest[start:] case float64: valueBytes = strconv.AppendFloat(dest, v, 'f', -1, 64) if valueBytes[start] == '-' { From f5fd9b4ee893ece03b427ef69f47ace2df7d200e Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 19 Dec 2024 08:57:57 -0800 Subject: [PATCH 04/22] correctness for byte copying --- server/handler.go | 2 +- sql/types/bit.go | 2 +- sql/types/datetime.go | 2 +- sql/types/enum.go | 2 +- sql/types/json.go | 2 +- sql/types/set.go | 2 +- sql/types/sql_test.go | 5 ++++- sql/types/strings.go | 6 +++--- sql/types/time.go | 2 +- 9 files changed, 14 insertions(+), 11 deletions(-) diff --git a/server/handler.go b/server/handler.go index 8a338fd447..2d9b3d7f04 100644 --- a/server/handler.go +++ b/server/handler.go @@ -969,7 +969,7 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express continue } spare := buf.Spare() - outVals[i], err = col.Type.SQL(ctx, buf.Get(), row[i]) + outVals[i], err = col.Type.SQL(ctx, buf.Get(), field) if outVals[i].Len() > spare { buf.Double() } else { diff --git a/sql/types/bit.go b/sql/types/bit.go index b8be7a1c0b..fe71464aaa 100644 --- a/sql/types/bit.go +++ b/sql/types/bit.go @@ -214,7 +214,7 @@ func (t BitType_) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqltypes.Va for i, j := 0, len(data)-1; i < j; i, j = i+1, j-1 { data[i], data[j] = data[j], data[i] } - val := AppendAndSliceBytes(dest, data) + val := data return sqltypes.MakeTrusted(sqltypes.Bit, val), nil } diff --git a/sql/types/datetime.go b/sql/types/datetime.go index a956b71382..68f8f7db41 100644 --- a/sql/types/datetime.go +++ b/sql/types/datetime.go @@ -410,7 +410,7 @@ func (t datetimeType) SQL(_ *sql.Context, dest []byte, v interface{}) (sqltypes. return sqltypes.Value{}, sql.ErrInvalidBaseType.New(t.baseType.String(), "datetime") } - valBytes := AppendAndSliceBytes(dest, val) + valBytes := val return sqltypes.MakeTrusted(typ, valBytes), nil } diff --git a/sql/types/enum.go b/sql/types/enum.go index 150412be2c..62fdc07585 100644 --- a/sql/types/enum.go +++ b/sql/types/enum.go @@ -257,7 +257,7 @@ func (t EnumType) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqltypes.Va snippet = strings.ToValidUTF8(snippet, string(utf8.RuneError)) return sqltypes.Value{}, sql.ErrCharSetFailedToEncode.New(resultCharset.Name(), utf8.ValidString(value), snippet) } - val := AppendAndSliceBytes(dest, encodedBytes) + val := encodedBytes return sqltypes.MakeTrusted(sqltypes.Enum, val), nil } diff --git a/sql/types/json.go b/sql/types/json.go index 3587ed8926..adffb907d2 100644 --- a/sql/types/json.go +++ b/sql/types/json.go @@ -140,7 +140,7 @@ func (t JsonType) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqltypes.Va if err != nil { return sqltypes.NULL, err } - val = AppendAndSliceBytes(dest, str) + val = str } else { // Convert to jsonType jsVal, _, err := t.Convert(v) diff --git a/sql/types/set.go b/sql/types/set.go index d67c3fe0c3..fa49d2ce29 100644 --- a/sql/types/set.go +++ b/sql/types/set.go @@ -246,7 +246,7 @@ func (t SetType) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqltypes.Val snippet = strings.ToValidUTF8(snippet, string(utf8.RuneError)) return sqltypes.Value{}, sql.ErrCharSetFailedToEncode.New(resultCharset.Name(), utf8.ValidString(value), snippet) } - val := AppendAndSliceBytes(dest, encodedBytes) + val := encodedBytes return sqltypes.MakeTrusted(sqltypes.Set, val), nil } diff --git a/sql/types/sql_test.go b/sql/types/sql_test.go index e691426ae9..44eecc4d05 100644 --- a/sql/types/sql_test.go +++ b/sql/types/sql_test.go @@ -24,9 +24,12 @@ func BenchmarkNumI64SQL(b *testing.B) { func BenchmarkVarchar10SQL(b *testing.B) { var res sqltypes.Value t := MustCreateStringWithDefaults(sqltypes.VarChar, 10) + buf := sql.NewByteBuffer(1000) ctx := sql.NewEmptyContext() for i := 0; i < b.N; i++ { - res, _ = t.SQL(ctx, nil, "char") + res, _ = t.SQL(ctx, buf.Get(), "char") + buf.Advance(res.Len()) + buf.Reset() } result_ = res } diff --git a/sql/types/strings.go b/sql/types/strings.go index 38be021754..75890fe90a 100644 --- a/sql/types/strings.go +++ b/sql/types/strings.go @@ -524,11 +524,10 @@ func (t StringType) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqltypes. start := len(dest) var val []byte if IsBinaryType(t) { - v, err = ConvertToBytes(v, t, dest) + val, err = ConvertToBytes(v, t, dest) if err != nil { return sqltypes.Value{}, err } - val = AppendAndSliceBytes(dest, v.([]byte)) } else { var valueBytes []byte switch v := v.(type) { @@ -602,7 +601,8 @@ func (t StringType) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqltypes. snippetStr := strings2.ToValidUTF8(string(snippet), string(utf8.RuneError)) return sqltypes.Value{}, sql.ErrCharSetFailedToEncode.New(resultCharset.Name(), utf8.ValidString(snippetStr), snippet) } - val = AppendAndSliceBytes(dest, encodedBytes) + //val = AppendAndSliceBytes(dest, encodedBytes) + val = encodedBytes } return sqltypes.MakeTrusted(t.baseType, val), nil diff --git a/sql/types/time.go b/sql/types/time.go index f42cb7b535..e2bc4b1a03 100644 --- a/sql/types/time.go +++ b/sql/types/time.go @@ -271,7 +271,7 @@ func (t TimespanType_) SQL(_ *sql.Context, dest []byte, v interface{}) (sqltypes return sqltypes.Value{}, err } - val := AppendAndSliceBytes(dest, ti.Bytes()) + val := ti.Bytes() return sqltypes.MakeTrusted(sqltypes.Time, val), nil } From 5fd4a67e4b119e612097ad9d1e726a10a23fbbad Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 19 Dec 2024 11:40:53 -0800 Subject: [PATCH 05/22] fix bugs --- enginetest/enginetests.go | 3 ++- server/handler.go | 38 ++++++++++++++++++++------------------ sql/byte_buffer.go | 15 +++++++++++++++ 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/enginetest/enginetests.go b/enginetest/enginetests.go index feee1256a4..2fe9f14e58 100644 --- a/enginetest/enginetests.go +++ b/enginetest/enginetests.go @@ -5743,6 +5743,7 @@ func TestTypesOverWire(t *testing.T, harness ClientHarness, sessionBuilder serve require.NoError(t, err) expectedRowSet := script.Results[queryIdx] expectedRowIdx := 0 + buf := sql.NewByteBuffer(1000) var engineRow sql.Row for engineRow, err = engineIter.Next(ctx); err == nil; engineRow, err = engineIter.Next(ctx) { if !assert.True(t, r.Next()) { @@ -5760,7 +5761,7 @@ func TestTypesOverWire(t *testing.T, harness ClientHarness, sessionBuilder serve break } expectedEngineRow := make([]*string, len(engineRow)) - row, err := server.RowToSQL(ctx, sch, engineRow, nil, nil) + row, err := server.RowToSQL(ctx, sch, engineRow, nil, buf) if !assert.NoError(t, err) { break } diff --git a/server/handler.go b/server/handler.go index 2d9b3d7f04..a109190a42 100644 --- a/server/handler.go +++ b/server/handler.go @@ -442,9 +442,10 @@ func (h *Handler) doQuery( var r *sqltypes.Result var processedAtLeastOneBatch bool - buf := sql.SingletonBuf + buf := sql.ByteBufPool.Get().(*sql.ByteBuffer) defer func() { - sql.SingletonBuf.Reset() + buf.Reset() + sql.ByteBufPool.Put(buf) }() // zero/single return schema use spooling shortcut @@ -930,6 +931,20 @@ func updateMaxUsedConnectionsStatusVariable() { }() } +func toSqlHelper(ctx *sql.Context, typ sql.Type, buf *sql.ByteBuffer, val interface{}) (sqltypes.Value, error) { + if buf == nil { + return typ.SQL(ctx, buf.Get(), val) + } + spare := buf.Spare() + ret, err := typ.SQL(ctx, buf.Get(), val) + if ret.Len() > spare { + buf.Double() + } else { + buf.Advance(ret.Len()) + } + return ret, err +} + func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Expression, buf *sql.ByteBuffer) ([]sqltypes.Value, error) { // need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock) if len(sch) == 0 { @@ -937,21 +952,14 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express } outVals := make([]sqltypes.Value, len(sch)) + var err error if len(projs) == 0 { for i, col := range sch { if row[i] == nil { outVals[i] = sqltypes.NULL continue } - var err error - spare := buf.Spare() - outVals[i], err = col.Type.SQL(ctx, buf.Get(), row[i]) - if outVals[i].Len() > spare { - buf.Double() - } else { - buf.Advance(outVals[i].Len()) - } - + outVals[i], err = toSqlHelper(ctx, col.Type, buf, row[i]) if err != nil { return nil, err } @@ -968,13 +976,7 @@ func RowToSQL(ctx *sql.Context, sch sql.Schema, row sql.Row, projs []sql.Express outVals[i] = sqltypes.NULL continue } - spare := buf.Spare() - outVals[i], err = col.Type.SQL(ctx, buf.Get(), field) - if outVals[i].Len() > spare { - buf.Double() - } else { - buf.Advance(outVals[i].Len()) - } + outVals[i], err = toSqlHelper(ctx, col.Type, buf, field) if err != nil { return nil, err } diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index 66d6046b4b..015b3b4e7d 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -1,7 +1,22 @@ package sql +import ( + "sync" +) + var SingletonBuf = NewByteBuffer(16000) +var defaultByteBuffCap = 1000 + +var ByteBufPool = sync.Pool{ + New: func() any { + // The Pool's New function should generally only return pointer + // types, since a pointer can be put into the return interface + // value without an allocation: + return NewByteBuffer(defaultByteBuffCap) + }, +} + type ByteBuffer struct { buf []byte i int From 6e3c3fecd3c9f46a7e678f5490a56cbefe4c3fed Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 19 Dec 2024 14:47:34 -0800 Subject: [PATCH 06/22] simplify --- server/handler.go | 9 ++------ server/handler_test.go | 19 +++++++++------- sql/byte_buffer.go | 51 ++++++------------------------------------ 3 files changed, 20 insertions(+), 59 deletions(-) diff --git a/server/handler.go b/server/handler.go index a109190a42..f54228ae32 100644 --- a/server/handler.go +++ b/server/handler.go @@ -933,15 +933,10 @@ func updateMaxUsedConnectionsStatusVariable() { func toSqlHelper(ctx *sql.Context, typ sql.Type, buf *sql.ByteBuffer, val interface{}) (sqltypes.Value, error) { if buf == nil { - return typ.SQL(ctx, buf.Get(), val) + return typ.SQL(ctx, nil, val) } - spare := buf.Spare() ret, err := typ.SQL(ctx, buf.Get(), val) - if ret.Len() > spare { - buf.Double() - } else { - buf.Advance(ret.Len()) - } + buf.Update(ret.Raw()) return ret, err } diff --git a/server/handler_test.go b/server/handler_test.go index 6aac1c4c22..3d9de512e2 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -15,6 +15,7 @@ package server import ( + "bytes" "context" "fmt" "io" @@ -799,19 +800,21 @@ func TestHandlerKillQuery(t *testing.T) { }() time.Sleep(100 * time.Millisecond) - var sleepQueryID string + var sleepQueryID []byte err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Query, 0, ... , SELECT SLEEP(1000) // 2, , , test, Query, 0, running, SHOW PROCESSLIST require.Equal(2, len(res.Rows)) hasSleepQuery := false + fmt.Println(res.Rows[0][0].ToString(), res.Rows[0][7].ToString()) + fmt.Println(res.Rows[1][0].ToString(), res.Rows[1][7].ToString()) for _, row := range res.Rows { - if row[7].ToString() != sleepQuery { + if !bytes.Equal(row[7].Raw(), []byte(sleepQuery)) { continue } hasSleepQuery = true - sleepQueryID = row[0].ToString() - require.Equal("Query", row[4].ToString()) + sleepQueryID = row[0].Raw() + require.Equal([]byte("Query"), row[4].Raw()) } require.True(hasSleepQuery) return nil @@ -819,7 +822,7 @@ func TestHandlerKillQuery(t *testing.T) { require.NoError(err) time.Sleep(100 * time.Millisecond) - err = handler.ComQuery(context.Background(), conn2, "KILL QUERY "+sleepQueryID, func(res *sqltypes.Result, more bool) error { + err = handler.ComQuery(context.Background(), conn2, "KILL QUERY "+string(sleepQueryID), func(res *sqltypes.Result, more bool) error { return nil }) require.NoError(err) @@ -832,12 +835,12 @@ func TestHandlerKillQuery(t *testing.T) { require.Equal(2, len(res.Rows)) hasSleepQueryID := false for _, row := range res.Rows { - if row[0].ToString() != sleepQueryID { + if !bytes.Equal(row[0].ToBytes(), sleepQueryID) { continue } hasSleepQueryID = true - require.Equal("Sleep", row[4].ToString()) - require.Equal("", row[7].ToString()) + require.Equal([]byte("Sleep"), row[4].ToBytes()) + require.Equal([]byte{}, row[7].ToBytes()) } require.True(hasSleepQueryID) return nil diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index 015b3b4e7d..fd819745ae 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -4,69 +4,32 @@ import ( "sync" ) -var SingletonBuf = NewByteBuffer(16000) - -var defaultByteBuffCap = 1000 +const defaultByteBuffCap = 1024 var ByteBufPool = sync.Pool{ New: func() any { - // The Pool's New function should generally only return pointer - // types, since a pointer can be put into the return interface - // value without an allocation: return NewByteBuffer(defaultByteBuffCap) }, } type ByteBuffer struct { buf []byte - i int } func NewByteBuffer(initCap int) *ByteBuffer { - return &ByteBuffer{buf: make([]byte, initCap)} -} - -func (b *ByteBuffer) Bytes() []byte { - return b.buf + return &ByteBuffer{buf: make([]byte, 0, initCap)} } -func (b *ByteBuffer) GetFull(i int) []byte { - start := b.i - b.i = start + i - if b.i > len(b.buf) { - newBuf := make([]byte, len(b.buf)*2) - copy(newBuf, b.buf[:]) - b.buf = newBuf +func (b *ByteBuffer) Update(buf []byte) { + if cap(buf) > cap(b.buf) { + b.buf = buf } - return b.buf[start:b.i] -} - -func (b *ByteBuffer) Double() { - newBuf := make([]byte, len(b.buf)*2) - copy(newBuf, b.buf[:]) - b.buf = newBuf -} - -func (b *ByteBuffer) Advance(i int) { - b.i += i -} - -func (b *ByteBuffer) Spare() int { - return len(b.buf) - b.i } func (b *ByteBuffer) Get() []byte { - //start := b.i - //b.i = start + i - //if b.i > len(b.buf) { - // newBuf := make([]byte, len(b.buf)*2) - // copy(newBuf, b.buf[:]) - // b.buf = newBuf - //} - //return b.buf[start:b.i][:0] - return b.buf[b.i:b.i] + return b.buf[len(b.buf):] } func (b *ByteBuffer) Reset() { - b.i = 0 + b.buf = b.buf[:0] } From 281efd8522caf8c9a0e830fabc97d2e796d494d5 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 19 Dec 2024 17:11:31 -0800 Subject: [PATCH 07/22] correct simplification --- server/handler_test.go | 19 ++++++++----------- sql/byte_buffer.go | 26 ++++++++++++++++++++------ sql/types/number.go | 2 ++ 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/server/handler_test.go b/server/handler_test.go index 3d9de512e2..6aac1c4c22 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -15,7 +15,6 @@ package server import ( - "bytes" "context" "fmt" "io" @@ -800,21 +799,19 @@ func TestHandlerKillQuery(t *testing.T) { }() time.Sleep(100 * time.Millisecond) - var sleepQueryID []byte + var sleepQueryID string err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Query, 0, ... , SELECT SLEEP(1000) // 2, , , test, Query, 0, running, SHOW PROCESSLIST require.Equal(2, len(res.Rows)) hasSleepQuery := false - fmt.Println(res.Rows[0][0].ToString(), res.Rows[0][7].ToString()) - fmt.Println(res.Rows[1][0].ToString(), res.Rows[1][7].ToString()) for _, row := range res.Rows { - if !bytes.Equal(row[7].Raw(), []byte(sleepQuery)) { + if row[7].ToString() != sleepQuery { continue } hasSleepQuery = true - sleepQueryID = row[0].Raw() - require.Equal([]byte("Query"), row[4].Raw()) + sleepQueryID = row[0].ToString() + require.Equal("Query", row[4].ToString()) } require.True(hasSleepQuery) return nil @@ -822,7 +819,7 @@ func TestHandlerKillQuery(t *testing.T) { require.NoError(err) time.Sleep(100 * time.Millisecond) - err = handler.ComQuery(context.Background(), conn2, "KILL QUERY "+string(sleepQueryID), func(res *sqltypes.Result, more bool) error { + err = handler.ComQuery(context.Background(), conn2, "KILL QUERY "+sleepQueryID, func(res *sqltypes.Result, more bool) error { return nil }) require.NoError(err) @@ -835,12 +832,12 @@ func TestHandlerKillQuery(t *testing.T) { require.Equal(2, len(res.Rows)) hasSleepQueryID := false for _, row := range res.Rows { - if !bytes.Equal(row[0].ToBytes(), sleepQueryID) { + if row[0].ToString() != sleepQueryID { continue } hasSleepQueryID = true - require.Equal([]byte("Sleep"), row[4].ToBytes()) - require.Equal([]byte{}, row[7].ToBytes()) + require.Equal("Sleep", row[4].ToString()) + require.Equal("", row[7].ToString()) } require.True(hasSleepQueryID) return nil diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index fd819745ae..c1c01d8c36 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -4,7 +4,7 @@ import ( "sync" ) -const defaultByteBuffCap = 1024 +const defaultByteBuffCap = 10 var ByteBufPool = sync.Pool{ New: func() any { @@ -13,23 +13,37 @@ var ByteBufPool = sync.Pool{ } type ByteBuffer struct { + i int buf []byte } func NewByteBuffer(initCap int) *ByteBuffer { - return &ByteBuffer{buf: make([]byte, 0, initCap)} + buf := make([]byte, initCap) + return &ByteBuffer{buf: buf} } func (b *ByteBuffer) Update(buf []byte) { - if cap(buf) > cap(b.buf) { - b.buf = buf + if b.i+len(buf) > len(b.buf) { + // Runtime alloc'd into a separate backing array, but it chooses + // the doubling cap using the non-optimal |cap(b.buf)-b.i|*2. + // We do not need to increment |b.i| b/c the latest value is in + // the other array. + b.Double() + } else { + b.i += len(buf) } } +func (b *ByteBuffer) Double() { + buf := make([]byte, len(b.buf)*2) + copy(buf, b.buf) + b.buf = buf +} + func (b *ByteBuffer) Get() []byte { - return b.buf[len(b.buf):] + return b.buf[b.i:b.i] } func (b *ByteBuffer) Reset() { - b.buf = b.buf[:0] + b.i = 0 } diff --git a/sql/types/number.go b/sql/types/number.go index 41ae6d4671..6398c0ddae 100644 --- a/sql/types/number.go +++ b/sql/types/number.go @@ -613,6 +613,8 @@ func (t NumberTypeImpl_) SQL(ctx *sql.Context, dest []byte, v interface{}) (sqlt default: return sqltypes.Value{}, err } + } else if err != nil { + return sqltypes.Value{}, err } val := dest[stop:] From ee7dc63d312eab4ce1a99e05a3b69fab1a7105f3 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 19 Dec 2024 17:13:32 -0800 Subject: [PATCH 08/22] page sized spool buffer --- sql/byte_buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index c1c01d8c36..43bfd4185c 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -4,7 +4,7 @@ import ( "sync" ) -const defaultByteBuffCap = 10 +const defaultByteBuffCap = 4096 var ByteBufPool = sync.Pool{ New: func() any { From fcae966fde6c3b6a277a1d918f7982db462b3baf Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 19 Dec 2024 17:18:50 -0800 Subject: [PATCH 09/22] fix build --- sql/types/sql_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/types/sql_test.go b/sql/types/sql_test.go index 44eecc4d05..2d84706e44 100644 --- a/sql/types/sql_test.go +++ b/sql/types/sql_test.go @@ -28,7 +28,7 @@ func BenchmarkVarchar10SQL(b *testing.B) { ctx := sql.NewEmptyContext() for i := 0; i < b.N; i++ { res, _ = t.SQL(ctx, buf.Get(), "char") - buf.Advance(res.Len()) + buf.Update(res.Raw()) buf.Reset() } result_ = res From ba830b2f8c266f69073c16bf72a84e0dc4976cc2 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 19 Dec 2024 17:39:49 -0800 Subject: [PATCH 10/22] comments --- server/handler.go | 2 +- sql/byte_buffer.go | 14 +++++++++++--- sql/types/sql_test.go | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/server/handler.go b/server/handler.go index f54228ae32..9bec7fc071 100644 --- a/server/handler.go +++ b/server/handler.go @@ -936,7 +936,7 @@ func toSqlHelper(ctx *sql.Context, typ sql.Type, buf *sql.ByteBuffer, val interf return typ.SQL(ctx, nil, val) } ret, err := typ.SQL(ctx, buf.Get(), val) - buf.Update(ret.Raw()) + buf.Grow(ret.Len()) return ret, err } diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index 43bfd4185c..f67e8bf3ef 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -22,24 +22,32 @@ func NewByteBuffer(initCap int) *ByteBuffer { return &ByteBuffer{buf: buf} } -func (b *ByteBuffer) Update(buf []byte) { - if b.i+len(buf) > len(b.buf) { +// Grow records the latest used byte position. Callers +// are responsible for accurately reporting which bytes +// they expect to be protected. +func (b *ByteBuffer) Grow(n int) { + if b.i+n > len(b.buf) { // Runtime alloc'd into a separate backing array, but it chooses // the doubling cap using the non-optimal |cap(b.buf)-b.i|*2. // We do not need to increment |b.i| b/c the latest value is in // the other array. b.Double() } else { - b.i += len(buf) + b.i += n } } +// Double expands the backing array by 2x. We do this +// here because the runtime only doubles based on slice +// length. func (b *ByteBuffer) Double() { buf := make([]byte, len(b.buf)*2) copy(buf, b.buf) b.buf = buf } +// Get returns a zero length slice beginning at a safe +// write position. func (b *ByteBuffer) Get() []byte { return b.buf[b.i:b.i] } diff --git a/sql/types/sql_test.go b/sql/types/sql_test.go index 2d84706e44..5c90f4ddf0 100644 --- a/sql/types/sql_test.go +++ b/sql/types/sql_test.go @@ -28,7 +28,7 @@ func BenchmarkVarchar10SQL(b *testing.B) { ctx := sql.NewEmptyContext() for i := 0; i < b.N; i++ { res, _ = t.SQL(ctx, buf.Get(), "char") - buf.Update(res.Raw()) + buf.Grow(res.Len()) buf.Reset() } result_ = res From e5e4a5af55557832fd0ffb8c601b8837133370b2 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 19 Dec 2024 17:48:22 -0800 Subject: [PATCH 11/22] bump timeout --- server/handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/handler_test.go b/server/handler_test.go index 6aac1c4c22..ba899a07f0 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -789,7 +789,7 @@ func TestHandlerKillQuery(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - sleepQuery := "SELECT SLEEP(1)" + sleepQuery := "SELECT SLEEP(2)" go func() { defer wg.Done() err = handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { From b362f57235bc20013ad354104e3ea550d5cbd163 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 19 Dec 2024 17:57:14 -0800 Subject: [PATCH 12/22] bump timeout --- server/handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/handler_test.go b/server/handler_test.go index ba899a07f0..daa4d89718 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -789,7 +789,7 @@ func TestHandlerKillQuery(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - sleepQuery := "SELECT SLEEP(2)" + sleepQuery := "SELECT SLEEP(5)" go func() { defer wg.Done() err = handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { From b2e0e808b5ef08cdb1094e61fbe1ecd0e8266ad8 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 20 Dec 2024 12:03:22 -0800 Subject: [PATCH 13/22] fix race --- server/handler_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/server/handler_test.go b/server/handler_test.go index daa4d89718..61692dcc5d 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -20,6 +20,7 @@ import ( "io" "net" "strconv" + "strings" "sync" "testing" "time" @@ -789,16 +790,16 @@ func TestHandlerKillQuery(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - sleepQuery := "SELECT SLEEP(5)" + sleepQuery := "SELECT SLEEP(100000)" go func() { defer wg.Done() - err = handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { + // need a local |err| variable to avoid being overwritten + err := handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { return nil }) require.Error(err) }() - time.Sleep(100 * time.Millisecond) var sleepQueryID string err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Query, 0, ... , SELECT SLEEP(1000) @@ -810,7 +811,10 @@ func TestHandlerKillQuery(t *testing.T) { continue } hasSleepQuery = true - sleepQueryID = row[0].ToString() + // the values inside a callback are generally only valid for the + // duration of the query, and need to be copied to avoid being + // overwritten + sleepQueryID = strings.Clone(row[0].ToString()) require.Equal("Query", row[4].ToString()) } require.True(hasSleepQuery) @@ -818,14 +822,11 @@ func TestHandlerKillQuery(t *testing.T) { }) require.NoError(err) - time.Sleep(100 * time.Millisecond) err = handler.ComQuery(context.Background(), conn2, "KILL QUERY "+sleepQueryID, func(res *sqltypes.Result, more bool) error { return nil }) require.NoError(err) wg.Wait() - - time.Sleep(100 * time.Millisecond) err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Sleep, 0, , // 2, , , test, Query, 0, running, SHOW PROCESSLIST From d17b1c0a2f01beac4ca5563221ab7fa7c9935c4a Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 20 Dec 2024 12:12:50 -0800 Subject: [PATCH 14/22] try separate sleep error --- server/handler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/handler_test.go b/server/handler_test.go index 61692dcc5d..4cca5749a6 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -794,10 +794,10 @@ func TestHandlerKillQuery(t *testing.T) { go func() { defer wg.Done() // need a local |err| variable to avoid being overwritten - err := handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { + serr := handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { return nil }) - require.Error(err) + require.Error(serr) }() var sleepQueryID string From 49a0a9988516690b6c756a384041262a2d0c4fca Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 20 Dec 2024 12:27:24 -0800 Subject: [PATCH 15/22] vitess bump --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index e774bc733f..bf123fd6e6 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/dolthub/go-icu-regex v0.0.0-20241215010122-db690dd53c90 github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 - github.com/dolthub/vitess v0.0.0-20241211024425-b00987f7ba54 + github.com/dolthub/vitess v0.0.0-20241220202600-b18f18d0cde7 github.com/go-kit/kit v0.10.0 github.com/go-sql-driver/mysql v1.7.2-0.20231213112541-0004702b931d github.com/gocraft/dbr/v2 v2.7.2 diff --git a/go.sum b/go.sum index a06fca9d22..03f7164365 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,8 @@ github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 h1:bMGS25NWAGTE github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71/go.mod h1:2/2zjLQ/JOOSbbSboojeg+cAwcRV0fDLzIiWch/lhqI= github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE= github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY= -github.com/dolthub/vitess v0.0.0-20241211024425-b00987f7ba54 h1:nzBnC0Rt1gFtscJEz4veYd/mazZEdbdmed+tujdaKOo= -github.com/dolthub/vitess v0.0.0-20241211024425-b00987f7ba54/go.mod h1:1gQZs/byeHLMSul3Lvl3MzioMtOW1je79QYGyi2fd70= +github.com/dolthub/vitess v0.0.0-20241220202600-b18f18d0cde7 h1:w130WLeARGGNYWmhGPugsHXzJEelKKimt3kTWg6/Puk= +github.com/dolthub/vitess v0.0.0-20241220202600-b18f18d0cde7/go.mod h1:1gQZs/byeHLMSul3Lvl3MzioMtOW1je79QYGyi2fd70= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= From 5982731db12d4ac21e814c497d17b7ec344e51f8 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 20 Dec 2024 12:36:27 -0800 Subject: [PATCH 16/22] see if sleep error masks a different error --- server/handler_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/handler_test.go b/server/handler_test.go index 4cca5749a6..34eba62b3d 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -794,10 +794,10 @@ func TestHandlerKillQuery(t *testing.T) { go func() { defer wg.Done() // need a local |err| variable to avoid being overwritten - serr := handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { + _ = handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { return nil }) - require.Error(serr) + //require.Error(serr) }() var sleepQueryID string @@ -806,6 +806,8 @@ func TestHandlerKillQuery(t *testing.T) { // 2, , , test, Query, 0, running, SHOW PROCESSLIST require.Equal(2, len(res.Rows)) hasSleepQuery := false + fmt.Println(res.Rows[0][0], res.Rows[0][4], res.Rows[0][7]) + fmt.Println(res.Rows[1][0], res.Rows[1][4], res.Rows[1][7]) for _, row := range res.Rows { if row[7].ToString() != sleepQuery { continue From 112556b19e6befd7e29e2f61d9869f82c3c0d589 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 20 Dec 2024 12:45:08 -0800 Subject: [PATCH 17/22] add sleeps back --- server/handler_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/handler_test.go b/server/handler_test.go index 34eba62b3d..66484ffbc7 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -800,6 +800,7 @@ func TestHandlerKillQuery(t *testing.T) { //require.Error(serr) }() + time.Sleep(100 * time.Millisecond) var sleepQueryID string err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Query, 0, ... , SELECT SLEEP(1000) @@ -824,11 +825,13 @@ func TestHandlerKillQuery(t *testing.T) { }) require.NoError(err) + time.Sleep(100 * time.Millisecond) err = handler.ComQuery(context.Background(), conn2, "KILL QUERY "+sleepQueryID, func(res *sqltypes.Result, more bool) error { return nil }) require.NoError(err) wg.Wait() + time.Sleep(100 * time.Millisecond) err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Sleep, 0, , // 2, , , test, Query, 0, running, SHOW PROCESSLIST From fc476f6a11e2c0e3d055d16246d96aae519b4d43 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 20 Dec 2024 12:50:37 -0800 Subject: [PATCH 18/22] more error check where it won't hide other errors --- server/handler_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/handler_test.go b/server/handler_test.go index 66484ffbc7..6bc3f1e20b 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -791,13 +791,13 @@ func TestHandlerKillQuery(t *testing.T) { var wg sync.WaitGroup wg.Add(1) sleepQuery := "SELECT SLEEP(100000)" + var sleepErr error go func() { defer wg.Done() // need a local |err| variable to avoid being overwritten - _ = handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { + sleepErr = handler.ComQuery(context.Background(), conn1, sleepQuery, func(res *sqltypes.Result, more bool) error { return nil }) - //require.Error(serr) }() time.Sleep(100 * time.Millisecond) @@ -831,6 +831,8 @@ func TestHandlerKillQuery(t *testing.T) { }) require.NoError(err) wg.Wait() + require.Error(sleepErr) + time.Sleep(100 * time.Millisecond) err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Sleep, 0, , From 3ce2865f7e7dfd19467fc4323be9e360eb1d1a87 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 20 Dec 2024 12:54:41 -0800 Subject: [PATCH 19/22] remove handler test race --- server/handler_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/handler_test.go b/server/handler_test.go index 6bc3f1e20b..510d5338df 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -800,7 +800,8 @@ func TestHandlerKillQuery(t *testing.T) { }) }() - time.Sleep(100 * time.Millisecond) + wg.Wait() + wg.Add(1) var sleepQueryID string err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Query, 0, ... , SELECT SLEEP(1000) @@ -825,7 +826,6 @@ func TestHandlerKillQuery(t *testing.T) { }) require.NoError(err) - time.Sleep(100 * time.Millisecond) err = handler.ComQuery(context.Background(), conn2, "KILL QUERY "+sleepQueryID, func(res *sqltypes.Result, more bool) error { return nil }) @@ -833,7 +833,6 @@ func TestHandlerKillQuery(t *testing.T) { wg.Wait() require.Error(sleepErr) - time.Sleep(100 * time.Millisecond) err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Sleep, 0, , // 2, , , test, Query, 0, running, SHOW PROCESSLIST From 3a7e950afce78feec6bcb8d65adada52d66981ca Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 20 Dec 2024 13:36:34 -0800 Subject: [PATCH 20/22] revert back to racey with sleeps --- server/handler_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/handler_test.go b/server/handler_test.go index 510d5338df..6bc3f1e20b 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -800,8 +800,7 @@ func TestHandlerKillQuery(t *testing.T) { }) }() - wg.Wait() - wg.Add(1) + time.Sleep(100 * time.Millisecond) var sleepQueryID string err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Query, 0, ... , SELECT SLEEP(1000) @@ -826,6 +825,7 @@ func TestHandlerKillQuery(t *testing.T) { }) require.NoError(err) + time.Sleep(100 * time.Millisecond) err = handler.ComQuery(context.Background(), conn2, "KILL QUERY "+sleepQueryID, func(res *sqltypes.Result, more bool) error { return nil }) @@ -833,6 +833,7 @@ func TestHandlerKillQuery(t *testing.T) { wg.Wait() require.Error(sleepErr) + time.Sleep(100 * time.Millisecond) err = handler.ComQuery(context.Background(), conn2, "SHOW PROCESSLIST", func(res *sqltypes.Result, more bool) error { // 1, , , test, Sleep, 0, , // 2, , , test, Query, 0, running, SHOW PROCESSLIST From 210c5c78964ccae242f6e9834b2066e92edbccef Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 20 Dec 2024 15:11:52 -0800 Subject: [PATCH 21/22] zach comments --- sql/byte_buffer.go | 32 +++++++++++++++---- sql/byte_buffer_test.go | 71 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 7 deletions(-) create mode 100644 sql/byte_buffer_test.go diff --git a/sql/byte_buffer.go b/sql/byte_buffer.go index f67e8bf3ef..03a977f209 100644 --- a/sql/byte_buffer.go +++ b/sql/byte_buffer.go @@ -1,3 +1,17 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package sql import ( @@ -26,15 +40,19 @@ func NewByteBuffer(initCap int) *ByteBuffer { // are responsible for accurately reporting which bytes // they expect to be protected. func (b *ByteBuffer) Grow(n int) { - if b.i+n > len(b.buf) { - // Runtime alloc'd into a separate backing array, but it chooses - // the doubling cap using the non-optimal |cap(b.buf)-b.i|*2. - // We do not need to increment |b.i| b/c the latest value is in - // the other array. + newI := b.i + if b.i+n <= len(b.buf) { + // Increment |b.i| if no alloc + newI += n + } + if b.i+n >= len(b.buf) { + // No more space, double. + // An external allocation doubled the cap using the size of + // the override object, which if used could lead to overall + // shrinking behavior. b.Double() - } else { - b.i += n } + b.i = newI } // Double expands the backing array by 2x. We do this diff --git a/sql/byte_buffer_test.go b/sql/byte_buffer_test.go new file mode 100644 index 0000000000..c458bcf6c1 --- /dev/null +++ b/sql/byte_buffer_test.go @@ -0,0 +1,71 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sql + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestGrowByteBuffer(t *testing.T) { + b := NewByteBuffer(10) + + // grow less than boundary + src1 := []byte{1, 1, 1} + obj1 := append(b.Get(), src1...) + b.Grow(len(src1)) + + require.Equal(t, 10, len(b.buf)) + require.Equal(t, 3, b.i) + require.Equal(t, 10, cap(obj1)) + + // grow to boundary + src2 := []byte{0, 0, 0, 0, 0, 0, 0} + obj2 := append(b.Get(), src2...) + b.Grow(len(src2)) + + require.Equal(t, 20, len(b.buf)) + require.Equal(t, 10, b.i) + require.Equal(t, 7, cap(obj2)) + + src3 := []byte{2, 2, 2, 2, 2} + obj3 := append(b.Get(), src3...) + b.Grow(len(src3)) + + require.Equal(t, 20, len(b.buf)) + require.Equal(t, 15, b.i) + require.Equal(t, 10, cap(obj3)) + + // grow exceeds boundary + + src4 := []byte{3, 3, 3, 3, 3, 3, 3, 3} + obj4 := append(b.Get(), src4...) + b.Grow(len(src4)) + + require.Equal(t, 40, len(b.buf)) + require.Equal(t, 15, b.i) + require.Equal(t, 16, cap(obj4)) + + // objects are all valid after doubling + require.Equal(t, src1, obj1) + require.Equal(t, src2, obj2) + require.Equal(t, src3, obj3) + require.Equal(t, src4, obj4) + + // reset + b.Reset() + require.Equal(t, 40, len(b.buf)) + require.Equal(t, 0, b.i) +} From 4e37c3b49858b5671b1a13041ac9059f5498097a Mon Sep 17 00:00:00 2001 From: max-hoffman Date: Fri, 20 Dec 2024 23:13:08 +0000 Subject: [PATCH 22/22] [ga-format-pr] Run ./format_repo.sh to fix formatting --- sql/byte_buffer_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/byte_buffer_test.go b/sql/byte_buffer_test.go index c458bcf6c1..afe67aa1b7 100644 --- a/sql/byte_buffer_test.go +++ b/sql/byte_buffer_test.go @@ -15,8 +15,9 @@ package sql import ( - "github.com/stretchr/testify/require" "testing" + + "github.com/stretchr/testify/require" ) func TestGrowByteBuffer(t *testing.T) {