Skip to content

Commit 9bd9e2d

Browse files
committed
merge
2 parents 8c8842a + 29e26f9 commit 9bd9e2d

File tree

13 files changed

+175
-23
lines changed

13 files changed

+175
-23
lines changed

enginetest/enginetests.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,95 @@ func TestInfoSchema(t *testing.T, h Harness) {
359359
)
360360
})
361361

362+
t.Run("information_schema.processlist projection with alias case", func(t *testing.T) {
363+
e := mustNewEngine(t, h)
364+
defer e.Close()
365+
366+
if IsServerEngine(e) {
367+
t.Skip("skipping for server engine as the processlist returned from server differs")
368+
}
369+
p := sqle.NewProcessList()
370+
p.AddConnection(1, "localhost")
371+
372+
ctx := NewContext(h)
373+
ctx.Session.SetClient(sql.Client{Address: "localhost", User: "root"})
374+
ctx.Session.SetConnectionId(1)
375+
ctx.ProcessList = p
376+
ctx.SetCurrentDatabase("")
377+
378+
p.ConnectionReady(ctx.Session)
379+
380+
ctx, err := p.BeginQuery(ctx, "SELECT foo")
381+
require.NoError(t, err)
382+
383+
p.AddConnection(2, "otherhost")
384+
sess2 := sql.NewBaseSessionWithClientServer("localhost", sql.Client{Address: "otherhost", User: "root"}, 2)
385+
sess2.SetCurrentDatabase("otherdb")
386+
p.ConnectionReady(sess2)
387+
ctx2 := sql.NewContext(context.Background(), sql.WithPid(2), sql.WithSession(sess2))
388+
ctx2, err = p.BeginQuery(ctx2, "SELECT bar")
389+
require.NoError(t, err)
390+
p.EndQuery(ctx2)
391+
392+
TestQueryWithContext(t, ctx, e, h,
393+
"SELECT id, uSeR, hOST FROM information_schema.processlist pl ORDER BY id",
394+
[]sql.Row{
395+
{uint64(1), "root", "localhost"},
396+
{uint64(2), "root", "otherhost"},
397+
},
398+
sql.Schema{
399+
{Name: "id", Type: types.Uint64},
400+
{Name: "uSeR", Type: types.MustCreateString(sqltypes.VarChar, 96, sql.Collation_Information_Schema_Default)},
401+
{Name: "hOST", Type: types.MustCreateString(sqltypes.VarChar, 783, sql.Collation_Information_Schema_Default)},
402+
},
403+
nil, nil,
404+
)
405+
})
406+
407+
t.Run("information_schema.processlist projection with aliased join case", func(t *testing.T) {
408+
e := mustNewEngine(t, h)
409+
defer e.Close()
410+
411+
if IsServerEngine(e) {
412+
t.Skip("skipping for server engine as the processlist returned from server differs")
413+
}
414+
p := sqle.NewProcessList()
415+
p.AddConnection(1, "localhost")
416+
417+
ctx := NewContext(h)
418+
ctx.Session.SetClient(sql.Client{Address: "localhost", User: "root"})
419+
ctx.Session.SetConnectionId(1)
420+
ctx.ProcessList = p
421+
ctx.SetCurrentDatabase("")
422+
423+
p.ConnectionReady(ctx.Session)
424+
425+
ctx, err := p.BeginQuery(ctx, "SELECT foo")
426+
require.NoError(t, err)
427+
428+
p.AddConnection(2, "otherhost")
429+
sess2 := sql.NewBaseSessionWithClientServer("localhost", sql.Client{Address: "otherhost", User: "root"}, 2)
430+
sess2.SetCurrentDatabase("otherdb")
431+
p.ConnectionReady(sess2)
432+
ctx2 := sql.NewContext(context.Background(), sql.WithPid(2), sql.WithSession(sess2))
433+
ctx2, err = p.BeginQuery(ctx2, "SELECT bar")
434+
require.NoError(t, err)
435+
p.EndQuery(ctx2)
436+
437+
TestQueryWithContext(t, ctx, e, h,
438+
"SELECT id, uSeR, hOST FROM information_schema.processlist pl join information_schema.schemata on true ORDER BY id limit 1",
439+
[]sql.Row{
440+
{uint64(1), "root", "localhost"},
441+
},
442+
sql.Schema{
443+
{Name: "id", Type: types.Uint64},
444+
{Name: "uSeR", Type: types.MustCreateString(sqltypes.VarChar, 96, sql.Collation_Information_Schema_Default)},
445+
{Name: "hOST", Type: types.MustCreateString(sqltypes.VarChar, 783, sql.Collation_Information_Schema_Default)},
446+
},
447+
nil, nil,
448+
)
449+
})
450+
362451
for _, tt := range queries.SkippedInfoSchemaQueries {
363452
t.Run(tt.Query, func(t *testing.T) {
364453
t.Skip()

enginetest/queries/script_queries.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,17 @@ CREATE TABLE teams (
143143
},
144144
},
145145
},
146+
{
147+
Name: "keyless reverse index",
148+
SetUpScript: []string{
149+
"create table x (x int, key(x));",
150+
"insert into x values (0),(1)",
151+
},
152+
Query: "select * from x order by x desc limit 1",
153+
Expected: []sql.Row{
154+
{1},
155+
},
156+
},
146157
{
147158
Name: "filter pushdown through join uppercase name",
148159
SetUpScript: []string{

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/dolthub/go-icu-regex v0.0.0-20241215010122-db690dd53c90
77
github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71
88
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
9-
github.com/dolthub/vitess v0.0.0-20241220202600-b18f18d0cde7
9+
github.com/dolthub/vitess v0.0.0-20241231200706-18992bb25fdc
1010
github.com/go-kit/kit v0.10.0
1111
github.com/go-sql-driver/mysql v1.7.2-0.20231213112541-0004702b931d
1212
github.com/gocraft/dbr/v2 v2.7.2

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9X
6060
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
6161
github.com/dolthub/vitess v0.0.0-20241220202600-b18f18d0cde7 h1:w130WLeARGGNYWmhGPugsHXzJEelKKimt3kTWg6/Puk=
6262
github.com/dolthub/vitess v0.0.0-20241220202600-b18f18d0cde7/go.mod h1:1gQZs/byeHLMSul3Lvl3MzioMtOW1je79QYGyi2fd70=
63+
github.com/dolthub/vitess v0.0.0-20241231200706-18992bb25fdc h1:3FuwEDwyue/JuHdnwGSbQhE9xKAFM+k1y3uXi58h7Gk=
64+
github.com/dolthub/vitess v0.0.0-20241231200706-18992bb25fdc/go.mod h1:1gQZs/byeHLMSul3Lvl3MzioMtOW1je79QYGyi2fd70=
6365
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
6466
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
6567
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=

server/extension.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ func Intercept(h Interceptor) {
3535
func WithChain() Option {
3636
return func(e *sqle.Engine, sm *SessionManager, handler mysql.Handler) {
3737
f := DefaultProtocolListenerFunc
38-
DefaultProtocolListenerFunc = func(cfg mysql.ListenerConfig) (ProtocolListener, error) {
38+
DefaultProtocolListenerFunc = func(cfg mysql.ListenerConfig, sel ServerEventListener) (ProtocolListener, error) {
3939
cfg.Handler = buildChain(cfg.Handler)
40-
return f(cfg)
40+
return f(cfg, sel)
4141
}
4242
}
4343
}

server/handler.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,12 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
634634
timer := time.NewTimer(waitTime)
635635
defer timer.Stop()
636636

637+
// Wrap the callback to include a BytesBuffer.Reset() to clean out rows that have already been spooled
638+
resetCallback := func(r *sqltypes.Result, more bool) error {
639+
defer buf.Reset()
640+
return callback(r, more)
641+
}
642+
637643
// Reads rows from the channel, converts them to wire format,
638644
// and calls |callback| to give them to vitess.
639645
eg.Go(func() error {
@@ -645,7 +651,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
645651
r = &sqltypes.Result{Fields: resultFields}
646652
}
647653
if r.RowsAffected == rowsBatch {
648-
if err := callback(r, more); err != nil {
654+
if err := resetCallback(r, more); err != nil {
649655
return err
650656
}
651657
r = nil

server/server.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ type ProtocolListener interface {
3737
}
3838

3939
// ProtocolListenerFunc returns a ProtocolListener based on the configuration it was given.
40-
type ProtocolListenerFunc func(cfg mysql.ListenerConfig) (ProtocolListener, error)
40+
type ProtocolListenerFunc func(cfg mysql.ListenerConfig, sel ServerEventListener) (ProtocolListener, error)
4141

4242
// DefaultProtocolListenerFunc is the protocol listener, which defaults to Vitess' protocol listener. Changing
4343
// this function will change the protocol listener used when creating all servers. If multiple servers are needed
4444
// with different protocols, then create each server after changing this function. Servers retain the protocol that
4545
// they were created with.
46-
var DefaultProtocolListenerFunc ProtocolListenerFunc = func(cfg mysql.ListenerConfig) (ProtocolListener, error) {
46+
var DefaultProtocolListenerFunc ProtocolListenerFunc = func(cfg mysql.ListenerConfig, sel ServerEventListener) (ProtocolListener, error) {
4747
return mysql.NewListenerWithConfig(cfg)
4848
}
4949

@@ -75,7 +75,7 @@ func NewServer(cfg Config, e *sqle.Engine, sb SessionBuilder, listener ServerEve
7575
sel: listener,
7676
}
7777
//handler = NewHandler_(e, sm, cfg.ConnReadTimeout, cfg.DisableClientMultiStatements, cfg.MaxLoggedQueryLen, cfg.EncodeLoggedQuery, listener)
78-
return newServerFromHandler(cfg, e, sm, handler)
78+
return newServerFromHandler(cfg, e, sm, handler, listener)
7979
}
8080

8181
// HandlerWrapper provides a way for clients to wrap the mysql.Handler used by the server with a custom implementation
@@ -113,7 +113,7 @@ func NewServerWithHandler(
113113
return nil, err
114114
}
115115

116-
return newServerFromHandler(cfg, e, sm, handler)
116+
return newServerFromHandler(cfg, e, sm, handler, listener)
117117
}
118118

119119
func portInUse(hostPort string) bool {
@@ -126,7 +126,7 @@ func portInUse(hostPort string) bool {
126126
return false
127127
}
128128

129-
func newServerFromHandler(cfg Config, e *sqle.Engine, sm *SessionManager, handler mysql.Handler) (*Server, error) {
129+
func newServerFromHandler(cfg Config, e *sqle.Engine, sm *SessionManager, handler mysql.Handler, sel ServerEventListener) (*Server, error) {
130130
for _, option := range cfg.Options {
131131
option(e, sm, handler)
132132
}
@@ -169,7 +169,7 @@ func newServerFromHandler(cfg Config, e *sqle.Engine, sm *SessionManager, handle
169169
ConnReadBufferSize: mysql.DefaultConnBufferSize,
170170
AllowClearTextWithoutTLS: cfg.AllowClearTextWithoutTLS,
171171
}
172-
protocolListener, err := DefaultProtocolListenerFunc(listenerCfg)
172+
protocolListener, err := DefaultProtocolListenerFunc(listenerCfg, sel)
173173
if err != nil {
174174
return nil, err
175175
}

sql/analyzer/indexed_joins.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,9 +1070,18 @@ func addMergeJoins(ctx *sql.Context, m *memo.Memo) error {
10701070
// Check to see if any rIndexes match that set of filters
10711071
// Remove the last matched filter
10721072
for _, lIndex := range lIndexes {
1073+
if lIndex.Order() == sql.IndexOrderNone {
1074+
// lookups can be unordered, merge indexes need to
1075+
// be globally ordered
1076+
continue
1077+
}
1078+
10731079
matchedEqFilters := matchedFiltersForLeftIndex(lIndex, join.Left.RelProps.FuncDeps().Constants(), eqFilters)
10741080
for len(matchedEqFilters) > 0 {
10751081
for _, rIndex := range rIndexes {
1082+
if rIndex.Order() == sql.IndexOrderNone {
1083+
continue
1084+
}
10761085
if rightIndexMatchesFilters(rIndex, join.Left.RelProps.FuncDeps().Constants(), matchedEqFilters) {
10771086
jb := join.Copy()
10781087
if d, ok := jb.Left.First.(*memo.Distinct); ok && lIndex.SqlIdx().IsUnique() {

sql/memo/memo.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -743,14 +743,15 @@ type SourceRel interface {
743743

744744
type Index struct {
745745
// ordered list of index columns
746-
order []sql.ColumnId
746+
cols []sql.ColumnId
747747
// unordered column set
748-
set sql.ColSet
749-
idx sql.Index
748+
set sql.ColSet
749+
idx sql.Index
750+
order sql.IndexOrder
750751
}
751752

752753
func (i *Index) Cols() []sql.ColumnId {
753-
return i.order
754+
return i.cols
754755
}
755756

756757
func (i *Index) ColSet() sql.ColSet {
@@ -761,6 +762,10 @@ func (i *Index) SqlIdx() sql.Index {
761762
return i.idx
762763
}
763764

765+
func (i *Index) Order() sql.IndexOrder {
766+
return i.order
767+
}
768+
764769
type sourceBase struct {
765770
*relBase
766771
indexes []*Index

sql/memo/rel_props.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,10 @@ func (p *relProps) populateFds() {
226226
// strict if primary key or all nonNull and unique
227227
columns := idxExprsColumns(idx)
228228
strict := true
229-
normIdx := &Index{idx: idx, order: make([]sql.ColumnId, len(columns))}
229+
normIdx := &Index{idx: idx, cols: make([]sql.ColumnId, len(columns)), order: sql.IndexOrderNone}
230+
if oidx, ok := idx.(sql.OrderedIndex); ok {
231+
normIdx.order = oidx.Order()
232+
}
230233
for i, c := range columns {
231234
ord := sch.IndexOfColName(strings.ToLower(c))
232235
idOffset := firstCol + sql.ColumnId(ord)
@@ -236,7 +239,7 @@ func (p *relProps) populateFds() {
236239
p.grp.m.HandleErr(err)
237240
}
238241
normIdx.set.Add(colId)
239-
normIdx.order[i] = colId
242+
normIdx.cols[i] = colId
240243
if !notNull.Contains(colId) {
241244
strict = false
242245
}

0 commit comments

Comments
 (0)