Skip to content

Commit d6a773c

Browse files
author
James Cor
committed
look ahead of partitions
1 parent 3949508 commit d6a773c

File tree

1 file changed

+43
-59
lines changed

1 file changed

+43
-59
lines changed

sql/table_iter.go

Lines changed: 43 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@ type TableRowIter struct {
2626
partition Partition
2727
rows RowIter
2828

29-
currChan int
30-
rowChans []<-chan Row
31-
errChans []<-chan error
32-
once sync.Once
29+
rowChan chan Row
30+
errChan chan error
31+
once sync.Once
3332
}
3433

3534
var _ RowIter = (*TableRowIter)(nil)
@@ -41,79 +40,64 @@ func NewTableRowIter(ctx *Context, table Table, partitions PartitionIter) *Table
4140

4241
func (i *TableRowIter) start(ctx *Context) {
4342
i.once.Do(func() {
44-
for {
43+
i.rowChan = make(chan Row, 1024)
44+
i.errChan = make(chan error, 1)
45+
46+
go func() {
47+
defer close(i.rowChan)
48+
defer close(i.errChan)
49+
4550
partition, err := i.partitions.Next(ctx)
4651
if err != nil {
4752
if err == io.EOF {
48-
//continue
49-
break
53+
i.partitions.Close(ctx)
54+
return
5055
}
56+
i.errChan <- err
57+
return
5158
}
5259

53-
rowChan := make(chan Row, 128)
54-
errChan := make(chan error, 1)
55-
56-
i.rowChans = append(i.rowChans, rowChan)
57-
i.errChans = append(i.errChans, errChan)
58-
59-
go func() {
60-
defer close(rowChan)
61-
defer close(errChan)
60+
rowIter, riErr := i.table.PartitionRows(ctx, partition)
61+
if riErr != nil {
62+
i.errChan <- riErr
63+
return
64+
}
6265

63-
rowIter, riErr := i.table.PartitionRows(ctx, partition)
64-
if riErr != nil {
65-
errChan <- riErr
66-
return
67-
}
68-
defer rowIter.Close(ctx) // TODO: handle error
69-
70-
for {
71-
row, rErr := rowIter.Next(ctx)
72-
if rErr != nil {
73-
if rErr == io.EOF {
74-
return
75-
}
76-
errChan <- rErr
77-
return
78-
}
79-
select {
80-
case rowChan <- row:
81-
case <-ctx.Done():
66+
for {
67+
row, rErr := rowIter.Next(ctx)
68+
if rErr != nil {
69+
if rErr == io.EOF {
70+
rowIter.Close(ctx)
8271
return
8372
}
73+
i.errChan <- rErr
74+
return
8475
}
85-
}()
86-
}
76+
select {
77+
case i.rowChan <- row:
78+
case <-ctx.Done():
79+
return
80+
}
81+
}
82+
}()
8783
})
8884
}
8985

9086
func (i *TableRowIter) Next(ctx *Context) (Row, error) {
87+
i.start(ctx)
88+
9189
select {
9290
case <-ctx.Done():
9391
return nil, ctx.Err()
94-
default:
95-
}
96-
97-
i.start(ctx)
98-
99-
for i.currChan < len(i.rowChans) {
100-
rowChan := i.rowChans[i.currChan]
101-
errChan := i.errChans[i.currChan]
102-
103-
select {
104-
case <-ctx.Done():
105-
return nil, ctx.Err()
106-
case err := <-errChan:
107-
if err != nil {
108-
return nil, err
109-
}
110-
case row, ok := <-rowChan:
111-
if !ok {
112-
i.currChan++
113-
continue
114-
}
115-
return row, nil
92+
case err := <-i.errChan:
93+
if err != nil {
94+
return nil, err
95+
}
96+
case row, ok := <-i.rowChan:
97+
if !ok {
98+
return nil, io.EOF
11699
}
100+
return row, nil
117101
}
118102

119103
return nil, io.EOF

0 commit comments

Comments
 (0)