Skip to content

Commit 3949508

Browse files
author
James Cor
committed
multi thread partition?
1 parent 3d7fa0e commit 3949508

File tree

1 file changed

+80
-0
lines changed

1 file changed

+80
-0
lines changed

sql/table_iter.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package sql
1616

1717
import (
1818
"io"
19+
"sync"
1920
)
2021

2122
// TableRowIter is an iterator over the partitions in a table.
@@ -24,6 +25,11 @@ type TableRowIter struct {
2425
partitions PartitionIter
2526
partition Partition
2627
rows RowIter
28+
29+
currChan int
30+
rowChans []<-chan Row
31+
errChans []<-chan error
32+
once sync.Once
2733
}
2834

2935
var _ RowIter = (*TableRowIter)(nil)
@@ -33,12 +39,86 @@ func NewTableRowIter(ctx *Context, table Table, partitions PartitionIter) *Table
3339
return &TableRowIter{table: table, partitions: partitions}
3440
}
3541

42+
func (i *TableRowIter) start(ctx *Context) {
43+
i.once.Do(func() {
44+
for {
45+
partition, err := i.partitions.Next(ctx)
46+
if err != nil {
47+
if err == io.EOF {
48+
//continue
49+
break
50+
}
51+
}
52+
53+
rowChan := make(chan Row, 128)
54+
errChan := make(chan error, 1)
55+
56+
i.rowChans = append(i.rowChans, rowChan)
57+
i.errChans = append(i.errChans, errChan)
58+
59+
go func() {
60+
defer close(rowChan)
61+
defer close(errChan)
62+
63+
rowIter, riErr := i.table.PartitionRows(ctx, partition)
64+
if riErr != nil {
65+
errChan <- riErr
66+
return
67+
}
68+
defer rowIter.Close(ctx) // TODO: handle error
69+
70+
for {
71+
row, rErr := rowIter.Next(ctx)
72+
if rErr != nil {
73+
if rErr == io.EOF {
74+
return
75+
}
76+
errChan <- rErr
77+
return
78+
}
79+
select {
80+
case rowChan <- row:
81+
case <-ctx.Done():
82+
return
83+
}
84+
}
85+
}()
86+
}
87+
})
88+
}
89+
3690
func (i *TableRowIter) Next(ctx *Context) (Row, error) {
3791
select {
3892
case <-ctx.Done():
3993
return nil, ctx.Err()
4094
default:
4195
}
96+
97+
i.start(ctx)
98+
99+
for i.currChan < len(i.rowChans) {
100+
rowChan := i.rowChans[i.currChan]
101+
errChan := i.errChans[i.currChan]
102+
103+
select {
104+
case <-ctx.Done():
105+
return nil, ctx.Err()
106+
case err := <-errChan:
107+
if err != nil {
108+
return nil, err
109+
}
110+
case row, ok := <-rowChan:
111+
if !ok {
112+
i.currChan++
113+
continue
114+
}
115+
return row, nil
116+
}
117+
}
118+
119+
return nil, io.EOF
120+
121+
// TODO: multithread partitions?
42122
if i.partition == nil {
43123
partition, err := i.partitions.Next(ctx)
44124
if err != nil {

0 commit comments

Comments
 (0)