Skip to content

Commit 11b8d2e

Browse files
author
James Cor
committed
both?
1 parent 6a17fd1 commit 11b8d2e

File tree

1 file changed

+54
-55
lines changed

1 file changed

+54
-55
lines changed

sql/table_iter.go

Lines changed: 54 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ type TableRowIter struct {
2626
partition Partition
2727
rows RowIter
2828

29+
parts []Partition
30+
currPart int
31+
2932
rowChan chan Row
3033
errChan chan error
3134
once sync.Once
@@ -40,31 +43,38 @@ func NewTableRowIter(ctx *Context, table Table, partitions PartitionIter) *Table
4043

4144
func (i *TableRowIter) start(ctx *Context) {
4245
i.once.Do(func() {
43-
i.rowChan = make(chan Row, 1024)
46+
i.rowChan = make(chan Row, 128)
4447
i.errChan = make(chan error)
45-
46-
go func() {
47-
defer close(i.rowChan)
48-
49-
for {
50-
partition, err := i.partitions.Next(ctx)
51-
if err != nil {
52-
if err == io.EOF {
53-
if err = i.partitions.Close(ctx); err != nil {
54-
i.errChan <- err
55-
}
56-
return
48+
i.parts = make([]Partition, 0)
49+
50+
for {
51+
partition, err := i.partitions.Next(ctx)
52+
if err != nil {
53+
if err == io.EOF {
54+
if err = i.partitions.Close(ctx); err != nil {
55+
panic(err)
5756
}
58-
i.errChan <- err
59-
return
57+
break
6058
}
59+
panic(err)
60+
return
61+
}
62+
i.parts = append(i.parts, partition)
63+
}
64+
65+
// TODO: if very few partitions, just read sequentially
66+
if len(i.parts) < 2 {
67+
return
68+
}
6169

62-
rowIter, riErr := i.table.PartitionRows(ctx, partition)
70+
go func() {
71+
defer close(i.rowChan)
72+
for idx := 0; idx < len(i.parts); idx++ {
73+
rowIter, riErr := i.table.PartitionRows(ctx, i.parts[idx])
6374
if riErr != nil {
6475
i.errChan <- riErr
6576
return
6677
}
67-
6878
for {
6979
row, rErr := rowIter.Next(ctx)
7080
if rErr != nil {
@@ -92,6 +102,33 @@ func (i *TableRowIter) start(ctx *Context) {
92102
func (i *TableRowIter) Next(ctx *Context) (Row, error) {
93103
i.start(ctx)
94104

105+
if len(i.parts) < 2 {
106+
for i.currPart < len(i.parts) {
107+
if i.partition == nil {
108+
i.partition = i.parts[i.currPart]
109+
}
110+
if i.rows == nil {
111+
rows, err := i.table.PartitionRows(ctx, i.partition)
112+
if err != nil {
113+
return nil, err
114+
}
115+
i.rows = rows
116+
}
117+
row, err := i.rows.Next(ctx)
118+
if err != nil && err == io.EOF {
119+
if err = i.rows.Close(ctx); err != nil {
120+
return nil, err
121+
}
122+
i.partition = nil
123+
i.rows = nil
124+
i.currPart++
125+
continue
126+
}
127+
return row, err
128+
}
129+
return nil, io.EOF
130+
}
131+
95132
select {
96133
case <-ctx.Done():
97134
return nil, ctx.Err()
@@ -103,44 +140,6 @@ func (i *TableRowIter) Next(ctx *Context) (Row, error) {
103140
}
104141
return row, nil
105142
}
106-
return nil, io.EOF
107-
108-
// TODO: multithread partitions?
109-
if i.partition == nil {
110-
partition, err := i.partitions.Next(ctx)
111-
if err != nil {
112-
if err == io.EOF {
113-
if e := i.partitions.Close(ctx); e != nil {
114-
return nil, e
115-
}
116-
}
117-
118-
return nil, err
119-
}
120-
121-
i.partition = partition
122-
}
123-
124-
if i.rows == nil {
125-
rows, err := i.table.PartitionRows(ctx, i.partition)
126-
if err != nil {
127-
return nil, err
128-
}
129-
130-
i.rows = rows
131-
}
132-
133-
row, err := i.rows.Next(ctx)
134-
if err != nil && err == io.EOF {
135-
if err = i.rows.Close(ctx); err != nil {
136-
return nil, err
137-
}
138-
139-
i.partition = nil
140-
i.rows = nil
141-
row, err = i.Next(ctx)
142-
}
143-
return row, err
144143
}
145144

146145
func (i *TableRowIter) Close(ctx *Context) error {

0 commit comments

Comments
 (0)