Skip to content

Commit 7cdedc8

Browse files
committed
db: fix cases where SeekPrefixGE prefix doesn't match key
This commit adds `SeekPrefixGE` assertions verifying that the `prefix` actually is the prefix for the `key`. This was not always the case so the offending code paths are adjusted. In the future, we should create a wrapper iterator that verifies this sort of thing. Informs #3794
1 parent 65445b8 commit 7cdedc8

File tree

7 files changed

+98
-41
lines changed

7 files changed

+98
-41
lines changed

internal/base/comparer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ func (s Split) Prefix(k []byte) []byte {
159159
return k[:i:i]
160160
}
161161

162+
// HasPrefix returns true if the given key has the given prefix.
163+
func (s Split) HasPrefix(prefix, key []byte) bool {
164+
i := s(key)
165+
return bytes.Equal(prefix, key[:i:i])
166+
}
167+
162168
// DefaultSplit is a trivial implementation of Split which always returns the
163169
// full key.
164170
var DefaultSplit Split = func(key []byte) int { return len(key) }

iterator.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,23 +1476,18 @@ func (i *Iterator) SeekPrefixGE(key []byte) bool {
14761476
}
14771477
// Make a copy of the prefix so that modifications to the key after
14781478
// SeekPrefixGE returns does not affect the stored prefix.
1479-
if cap(i.prefixOrFullSeekKey) < prefixLen {
1480-
i.prefixOrFullSeekKey = make([]byte, prefixLen)
1481-
} else {
1482-
i.prefixOrFullSeekKey = i.prefixOrFullSeekKey[:prefixLen]
1483-
}
14841479
i.hasPrefix = true
1485-
copy(i.prefixOrFullSeekKey, keyPrefix)
1480+
i.prefixOrFullSeekKey = append(i.prefixOrFullSeekKey[:0], keyPrefix...)
14861481

14871482
if lowerBound := i.opts.GetLowerBound(); lowerBound != nil && i.cmp(key, lowerBound) < 0 {
1488-
if p := i.comparer.Split.Prefix(lowerBound); !bytes.Equal(i.prefixOrFullSeekKey, p) {
1483+
if !i.comparer.Split.HasPrefix(i.prefixOrFullSeekKey, lowerBound) {
14891484
i.err = errors.New("pebble: SeekPrefixGE supplied with key outside of lower bound")
14901485
i.iterValidityState = IterExhausted
14911486
return false
14921487
}
14931488
key = lowerBound
14941489
} else if upperBound := i.opts.GetUpperBound(); upperBound != nil && i.cmp(key, upperBound) > 0 {
1495-
if p := i.comparer.Split.Prefix(upperBound); !bytes.Equal(i.prefixOrFullSeekKey, p) {
1490+
if !i.comparer.Split.HasPrefix(i.prefixOrFullSeekKey, upperBound) {
14961491
i.err = errors.New("pebble: SeekPrefixGE supplied with key outside of upper bound")
14971492
i.iterValidityState = IterExhausted
14981493
return false

level_iter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,9 @@ func (l *levelIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV
655655
}
656656

657657
func (l *levelIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
658+
if invariants.Enabled && !l.split.HasPrefix(prefix, key) {
659+
panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
660+
}
658661
if invariants.Enabled && l.lower != nil && l.cmp(key, l.lower) < 0 {
659662
panic(errors.AssertionFailedf("levelIter SeekGE to key %q violates lower bound %q", key, l.lower))
660663
}

merging_iter.go

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -524,11 +524,9 @@ func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
524524
// P2. Care is taken to avoid ever advancing the iterator beyond the current
525525
// prefix. If nextEntry is ever invoked while we're already beyond the
526526
// current prefix, we're violating the invariant.
527-
if invariants.Enabled && m.prefix != nil {
528-
if p := m.split.Prefix(l.iterKV.K.UserKey); !bytes.Equal(m.prefix, p) {
529-
m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
530-
m.prefix, l.iterKV, debug.Stack())
531-
}
527+
if invariants.Enabled && m.prefix != nil && !m.split.HasPrefix(m.prefix, l.iterKV.K.UserKey) {
528+
m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
529+
m.prefix, l.iterKV, debug.Stack())
532530
}
533531

534532
oldTopLevel := l.index
@@ -905,6 +903,10 @@ func (m *mergingIter) findPrevEntry() *base.InternalKV {
905903
//
906904
// If an error occurs, seekGE returns the error without setting m.err.
907905
func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) error {
906+
if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
907+
m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
908+
}
909+
908910
// When seeking, we can use tombstones to adjust the key we seek to on each
909911
// level. Consider the series of range tombstones:
910912
//
@@ -957,15 +959,11 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
957959
}
958960

959961
for ; level < len(m.levels); level++ {
960-
if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
961-
m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
962-
}
963-
964962
l := &m.levels[level]
965963
if m.prefix != nil {
966964
l.iterKV = l.iter.SeekPrefixGE(m.prefix, key, flags)
967965
if l.iterKV != nil {
968-
if !bytes.Equal(m.prefix, m.split.Prefix(l.iterKV.K.UserKey)) {
966+
if !m.split.HasPrefix(m.prefix, l.iterKV.K.UserKey) {
969967
// Prevent keys without a matching prefix from being added to the heap by setting
970968
// iterKey and iterValue to their zero values before calling initMinHeap.
971969
l.iterKV = nil
@@ -999,7 +997,21 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
999997
// Based on the containment condition tombstone.End > key, so
1000998
// the assignment to key results in a monotonically
1001999
// non-decreasing key across iterations of this loop.
1002-
//
1000+
if m.prefix != nil && !m.split.HasPrefix(m.prefix, l.tombstone.End) {
1001+
// Any keys with m.prefix on subsequent levels are under the tombstone.
1002+
// We still need to perform the seeks, in case the next seek uses
1003+
// the TrySeekUsingNext flag.
1004+
for level++; level < len(m.levels); level++ {
1005+
l := &m.levels[level]
1006+
if kv := l.iter.SeekPrefixGE(m.prefix, key, flags); kv == nil {
1007+
if err := l.iter.Error(); err != nil {
1008+
return err
1009+
}
1010+
}
1011+
l.iterKV = nil
1012+
}
1013+
break
1014+
}
10031015
// The adjustment of key here can only move it to a larger key.
10041016
// Since the caller of seekGE guaranteed that the original key
10051017
// was greater than or equal to m.lower, the new key will
@@ -1037,17 +1049,18 @@ func (m *mergingIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *
10371049
func (m *mergingIter) SeekPrefixGEStrict(
10381050
prefix, key []byte, flags base.SeekGEFlags,
10391051
) *base.InternalKV {
1052+
if invariants.Enabled && !m.split.HasPrefix(prefix, key) {
1053+
panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
1054+
}
10401055
m.prefix = prefix
10411056
m.err = m.seekGE(key, 0 /* start level */, flags)
10421057
if m.err != nil {
10431058
return nil
10441059
}
10451060

10461061
iterKV := m.findNextEntry()
1047-
if invariants.Enabled && iterKV != nil {
1048-
if !bytes.Equal(m.prefix, m.split.Prefix(iterKV.K.UserKey)) {
1049-
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
1050-
}
1062+
if invariants.Enabled && iterKV != nil && !m.split.HasPrefix(m.prefix, iterKV.K.UserKey) {
1063+
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
10511064
}
10521065
return iterKV
10531066
}

sstable/reader_iter_single_lvl.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,13 @@ type singleLevelIterator struct {
161161
// present, should be used for prefix seeks or not. In some cases it is
162162
// beneficial to skip a filter block even if it exists (eg. if probability of
163163
// a match is high).
164-
useFilterBlock bool
165-
lastBloomFilterMatched bool
164+
useFilterBlock bool
165+
166+
// didNotPositionOnLastSeekGE is set to true if we completed a call to SeekGE
167+
// or SeekPrefixGE without positioning the iterator internally. If this flag
168+
// is set, the TrySeekUsingNext optimization is disabled on the next seek.
169+
// This happens for example when the bloom filter excludes a prefix.
170+
didNotPositionOnLastSeekGE bool
166171

167172
transforms IterTransforms
168173

@@ -639,6 +644,11 @@ func (i *singleLevelIterator) SeekGE(key []byte, flags base.SeekGEFlags) *base.I
639644
key = i.lower
640645
}
641646
}
647+
if i.didNotPositionOnLastSeekGE {
648+
// Iterator is not positioned based on last seek.
649+
flags = flags.DisableTrySeekUsingNext()
650+
i.didNotPositionOnLastSeekGE = false
651+
}
642652

643653
if flags.TrySeekUsingNext() {
644654
// The i.exhaustedBounds comparison indicates that the upper bound was
@@ -791,6 +801,12 @@ func (i *singleLevelIterator) SeekPrefixGE(
791801
// TODO(bananabrick): We can optimize away this check for the level iter
792802
// if necessary.
793803
if i.cmp(key, i.lower) < 0 {
804+
if !i.reader.Split.HasPrefix(prefix, i.lower) {
805+
i.err = nil // clear any cached iteration error
806+
// Disable the TrySeekUsingNext optimization next time.
807+
i.didNotPositionOnLastSeekGE = true
808+
return nil
809+
}
794810
key = i.lower
795811
}
796812
}
@@ -800,18 +816,22 @@ func (i *singleLevelIterator) SeekPrefixGE(
800816
func (i *singleLevelIterator) seekPrefixGE(
801817
prefix, key []byte, flags base.SeekGEFlags,
802818
) (kv *base.InternalKV) {
819+
if invariants.Enabled && !i.reader.Split.HasPrefix(prefix, key) {
820+
panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
821+
}
822+
if i.didNotPositionOnLastSeekGE {
823+
// Iterator is not positioned based on last seek.
824+
flags = flags.DisableTrySeekUsingNext()
825+
i.didNotPositionOnLastSeekGE = false
826+
}
827+
803828
// NOTE: prefix is only used for bloom filter checking and not later work in
804829
// this method. Hence, we can use the existing iterator position if the last
805830
// SeekPrefixGE did not fail bloom filter matching.
806831

807832
err := i.err
808833
i.err = nil // clear cached iteration error
809834
if i.useFilterBlock {
810-
if !i.lastBloomFilterMatched {
811-
// Iterator is not positioned based on last seek.
812-
flags = flags.DisableTrySeekUsingNext()
813-
}
814-
i.lastBloomFilterMatched = false
815835
// Check prefix bloom filter.
816836
var mayContain bool
817837
mayContain, i.err = i.bloomFilterMayContain(prefix)
@@ -822,9 +842,10 @@ func (i *singleLevelIterator) seekPrefixGE(
822842
// since the caller was allowed to call Next when SeekPrefixGE returned
823843
// nil. This is no longer allowed.
824844
i.data.Invalidate()
845+
// Disable the TrySeekUsingNext optimization next time.
846+
i.didNotPositionOnLastSeekGE = true
825847
return nil
826848
}
827-
i.lastBloomFilterMatched = true
828849
}
829850
if flags.TrySeekUsingNext() {
830851
// The i.exhaustedBounds comparison indicates that the upper bound was

sstable/reader_iter_two_lvl.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ type twoLevelIterator struct {
2323
// useFilterBlock controls whether we consult the bloom filter in the
2424
// twoLevelIterator code. Note that secondLevel.useFilterBlock is always
2525
// false - any filtering happens at the top level.
26-
useFilterBlock bool
27-
lastBloomFilterMatched bool
26+
useFilterBlock bool
27+
// didNotPositionOnLastSeekGE is set to true if we completed a call to SeekGE
28+
// or SeekPrefixGE without positioning the iterator internally. If this flag
29+
// is set, the TrySeekUsingNext optimization is disabled on the next seek.
30+
// This happens for example when the bloom filter excludes a prefix.
31+
didNotPositionOnLastSeekGE bool
2832
}
2933

3034
var _ Iterator = (*twoLevelIterator)(nil)
@@ -336,9 +340,19 @@ func (i *twoLevelIterator) SeekPrefixGE(
336340
// TODO(bananabrick): We can optimize away this check for the level iter
337341
// if necessary.
338342
if i.secondLevel.cmp(key, i.secondLevel.lower) < 0 {
343+
if !i.secondLevel.reader.Split.HasPrefix(prefix, i.secondLevel.lower) {
344+
i.secondLevel.err = nil // clear any cached iteration error
345+
// Disable the TrySeekUsingNext optimization next time.
346+
i.didNotPositionOnLastSeekGE = true
347+
return nil
348+
}
339349
key = i.secondLevel.lower
340350
}
341351
}
352+
if i.didNotPositionOnLastSeekGE {
353+
flags = flags.DisableTrySeekUsingNext()
354+
i.didNotPositionOnLastSeekGE = false
355+
}
342356

343357
// NOTE: prefix is only used for bloom filter checking and not later work in
344358
// this method. Hence, we can use the existing iterator position if the last
@@ -350,8 +364,7 @@ func (i *twoLevelIterator) SeekPrefixGE(
350364
// The twoLevelIterator could be already exhausted. Utilize that when
351365
// trySeekUsingNext is true. See the comment about data-exhausted, PGDE, and
352366
// bounds-exhausted near the top of the file.
353-
filterUsedAndDidNotMatch := i.useFilterBlock && !i.lastBloomFilterMatched
354-
if flags.TrySeekUsingNext() && !filterUsedAndDidNotMatch &&
367+
if flags.TrySeekUsingNext() &&
355368
(i.secondLevel.exhaustedBounds == +1 || (i.secondLevel.data.IsDataInvalidated() && i.secondLevel.index.IsDataInvalidated())) &&
356369
err == nil {
357370
// Already exhausted, so return nil.
@@ -360,11 +373,6 @@ func (i *twoLevelIterator) SeekPrefixGE(
360373

361374
// Check prefix bloom filter.
362375
if i.useFilterBlock {
363-
if !i.lastBloomFilterMatched {
364-
// Iterator is not positioned based on last seek.
365-
flags = flags.DisableTrySeekUsingNext()
366-
}
367-
i.lastBloomFilterMatched = false
368376
var mayContain bool
369377
mayContain, i.secondLevel.err = i.secondLevel.bloomFilterMayContain(prefix)
370378
if i.secondLevel.err != nil || !mayContain {
@@ -374,9 +382,10 @@ func (i *twoLevelIterator) SeekPrefixGE(
374382
// since the caller was allowed to call Next when SeekPrefixGE returned
375383
// nil. This is no longer allowed.
376384
i.secondLevel.data.Invalidate()
385+
// Disable the TrySeekUsingNext optimization next time.
386+
i.didNotPositionOnLastSeekGE = true
377387
return nil
378388
}
379-
i.lastBloomFilterMatched = true
380389
}
381390

382391
// Bloom filter matches.

sstable/testdata/virtual_reader_iter

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ bounds: [dd#5,SET-ddd#6,SET]
7777
# Check lower bound enforcement during SeekPrefixGE.
7878
iter
7979
seek-prefix-ge d
80+
----
81+
.
82+
83+
iter
84+
seek-prefix-ge dd
8085
next
8186
next
8287
----
@@ -292,6 +297,11 @@ bounds: [dd#5,SET-ddd#6,SET]
292297
# Check lower bound enforcement during SeekPrefixGE.
293298
iter
294299
seek-prefix-ge d
300+
----
301+
.
302+
303+
iter
304+
seek-prefix-ge dd
295305
next
296306
next
297307
----

0 commit comments

Comments
 (0)