Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 97 additions & 30 deletions sql/table_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sql

import (
"io"
"sync"
)

// TableRowIter is an iterator over the partitions in a table.
Expand All @@ -24,6 +25,13 @@ type TableRowIter struct {
partitions PartitionIter
partition Partition
rows RowIter

parts []Partition
currPart int

rowChan chan Row
errChan chan error
once sync.Once
}

var _ RowIter = (*TableRowIter)(nil)
Expand All @@ -33,50 +41,109 @@ func NewTableRowIter(ctx *Context, table Table, partitions PartitionIter) *Table
return &TableRowIter{table: table, partitions: partitions}
}

func (i *TableRowIter) Next(ctx *Context) (Row, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if i.partition == nil {
partition, err := i.partitions.Next(ctx)
if err != nil {
if err == io.EOF {
if e := i.partitions.Close(ctx); e != nil {
return nil, e
func (i *TableRowIter) start(ctx *Context) {
i.once.Do(func() {
i.rowChan = make(chan Row, 128)
i.errChan = make(chan error)
i.parts = make([]Partition, 0)

for {
partition, err := i.partitions.Next(ctx)
if err != nil {
if err == io.EOF {
if err = i.partitions.Close(ctx); err != nil {
panic(err)
}
break
}
panic(err)
return
}
i.parts = append(i.parts, partition)
}

return nil, err
// TODO: if very few partitions, just read sequentially
if len(i.parts) < 2 {
return
}

i.partition = partition
}
go func() {
defer close(i.rowChan)
for idx := 0; idx < len(i.parts); idx++ {
rowIter, riErr := i.table.PartitionRows(ctx, i.parts[idx])
if riErr != nil {
i.errChan <- riErr
return
}
for {
row, rErr := rowIter.Next(ctx)
if rErr != nil {
if rErr == io.EOF {
if rErr = rowIter.Close(ctx); rErr != nil {
i.errChan <- rErr
return
}
break
}
i.errChan <- rErr
return
}
select {
case i.rowChan <- row:
case <-ctx.Done():
return
}
}
}
}()
})
}

if i.rows == nil {
rows, err := i.table.PartitionRows(ctx, i.partition)
if err != nil {
return nil, err
}
func (i *TableRowIter) Next(ctx *Context) (Row, error) {
i.start(ctx)

i.rows = rows
if len(i.parts) < 2 {
for i.currPart < len(i.parts) {
if i.partition == nil {
i.partition = i.parts[i.currPart]
}
if i.rows == nil {
rows, err := i.table.PartitionRows(ctx, i.partition)
if err != nil {
return nil, err
}
i.rows = rows
}
row, err := i.rows.Next(ctx)
if err != nil && err == io.EOF {
if err = i.rows.Close(ctx); err != nil {
return nil, err
}
i.partition = nil
i.rows = nil
i.currPart++
continue
}
return row, err
}
return nil, io.EOF
}

row, err := i.rows.Next(ctx)
if err != nil && err == io.EOF {
if err = i.rows.Close(ctx); err != nil {
return nil, err
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-i.errChan:
return nil, err
case row, ok := <-i.rowChan:
if !ok {
return nil, io.EOF
}

i.partition = nil
i.rows = nil
row, err = i.Next(ctx)
return row, nil
}
return row, err
}

func (i *TableRowIter) Close(ctx *Context) error {
return nil
if i.rows != nil {
if err := i.rows.Close(ctx); err != nil {
_ = i.partitions.Close(ctx)
Expand Down
Loading