Skip to content

Commit d30996a

Browse files
committed
iterv2: introduce a new iterator stack (off by default)
We introduce a new iterator abstraction along with a new merging iterator, level iterator, interleaving iterator stack. This abstraction allows presenting the next span boundary at any point, which allows the merging iterator to (conceptually) divide the key space into "slabs". The slab approach will make the addition of new rangedel-type keys much easier. The boundary also naturally allows "stopping points" in the level iterator without a special contract with the merging iterator. The new stack can be used by setting `iterv2.Enabled`. The stack functions correctly (it passed a few hours of the metamorphic test). It is still a prototype in that it does not yet have many of the optimizations the existing stack has (like try-seek-using-next). The integration of the v2 stack also needs some optimization in terms of allocations. Most of the code inside the iterators was written by me. Claude helped with comments and with limited bits and pieces but it was generating much more complicated code for the core logic. In all fairness, I also had to go back and rewrite most of it a few times. Suggestions on what to review: - `iterv2.Iter` semantics; - `iterv2.TestIter` implementation. While this is only used for testing, it serves as a formal codification of the semantics (with a fairly simple implementation). All the new iterators are tested against the `TestIter`, so it's less important to review the actual iterator implementations; - `iterv2.InterleavingIter` struct and `Init()` comments; - `levelIterV2` struct comment; - `mergingIterV2` struct comment; - `slabState` interface (`BuildForward`).
1 parent a267c0a commit d30996a

33 files changed

+7019
-10
lines changed

checkpoint_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/crlib/testutils/leaktest"
2020
"github.com/cockroachdb/datadriven"
2121
"github.com/cockroachdb/pebble/internal/base"
22+
"github.com/cockroachdb/pebble/internal/iterv2"
2223
"github.com/cockroachdb/pebble/internal/testutils"
2324
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
2425
"github.com/cockroachdb/pebble/objstorage/remote"
@@ -281,6 +282,9 @@ func TestCopyCheckpointOptions(t *testing.T) {
281282
func TestCheckpoint(t *testing.T) {
282283
defer leaktest.AfterTest(t)()
283284
t.Run("shared=false", func(t *testing.T) {
285+
if iterv2.Enabled {
286+
t.Skipf("file open timing changes with iterv2")
287+
}
284288
testCheckpointImpl(t, "testdata/checkpoint", false /* createOnShared */)
285289
})
286290
t.Run("shared=true", func(t *testing.T) {

db.go

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cockroachdb/pebble/internal/inflight"
2626
"github.com/cockroachdb/pebble/internal/invalidating"
2727
"github.com/cockroachdb/pebble/internal/invariants"
28+
"github.com/cockroachdb/pebble/internal/iterv2"
2829
"github.com/cockroachdb/pebble/internal/keyspan"
2930
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
3031
"github.com/cockroachdb/pebble/internal/manifest"
@@ -1203,7 +1204,8 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
12031204
useLazyCombinedIteration := dbi.rangeKey == nil &&
12041205
dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
12051206
(dbi.batch == nil || dbi.batch.batch.countRangeKeys == 0) &&
1206-
!dbi.opts.disableLazyCombinedIteration
1207+
!dbi.opts.disableLazyCombinedIteration &&
1208+
!iterv2.Enabled
12071209
if useLazyCombinedIteration {
12081210
// The user requested combined iteration, and there's no indexed
12091211
// batch currently containing range keys that would prevent lazy
@@ -1309,6 +1311,11 @@ func (i *Iterator) constructPointIter(
13091311
internalOpts.boundLimitedFilter = &i.rangeKeyMasking
13101312
}
13111313

1314+
if iterv2.Enabled {
1315+
i.constructPointIterV2(ctx, memtables, buf, internalOpts)
1316+
return
1317+
}
1318+
13121319
// Merging levels and levels from iterAlloc.
13131320
mlevels := buf.mlevels[:0]
13141321
levels := buf.levels[:0]
@@ -1429,6 +1436,102 @@ func (i *Iterator) constructPointIter(
14291436
i.merging = &buf.merging
14301437
}
14311438

1439+
// constructPointIterV2 builds the point iterator stack using mergingIterV2 and
1440+
// levelIterV2/InterleavingIter children.
1441+
func (i *Iterator) constructPointIterV2(
1442+
ctx context.Context, memtables flushableList, buf *iterAlloc, internalOpts internalIterOpts,
1443+
) {
1444+
var v2iters []iterv2.Iter
1445+
1446+
// 1. Batch iterator (if any).
1447+
if i.batch != nil {
1448+
if i.batch.batch.index == nil {
1449+
panic(errors.AssertionFailedf("creating an iterator over an unindexed batch"))
1450+
}
1451+
i.batch.batch.initInternalIter(&i.opts, &i.batch.pointIter)
1452+
i.batch.batch.initRangeDelIter(&i.opts, &i.batch.rangeDelIter, i.batch.batchSeqNum)
1453+
var rangeDelIter keyspan.FragmentIterator
1454+
if i.batch.rangeDelIter.Count() > 0 {
1455+
rangeDelIter = &i.batch.rangeDelIter
1456+
}
1457+
iiter := new(iterv2.InterleavingIter)
1458+
iiter.Init(i.comparer,
1459+
&iterv2.PointKeyFilter{Iter: &i.batch.pointIter},
1460+
rangeDelIter,
1461+
nil, nil, // startKey, endKey: unbounded
1462+
i.opts.LowerBound, i.opts.UpperBound)
1463+
v2iters = append(v2iters, iiter)
1464+
}
1465+
1466+
if !i.batchOnlyIter {
1467+
// 2. Memtable iterators (newest to oldest).
1468+
for j := len(memtables) - 1; j >= 0; j-- {
1469+
mem := memtables[j]
1470+
if fi, ok := mem.flushable.(*ingestedFlushable); ok {
1471+
flushableLevelIter, flushableRangeDelIter := fi.newItersV2(&i.opts, internalOpts)
1472+
v2iters = append(v2iters, flushableLevelIter)
1473+
if flushableRangeDelIter != nil {
1474+
v2iters = append(v2iters, flushableRangeDelIter)
1475+
}
1476+
continue
1477+
}
1478+
pointIter := mem.newIter(&i.opts)
1479+
// Apparently the memtable iterator interleaves range key span boundaries.
1480+
// Filter them out using a PointKeyFilter.
1481+
pointIter = &iterv2.PointKeyFilter{Iter: pointIter}
1482+
rangeDelIter := mem.newRangeDelIter(&i.opts)
1483+
iiter := new(iterv2.InterleavingIter)
1484+
iiter.Init(i.comparer, pointIter, rangeDelIter,
1485+
nil, nil, // startKey, endKey: unbounded
1486+
i.opts.LowerBound, i.opts.UpperBound)
1487+
v2iters = append(v2iters, iiter)
1488+
}
1489+
1490+
// 3. Level iterators: L0 sublevels followed by L1+.
1491+
var current *manifest.Version
1492+
if i.version != nil {
1493+
current = i.version
1494+
} else {
1495+
current = i.readState.current
1496+
}
1497+
i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
1498+
1499+
addLevelIterV2 := func(files manifest.LevelIterator, layer manifest.Layer) {
1500+
// Filter to point-key files only; range-key-only files are handled
1501+
// by the separate range key iterator.
1502+
files = files.Filter(manifest.KeyTypePoint)
1503+
li := newLevelIterV2(ctx, i.opts, i.comparer, i.newIters, files, layer, internalOpts)
1504+
v2iters = append(v2iters, li)
1505+
}
1506+
1507+
// L0 sublevels, newest to oldest.
1508+
for idx := len(current.L0SublevelFiles) - 1; idx >= 0; idx-- {
1509+
addLevelIterV2(current.L0SublevelFiles[idx].Iter(), manifest.L0Sublevel(idx))
1510+
}
1511+
// L1+.
1512+
for level := 1; level < len(current.Levels); level++ {
1513+
if current.Levels[level].Empty() {
1514+
continue
1515+
}
1516+
addLevelIterV2(current.Levels[level].Iter(), manifest.Level(level))
1517+
}
1518+
}
1519+
for j := range v2iters {
1520+
v2iters[j] = iterv2.MaybeWrapInInvalidating(v2iters[j])
1521+
}
1522+
1523+
m := newMergingIterV2(i.comparer.Compare, i.comparer.Split, i.seqNum, v2iters...)
1524+
m.lower = i.opts.LowerBound
1525+
m.upper = i.opts.UpperBound
1526+
m.logger = i.opts.getLogger()
1527+
m.stats = &i.stats.InternalStats
1528+
if i.batch != nil {
1529+
m.slab.batchSnapshot = i.batch.batchSeqNum
1530+
}
1531+
i.pointIter = invalidating.MaybeWrapIfInvariants(m).(topLevelIterator)
1532+
i.mergingV2 = m
1533+
}
1534+
14321535
// NewBatch returns a new empty write-only batch. Any reads on the batch will
14331536
// return an error. If the batch is committed it will be applied to the DB.
14341537
func (d *DB) NewBatch(opts ...BatchOption) *Batch {

flushable.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/errors"
1313
"github.com/cockroachdb/pebble/internal/base"
1414
"github.com/cockroachdb/pebble/internal/invariants"
15+
"github.com/cockroachdb/pebble/internal/iterv2"
1516
"github.com/cockroachdb/pebble/internal/keyspan"
1617
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
1718
"github.com/cockroachdb/pebble/internal/manifest"
@@ -262,6 +263,42 @@ func (s *ingestedFlushable) newIterInternal(o *IterOptions, iio internalIterOpts
262263
)
263264
}
264265

266+
func (s *ingestedFlushable) newItersV2(
267+
o *IterOptions, iio internalIterOpts,
268+
) (levelIter, rangeDelIter iterv2.Iter) {
269+
var opts IterOptions
270+
if o != nil {
271+
opts = *o
272+
}
273+
files := s.slice.Iter()
274+
levelIter = newLevelIterV2(
275+
context.Background(),
276+
opts,
277+
s.comparer,
278+
s.newIters,
279+
files.Filter(manifest.KeyTypePoint),
280+
manifest.FlushableIngestsLayer(),
281+
iio,
282+
)
283+
if s.exciseSpan.Valid() {
284+
// We have an excise span; we will set it up as a separate level.
285+
rdel := keyspan.Span{
286+
Start: s.exciseSpan.Start,
287+
End: s.exciseSpan.End,
288+
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeDelete)}},
289+
}
290+
iiter := &iterv2.InterleavingIter{}
291+
iiter.Init(
292+
s.comparer,
293+
base.NewFakeIterWithCmp(s.comparer.Compare, nil),
294+
keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel}),
295+
nil, nil, nil, nil,
296+
)
297+
rangeDelIter = iiter
298+
}
299+
return levelIter, rangeDelIter
300+
}
301+
265302
// newFlushIter is part of the flushable interface.
266303
func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
267304
// newFlushIter is only used for writing memtables to disk as sstables.

ingest_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2213,9 +2213,6 @@ func TestIngest(t *testing.T) {
22132213
if runtime.GOARCH == "386" {
22142214
t.Skip("skipped on 32-bit due to slightly varied output")
22152215
}
2216-
defer func() {
2217-
require.NoError(t, d.Close())
2218-
}()
22192216

22202217
reset := func(split bool) {
22212218
if d != nil {
@@ -2330,6 +2327,9 @@ func TestIngest(t *testing.T) {
23302327
return fmt.Sprintf("unknown command: %s", td.Cmd)
23312328
}
23322329
})
2330+
if d != nil {
2331+
require.NoError(t, d.Close())
2332+
}
23332333
}
23342334

23352335
// linkAndRemovePredicate is like errorfs.InjectIndex, and injects only on the

internal/base/comparer.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,16 @@ func (c *Comparer) EnsureDefaults() *Comparer {
300300
return n
301301
}
302302

303+
// Prefix returns the prefix of the given key.
304+
func (c *Comparer) Prefix(key []byte) []byte {
305+
return c.Split.Prefix(key)
306+
}
307+
308+
// HasPrefix returns true if the key's prefix equals the given prefix.
309+
func (c *Comparer) HasPrefix(key, prefix []byte) bool {
310+
return bytes.Equal(c.Split.Prefix(key), prefix)
311+
}
312+
303313
// DefaultComparer is the default implementation of the Comparer interface.
304314
// It uses the natural ordering, consistent with bytes.Compare.
305315
var DefaultComparer = &Comparer{

internal/base/internal.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,12 @@ const (
181181
// IDs (varint count + varint blob file IDs).
182182
InternalKeyKindIngestSSTWithBlobs InternalKeyKind = 26
183183

184+
// InternalKeyKindSpanBoundary is a synthetic key generated by iterators to
185+
// stop at span boundaries. See iterv2.Iter.
186+
//
187+
// It is important that this key is larger than any other valid point key.
188+
InternalKeyKindSpanBoundary InternalKeyKind = 30
189+
184190
// This maximum value isn't part of the file format. Future extensions may
185191
// increase this value.
186192
//
@@ -190,7 +196,7 @@ const (
190196
// which sorts 'less than or equal to' any other valid internalKeyKind, when
191197
// searching for any kind of internal key formed by a certain user key and
192198
// seqNum.
193-
InternalKeyKindMax InternalKeyKind = 26
199+
InternalKeyKindMax InternalKeyKind = 30
194200

195201
// InternalKeyKindMaxForSSTable is the largest valid key kind that can exist
196202
// in an SSTable. This should usually equal InternalKeyKindMax, except
@@ -244,6 +250,7 @@ var internalKeyKindNames = []string{
244250
InternalKeyKindExcise: "EXCISE",
245251
InternalKeyKindSyntheticKey: "SYNTHETIC",
246252
InternalKeyKindIngestSSTWithBlobs: "INGESTSSTWITHBLOB",
253+
InternalKeyKindSpanBoundary: "BDRY",
247254
InternalKeyKindInvalid: "INVALID",
248255
}
249256

@@ -362,6 +369,7 @@ var kindsMap = map[string]InternalKeyKind{
362369
"EXCISE": InternalKeyKindExcise,
363370
"SYNTHETIC": InternalKeyKindSyntheticKey,
364371
"INGESTSSTWITHBLOB": InternalKeyKindIngestSSTWithBlobs,
372+
"BDRY": InternalKeyKindSpanBoundary,
365373
}
366374

367375
// ParseSeqNum parses the string representation of a sequence number.

0 commit comments

Comments (0)