Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit cdea792

Browse files
erizocosmicoajnavarro
authored andcommitted
sql/(plan,analyzer): track partitions of indexable tables too
Signed-off-by: Miguel Molina <[email protected]> (cherry picked from commit a71b9ee)
1 parent cdc6c8c commit cdea792

File tree

6 files changed

+171
-9
lines changed

6 files changed

+171
-9
lines changed

sql/analyzer/process.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
2222
n, err := n.TransformUp(func(n sql.Node) (sql.Node, error) {
2323
switch n := n.(type) {
2424
case *plan.ResolvedTable:
25-
if _, ok := n.Table.(*plan.ProcessTable); ok {
25+
switch n.Table.(type) {
26+
case *plan.ProcessTable, *plan.ProcessIndexableTable:
2627
return n, nil
2728
}
2829

@@ -42,9 +43,18 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
4243
processList.AddProgressItem(ctx.Pid(), name, total)
4344

4445
seen[name] = struct{}{}
45-
t := plan.NewProcessTable(n.Table, func() {
46+
47+
notify := func() {
4648
processList.UpdateProgress(ctx.Pid(), name, 1)
47-
})
49+
}
50+
51+
var t sql.Table
52+
switch table := n.Table.(type) {
53+
case sql.IndexableTable:
54+
t = plan.NewProcessIndexableTable(table, notify)
55+
default:
56+
t = plan.NewProcessTable(table, notify)
57+
}
4858

4959
return plan.NewResolvedTable(t), nil
5060
default:

sql/analyzer/process_test.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func TestTrackProcess(t *testing.T) {
1919
a := NewDefault(catalog)
2020

2121
node := plan.NewInnerJoin(
22-
plan.NewResolvedTable(mem.NewPartitionedTable("foo", nil, 2)),
22+
plan.NewResolvedTable(&table{mem.NewPartitionedTable("foo", nil, 2)}),
2323
plan.NewResolvedTable(mem.NewPartitionedTable("bar", nil, 4)),
2424
expression.NewLiteral(int64(1), sql.Int64),
2525
)
@@ -53,7 +53,7 @@ func TestTrackProcess(t *testing.T) {
5353

5454
rhs, ok := join.Right.(*plan.ResolvedTable)
5555
require.True(ok)
56-
_, ok = rhs.Table.(*plan.ProcessTable)
56+
_, ok = rhs.Table.(*plan.ProcessIndexableTable)
5757
require.True(ok)
5858

5959
iter, err := proc.RowIter(ctx)
@@ -75,3 +75,14 @@ func withoutProcessTracking(a *Analyzer) *Analyzer {
7575
afterAll.Rules = afterAll.Rules[1:]
7676
return a
7777
}
78+
79+
// wrapper around sql.Table to make it not indexable
80+
type table struct {
81+
sql.Table
82+
}
83+
84+
var _ sql.PartitionCounter = (*table)(nil)
85+
86+
func (t *table) PartitionCount(ctx *sql.Context) (int64, error) {
87+
return t.Table.(sql.PartitionCounter).PartitionCount(ctx)
88+
}

sql/plan/create_index.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ func (c *CreateIndex) Resolved() bool {
7676

7777
func getIndexableTable(t sql.Table) (sql.IndexableTable, error) {
7878
switch t := t.(type) {
79-
case sql.TableWrapper:
80-
return getIndexableTable(t.Underlying())
8179
case sql.IndexableTable:
8280
return t, nil
81+
case sql.TableWrapper:
82+
return getIndexableTable(t.Underlying())
8383
default:
8484
return nil, ErrInsertIntoNotSupported.New()
8585
}

sql/plan/insert.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ func getInsertable(node sql.Node) (sql.Inserter, error) {
4949

5050
func getInsertableTable(t sql.Table) (sql.Inserter, error) {
5151
switch t := t.(type) {
52-
case sql.TableWrapper:
53-
return getInsertableTable(t.Underlying())
5452
case sql.Inserter:
5553
return t, nil
54+
case sql.TableWrapper:
55+
return getInsertableTable(t.Underlying())
5656
default:
5757
return nil, ErrInsertIntoNotSupported.New()
5858
}

sql/plan/process.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,50 @@ func (p *QueryProcess) RowIter(ctx *sql.Context) (sql.RowIter, error) {
5757

5858
func (p *QueryProcess) String() string { return p.Child.String() }
5959

60+
// ProcessIndexableTable is a wrapper for sql.Tables inside a query process
61+
// that support indexing.
62+
// It notifies the process manager about the status of a query when a
63+
// partition is processed.
64+
type ProcessIndexableTable struct {
65+
sql.IndexableTable
66+
Notify NotifyFunc
67+
}
68+
69+
// NewProcessIndexableTable returns a new ProcessIndexableTable.
70+
func NewProcessIndexableTable(t sql.IndexableTable, notify NotifyFunc) *ProcessIndexableTable {
71+
return &ProcessIndexableTable{t, notify}
72+
}
73+
74+
// Underlying implements sql.TableWrapper interface.
75+
func (t *ProcessIndexableTable) Underlying() sql.Table {
76+
return t.IndexableTable
77+
}
78+
79+
// IndexKeyValues implements the sql.IndexableTable interface.
80+
func (t *ProcessIndexableTable) IndexKeyValues(
81+
ctx *sql.Context,
82+
columns []string,
83+
) (sql.PartitionIndexKeyValueIter, error) {
84+
iter, err := t.IndexableTable.IndexKeyValues(ctx, columns)
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
return &trackedPartitionIndexKeyValueIter{iter, t.Notify}, nil
90+
}
91+
92+
// PartitionRows implements the sql.Table interface.
93+
func (t *ProcessIndexableTable) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.RowIter, error) {
94+
iter, err := t.IndexableTable.PartitionRows(ctx, p)
95+
if err != nil {
96+
return nil, err
97+
}
98+
99+
return &trackedRowIter{iter, t.Notify}, nil
100+
}
101+
102+
var _ sql.IndexableTable = (*ProcessIndexableTable)(nil)
103+
60104
// ProcessTable is a wrapper for sql.Tables inside a query process. It
61105
// notifies the process manager about the status of a query when a partition
62106
// is processed.
@@ -112,3 +156,46 @@ func (i *trackedRowIter) Close() error {
112156
i.done()
113157
return i.iter.Close()
114158
}
159+
160+
type trackedPartitionIndexKeyValueIter struct {
161+
sql.PartitionIndexKeyValueIter
162+
notify NotifyFunc
163+
}
164+
165+
func (i *trackedPartitionIndexKeyValueIter) Next() (sql.Partition, sql.IndexKeyValueIter, error) {
166+
p, iter, err := i.PartitionIndexKeyValueIter.Next()
167+
if err != nil {
168+
return nil, nil, err
169+
}
170+
171+
return p, &trackedIndexKeyValueIter{iter, i.notify}, nil
172+
}
173+
174+
type trackedIndexKeyValueIter struct {
175+
iter sql.IndexKeyValueIter
176+
notify NotifyFunc
177+
}
178+
179+
func (i *trackedIndexKeyValueIter) done() {
180+
if i.notify != nil {
181+
i.notify()
182+
i.notify = nil
183+
}
184+
}
185+
186+
func (i *trackedIndexKeyValueIter) Close() error {
187+
i.done()
188+
return nil
189+
}
190+
191+
func (i *trackedIndexKeyValueIter) Next() ([]interface{}, []byte, error) {
192+
v, k, err := i.iter.Next()
193+
if err != nil {
194+
if err == io.EOF {
195+
i.done()
196+
}
197+
return nil, nil, err
198+
}
199+
200+
return v, k, nil
201+
}

sql/plan/process_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package plan
22

33
import (
4+
"io"
45
"testing"
56

67
"github.com/stretchr/testify/require"
@@ -92,3 +93,56 @@ func TestProcessTable(t *testing.T) {
9293
require.ElementsMatch(expected, rows)
9394
require.Equal(2, notifications)
9495
}
96+
97+
func TestProcessIndexableTable(t *testing.T) {
98+
require := require.New(t)
99+
100+
table := mem.NewPartitionedTable("foo", sql.Schema{
101+
{Name: "a", Type: sql.Int64, Source: "foo"},
102+
}, 2)
103+
104+
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(1)))
105+
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(2)))
106+
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(3)))
107+
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(4)))
108+
109+
var notifications int
110+
111+
pt := NewProcessIndexableTable(
112+
table,
113+
func() {
114+
notifications++
115+
},
116+
)
117+
118+
iter, err := pt.IndexKeyValues(sql.NewEmptyContext(), []string{"a"})
119+
require.NoError(err)
120+
121+
var values [][]interface{}
122+
for {
123+
_, kviter, err := iter.Next()
124+
if err == io.EOF {
125+
break
126+
}
127+
require.NoError(err)
128+
129+
for {
130+
v, _, err := kviter.Next()
131+
if err == io.EOF {
132+
break
133+
}
134+
values = append(values, v)
135+
require.NoError(err)
136+
}
137+
}
138+
139+
expectedValues := [][]interface{}{
140+
{int64(1)},
141+
{int64(2)},
142+
{int64(3)},
143+
{int64(4)},
144+
}
145+
146+
require.ElementsMatch(expectedValues, values)
147+
require.Equal(2, notifications)
148+
}

0 commit comments

Comments
 (0)