Skip to content

Commit f2c1129

Browse files
authored
Merge pull request #1942 from dolthub/zachmu/iter-fix
Better in memory index iterator implementation
2 parents db0c4a2 + 7378789 commit f2c1129

File tree

5 files changed

+62
-58
lines changed

5 files changed

+62
-58
lines changed

server/tables/pgcatalog/inmem_index.go

Lines changed: 62 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package pgcatalog
1616

1717
import (
1818
"io"
19+
"iter"
1920

2021
"github.com/dolthub/go-mysql-server/sql"
2122
"github.com/google/btree"
@@ -29,7 +30,8 @@ type inMemIndexScanIter[T any] struct {
2930
btreeAccess BTreeStorageAccess[T]
3031
rowConverter rowConverter[T]
3132
rangeIdx int
32-
nextChan chan T
33+
next func() (T, bool)
34+
stop func()
3335
}
3436

3537
var _ sql.RowIter = (*inMemIndexScanIter[any])(nil)
@@ -61,6 +63,9 @@ func (l *inMemIndexScanIter[T]) Next(ctx *sql.Context) (sql.Row, error) {
6163

6264
// Close implements the sql.RowIter interface.
6365
func (l *inMemIndexScanIter[T]) Close(ctx *sql.Context) error {
66+
if l.stop != nil {
67+
l.stop()
68+
}
6469
return nil
6570
}
6671

@@ -71,38 +76,32 @@ func (l *inMemIndexScanIter[T]) nextItem() (*T, error) {
7176
return nil, io.EOF
7277
}
7378

74-
if l.nextChan != nil {
75-
next, ok := <-l.nextChan
79+
if l.next != nil {
80+
next, ok := l.next()
7681
if !ok {
77-
l.nextChan = nil
82+
l.stop()
83+
l.next = nil
84+
l.stop = nil
7885
l.rangeIdx++
7986
return l.nextItem()
8087
}
8188
return &next, nil
8289
}
8390

8491
inMemIndex := l.lookup.Index.(pgCatalogInMemIndex)
85-
86-
l.nextChan = make(chan T)
8792
rng := l.lookup.Ranges.ToRanges()[l.rangeIdx]
88-
go func() {
89-
defer func() {
90-
close(l.nextChan)
91-
}()
92-
93-
gte, hasLowerBound, lt, hasUpperBound := l.rangeConverter.getIndexScanRange(rng, l.lookup.Index)
94-
idx := l.btreeAccess.getIndex(inMemIndex.name)
95-
if hasLowerBound && hasUpperBound {
96-
idx.IterRange(gte, lt, l.nextChan)
97-
} else if hasLowerBound {
98-
idx.IterGreaterThanEqual(gte, l.nextChan)
99-
} else if hasUpperBound {
100-
idx.IterLessThan(lt, l.nextChan)
101-
} else {
102-
// We don't support nil lookups for this kind of index, there are never nillable elements
103-
return
104-
}
105-
}()
93+
gte, hasLowerBound, lt, hasUpperBound := l.rangeConverter.getIndexScanRange(rng, l.lookup.Index)
94+
idx := l.btreeAccess.getIndex(inMemIndex.name)
95+
if hasLowerBound && hasUpperBound {
96+
l.next, l.stop = idx.IterRange(gte, lt)
97+
} else if hasLowerBound {
98+
l.next, l.stop = idx.IterGreaterThanEqual(gte)
99+
} else if hasUpperBound {
100+
l.next, l.stop = idx.IterLessThan(lt)
101+
} else {
102+
// We don't support nil lookups for this kind of index, there are never nillable elements
103+
return nil, io.EOF
104+
}
106105

107106
return l.nextItem()
108107
}
@@ -270,50 +269,63 @@ func (s *inMemIndexStorage[T]) Add(val T) {
270269

271270
// IterRange implements an in-order iteration over the index values in the range [gte, lt). All values in the
272271
// index in the range are sent to the channel
273-
func (s *inMemIndexStorage[T]) IterRange(gte, lt T, c chan T) {
272+
func (s *inMemIndexStorage[T]) IterRange(gte, lt T) (next func() (T, bool), stop func()) {
274273
if s.uniqTree != nil {
275-
s.uniqTree.AscendRange(gte, lt, s.sendItem(c))
274+
return iter.Pull(func(yield func(T) bool) {
275+
s.uniqTree.AscendRange(gte, lt, yield)
276+
})
276277
} else {
277-
s.nonUniqTree.AscendRange([]T{gte}, []T{lt}, s.sendItems(c))
278+
next, stop := iter.Pull(func(yield func([]T) bool) {
279+
s.nonUniqTree.AscendRange([]T{gte}, []T{lt}, yield)
280+
})
281+
return s.unnestIter(next, stop)
278282
}
279283
}
280284

281285
// IterGreaterThanEqual implements an in-order iteration over the index values greater than or equal to the given value.
282286
// All values in the index greater than or equal to the given value are sent to the channel.
283-
func (s *inMemIndexStorage[T]) IterGreaterThanEqual(gte T, c chan T) {
287+
func (s *inMemIndexStorage[T]) IterGreaterThanEqual(gte T) (next func() (T, bool), stop func()) {
284288
if s.uniqTree != nil {
285-
s.uniqTree.AscendGreaterOrEqual(gte, s.sendItem(c))
289+
return iter.Pull(func(yield func(T) bool) {
290+
s.uniqTree.AscendGreaterOrEqual(gte, yield)
291+
})
286292
} else {
287-
s.nonUniqTree.AscendGreaterOrEqual([]T{gte}, s.sendItems(c))
293+
next, stop := iter.Pull(func(yield func([]T) bool) {
294+
s.nonUniqTree.AscendGreaterOrEqual([]T{gte}, yield)
295+
})
296+
return s.unnestIter(next, stop)
288297
}
289298
}
290299

291300
// IterLessThan implements an in-order iteration over the index values less than the given value.
292301
// All values in the index less than or equal to the given value are sent to the channel.
293-
func (s *inMemIndexStorage[T]) IterLessThan(lt T, c chan T) {
302+
func (s *inMemIndexStorage[T]) IterLessThan(lt T) (next func() (T, bool), stop func()) {
294303
if s.uniqTree != nil {
295-
s.uniqTree.AscendLessThan(lt, s.sendItem(c))
304+
return iter.Pull(func(yield func(T) bool) {
305+
s.uniqTree.AscendLessThan(lt, yield)
306+
})
296307
} else {
297-
s.nonUniqTree.AscendLessThan([]T{lt}, s.sendItems(c))
308+
next, stop := iter.Pull(func(yield func([]T) bool) {
309+
s.nonUniqTree.AscendLessThan([]T{lt}, yield)
310+
})
311+
return s.unnestIter(next, stop)
298312
}
299313
}
300314

301-
// sendItem returns an iterator function that sends the given item to the channel. This function returns a bool to
302-
// conform to the interface for the Ascend* methods in the btree package.
303-
func (s *inMemIndexStorage[T]) sendItem(c chan T) btree.ItemIteratorG[T] {
304-
return func(item T) bool {
305-
c <- item
306-
return true
307-
}
308-
}
309-
310-
// sendItems returns an iterator function that sends all items in the given slice to the channel. This function
311-
// returns a bool to conform to the interface for the Ascend* methods in the btree package.
312-
func (s *inMemIndexStorage[T]) sendItems(c chan T) btree.ItemIteratorG[[]T] {
313-
return func(items []T) bool {
314-
for _, item := range items {
315-
c <- item
315+
// unnestIter takes an iterator that returns slices of T, and returns an iterator that returns individual T values.
316+
func (s *inMemIndexStorage[T]) unnestIter(sNext func() ([]T, bool), sStop func()) (next func() (T, bool), stop func()) {
317+
return iter.Pull(func(yield func(T) bool) {
318+
defer sStop()
319+
for {
320+
items, ok := sNext()
321+
if !ok {
322+
return
323+
}
324+
for _, item := range items {
325+
if !yield(item) {
326+
return
327+
}
328+
}
316329
}
317-
return true
318-
}
330+
})
319331
}

server/tables/pgcatalog/pg_attribute.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,6 @@ func (p PgAttributeHandler) RowIter(ctx *sql.Context, partition sql.Partition) (
7070
rangeConverter: p,
7171
btreeAccess: pgCatalogCache.pgAttributes,
7272
rowConverter: pgAttributeToRow,
73-
rangeIdx: 0,
74-
nextChan: nil,
7573
}, nil
7674
}
7775

server/tables/pgcatalog/pg_class.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ func (p PgClassHandler) RowIter(ctx *sql.Context, partition sql.Partition) (sql.
6969
rangeConverter: p,
7070
btreeAccess: pgCatalogCache.pgClasses,
7171
rowConverter: pgClassToRow,
72-
rangeIdx: 0,
73-
nextChan: nil,
7472
}, nil
7573
}
7674

server/tables/pgcatalog/pg_constraint.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ func (p PgConstraintHandler) RowIter(ctx *sql.Context, partition sql.Partition)
6767
rangeConverter: p,
6868
btreeAccess: pgCatalogCache.pgConstraints,
6969
rowConverter: pgConstraintToRow,
70-
rangeIdx: 0,
71-
nextChan: nil,
7270
}, nil
7371
}
7472

server/tables/pgcatalog/pg_index.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ func (p PgIndexHandler) RowIter(ctx *sql.Context, partition sql.Partition) (sql.
6666
rangeConverter: p,
6767
btreeAccess: pgCatalogCache.pgIndexes,
6868
rowConverter: pgIndexToRow,
69-
rangeIdx: 0,
70-
nextChan: nil,
7169
}, nil
7270
}
7371

0 commit comments

Comments
 (0)