Skip to content

Commit 6a17fd1

Browse files
author
James Cor
committed
exhaust partitions correctly
1 parent 2335335 commit 6a17fd1

File tree

1 file changed

+37
-34
lines changed

1 file changed

+37
-34
lines changed

sql/table_iter.go

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -45,39 +45,45 @@ func (i *TableRowIter) start(ctx *Context) {
4545

4646
go func() {
4747
defer close(i.rowChan)
48-
defer close(i.errChan)
49-
50-
partition, err := i.partitions.Next(ctx)
51-
if err != nil {
52-
if err == io.EOF {
53-
i.partitions.Close(ctx)
54-
return
55-
}
56-
i.errChan <- err
57-
return
58-
}
59-
60-
rowIter, riErr := i.table.PartitionRows(ctx, partition)
61-
if riErr != nil {
62-
i.errChan <- riErr
63-
return
64-
}
6548

6649
for {
67-
row, rErr := rowIter.Next(ctx)
68-
if rErr != nil {
69-
if rErr == io.EOF {
70-
rowIter.Close(ctx)
50+
partition, err := i.partitions.Next(ctx)
51+
if err != nil {
52+
if err == io.EOF {
53+
if err = i.partitions.Close(ctx); err != nil {
54+
i.errChan <- err
55+
}
7156
return
7257
}
73-
i.errChan <- rErr
58+
i.errChan <- err
7459
return
7560
}
76-
select {
77-
case i.rowChan <- row:
78-
case <-ctx.Done():
61+
62+
rowIter, riErr := i.table.PartitionRows(ctx, partition)
63+
if riErr != nil {
64+
i.errChan <- riErr
7965
return
8066
}
67+
68+
for {
69+
row, rErr := rowIter.Next(ctx)
70+
if rErr != nil {
71+
if rErr == io.EOF {
72+
if rErr = rowIter.Close(ctx); rErr != nil {
73+
i.errChan <- rErr
74+
return
75+
}
76+
break
77+
}
78+
i.errChan <- rErr
79+
return
80+
}
81+
select {
82+
case i.rowChan <- row:
83+
case <-ctx.Done():
84+
return
85+
}
86+
}
8187
}
8288
}()
8389
})
@@ -89,18 +95,14 @@ func (i *TableRowIter) Next(ctx *Context) (Row, error) {
8995
select {
9096
case <-ctx.Done():
9197
return nil, ctx.Err()
98+
case err := <-i.errChan:
99+
return nil, err
92100
case row, ok := <-i.rowChan:
93-
if ok {
94-
return row, nil
95-
}
96-
select {
97-
case err := <-i.errChan:
98-
if err != nil {
99-
return nil, err
100-
}
101+
if !ok {
102+
return nil, io.EOF
101103
}
104+
return row, nil
102105
}
103-
104106
return nil, io.EOF
105107

106108
// TODO: multithread partitions?
@@ -142,6 +144,7 @@ func (i *TableRowIter) Next(ctx *Context) (Row, error) {
142144
}
143145

144146
func (i *TableRowIter) Close(ctx *Context) error {
147+
return nil
145148
if i.rows != nil {
146149
if err := i.rows.Close(ctx); err != nil {
147150
_ = i.partitions.Close(ctx)

0 commit comments

Comments
 (0)