Skip to content

Commit cfcd825

Browse files
committed
sstable: handle synthetic prefix when checking bloom filter
Fix the sstable iterators to strip the synthetic prefix before checking the bloom filter. Fix `TestRandomizedPrefixSuffixRewriter` so that it also checks `SeekPrefixGE` and randomly enables filters.
1 parent e195862 commit cfcd825

File tree

5 files changed

+79
-49
lines changed

5 files changed

+79
-49
lines changed

options.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,8 @@ type Options struct {
788788
// Filters is a map from filter policy name to filter policy. It is used for
789789
// debugging tools which may be used on multiple databases configured with
790790
// different filter policies. It is not necessary to populate this filters
791-
// map during normal usage of a DB.
791+
// map during normal usage of a DB (it will be done automatically by
792+
// EnsureDefaults).
792793
Filters map[string]FilterPolicy
793794

794795
// FlushDelayDeleteRange configures how long the database should wait before

sstable/options.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,8 @@ type ReaderOptions struct {
9898
// Merge defines the Merge function in use for this keyspace.
9999
Merge base.Merge
100100

101-
// Filters is a map from filter policy name to filter policy. It is used for
102-
// debugging tools which may be used on multiple databases configured with
103-
// different filter policies. It is not necessary to populate this filters
104-
// map during normal usage of a DB.
101+
// Filters is a map from filter policy name to filter policy. Filters with
102+
// policies that are not in this map will be ignored.
105103
Filters map[string]FilterPolicy
106104

107105
// Merger defines the associative merge operation to use for merging values

sstable/reader_iter_single_lvl.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -786,19 +786,13 @@ func (i *singleLevelIterator) seekPrefixGE(
786786
}
787787
i.lastBloomFilterMatched = false
788788
// Check prefix bloom filter.
789-
var dataH bufferHandle
790-
dataH, i.err = i.reader.readFilter(i.ctx, i.indexFilterRH, i.stats, &i.iterStats)
791-
if i.err != nil {
792-
i.data.invalidate()
793-
return nil
794-
}
795-
mayContain := i.reader.tableFilter.mayContain(dataH.Get(), prefix)
796-
dataH.Release()
797-
if !mayContain {
798-
// This invalidation may not be necessary for correctness, and may
799-
// be a place to optimize later by reusing the already loaded
800-
// block. It was necessary in earlier versions of the code since
801-
// the caller was allowed to call Next when SeekPrefixGE returned
789+
var mayContain bool
790+
mayContain, i.err = i.bloomFilterMayContain(prefix)
791+
if i.err != nil || !mayContain {
792+
// In the i.err == nil case, this invalidation may not be necessary for
793+
// correctness, and may be a place to optimize later by reusing the
794+
// already loaded block. It was necessary in earlier versions of the code
795+
// since the caller was allowed to call Next when SeekPrefixGE returned
802796
// nil. This is no longer allowed.
803797
i.data.invalidate()
804798
return nil
@@ -833,6 +827,27 @@ func (i *singleLevelIterator) seekPrefixGE(
833827
return i.maybeVerifyKey(i.seekGEHelper(key, boundsCmp, flags))
834828
}
835829

830+
func (i *singleLevelIterator) bloomFilterMayContain(prefix []byte) (bool, error) {
831+
// Check prefix bloom filter.
832+
prefixToCheck := prefix
833+
if i.transforms.SyntheticPrefix.IsSet() {
834+
// We have to remove the synthetic prefix.
835+
var ok bool
836+
prefixToCheck, ok = bytes.CutPrefix(prefix, i.transforms.SyntheticPrefix)
837+
if !ok {
838+
// This prefix will not be found inside this table.
839+
return false, nil
840+
}
841+
}
842+
843+
dataH, err := i.reader.readFilter(i.ctx, i.indexFilterRH, i.stats, &i.iterStats)
844+
if err != nil {
845+
return false, err
846+
}
847+
defer dataH.Release()
848+
return i.reader.tableFilter.mayContain(dataH.Get(), prefixToCheck), nil
849+
}
850+
836851
// virtualLast should only be called if i.vReader != nil.
837852
func (i *singleLevelIterator) virtualLast() *base.InternalKV {
838853
if i.vState == nil {

sstable/reader_iter_two_lvl.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -390,19 +390,13 @@ func (i *twoLevelIterator) SeekPrefixGE(
390390
flags = flags.DisableTrySeekUsingNext()
391391
}
392392
i.lastBloomFilterMatched = false
393-
var dataH bufferHandle
394-
dataH, i.err = i.reader.readFilter(i.ctx, i.indexFilterRH, i.stats, &i.iterStats)
395-
if i.err != nil {
396-
i.data.invalidate()
397-
return nil
398-
}
399-
mayContain := i.reader.tableFilter.mayContain(dataH.Get(), prefix)
400-
dataH.Release()
401-
if !mayContain {
402-
// This invalidation may not be necessary for correctness, and may
403-
// be a place to optimize later by reusing the already loaded
404-
// block. It was necessary in earlier versions of the code since
405-
// the caller was allowed to call Next when SeekPrefixGE returned
393+
var mayContain bool
394+
mayContain, i.err = i.bloomFilterMayContain(prefix)
395+
if i.err != nil || !mayContain {
396+
// In the i.err == nil case, this invalidation may not be necessary for
397+
// correctness, and may be a place to optimize later by reusing the
398+
// already loaded block. It was necessary in earlier versions of the code
399+
// since the caller was allowed to call Next when SeekPrefixGE returned
406400
// nil. This is no longer allowed.
407401
i.data.invalidate()
408402
return nil

sstable/reader_test.go

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,7 @@ type readCallType int
10291029

10301030
const (
10311031
SeekGE readCallType = iota
1032+
SeekPrefixGE
10321033
Next
10331034
Prev
10341035
SeekLT
@@ -1040,6 +1041,8 @@ func (rct readCallType) String() string {
10401041
switch rct {
10411042
case SeekGE:
10421043
return "SeekGE"
1044+
case SeekPrefixGE:
1045+
return "SeekPrefixGE"
10431046
case Next:
10441047
return "Next"
10451048
case Prev:
@@ -1057,7 +1060,7 @@ func (rct readCallType) String() string {
10571060

10581061
func (rct readCallType) Opposite() readCallType {
10591062
switch rct {
1060-
case SeekGE:
1063+
case SeekGE, SeekPrefixGE:
10611064
return SeekLT
10621065
case Next:
10631066
return Prev
@@ -1136,13 +1139,18 @@ func (rw *readerWorkload) setCallAfterInvalid() {
11361139
}
11371140

11381141
func (rw *readerWorkload) handleInvalid(callType readCallType, iter Iterator) *base.InternalKV {
1139-
switch {
1140-
case (SeekGE == callType || Next == callType || Last == callType):
1142+
switch callType {
1143+
case SeekGE, Next, Last:
11411144
if len(rw.seekKeyAfterInvalid) == 0 {
11421145
return iter.Prev()
11431146
}
11441147
return iter.SeekLT(rw.seekKeyAfterInvalid, base.SeekLTFlagsNone)
1145-
case (SeekLT == callType || Prev == callType || First == callType):
1148+
case SeekPrefixGE:
1149+
if len(rw.seekKeyAfterInvalid) == 0 {
1150+
return iter.First()
1151+
}
1152+
return iter.SeekLT(rw.seekKeyAfterInvalid, base.SeekLTFlagsNone)
1153+
case SeekLT, Prev, First:
11461154
if len(rw.seekKeyAfterInvalid) == 0 {
11471155
return iter.Next()
11481156
}
@@ -1157,6 +1165,17 @@ func (rw *readerWorkload) read(call readCall, iter Iterator) *base.InternalKV {
11571165
switch call.callType {
11581166
case SeekGE:
11591167
return iter.SeekGE(call.seekKey, base.SeekGEFlagsNone)
1168+
case SeekPrefixGE:
1169+
cmp := testkeys.Comparer
1170+
prefix := call.seekKey[:cmp.Split(call.seekKey)]
1171+
kv := iter.SeekPrefixGE(prefix, call.seekKey, base.SeekGEFlagsNone)
1172+
// If there is no key with this prefix to return, SeekPrefixGE might return
1173+
// nil or it might return a larger key. Make it always return nil so we can
1174+
// cross-check results.
1175+
if kv != nil && !cmp.Equal(prefix, kv.K.UserKey[:cmp.Split(kv.K.UserKey)]) {
1176+
return nil
1177+
}
1178+
return kv
11601179
case Next:
11611180
return rw.repeatRead(call, iter)
11621181
case SeekLT:
@@ -1207,8 +1226,7 @@ func createReadWorkload(
12071226
// Sqrt the likelihood of calling First and Last as they're not very interesting.
12081227
callType = readCallType(rng.Intn(int(Last + 1)))
12091228
}
1210-
if callType == SeekLT || callType == SeekGE {
1211-
1229+
if callType == SeekLT || callType == SeekGE || callType == SeekPrefixGE {
12121230
idx := rng.Int63n(int64(ks.MaxLen()))
12131231
ts := rng.Int63n(maxTS) + 1
12141232
key := testkeys.KeyAt(ks, idx, ts)
@@ -1239,11 +1257,10 @@ func createReadWorkload(
12391257
// the same sequence of keys as another iterator initialized with a suffix
12401258
// replacement rule to that fixed timestamp which reads an sst with the same
12411259
// keys and randomized suffixes. The iterator with the suffix replacement rule
1242-
// may also be initialized with a prefix synthenthsis rule while the control file
1260+
// may also be initialized with a prefix synthesis rule while the control file
12431261
// will contain all keys with that prefix. In other words, this is a randomized
12441262
// version of TestBlockSyntheticSuffix and TestBlockSyntheticPrefix.
12451263
func TestRandomizedPrefixSuffixRewriter(t *testing.T) {
1246-
12471264
ks := testkeys.Alpha(3)
12481265

12491266
callCount := 500
@@ -1252,29 +1269,34 @@ func TestRandomizedPrefixSuffixRewriter(t *testing.T) {
12521269
syntheticSuffix := []byte("@" + strconv.Itoa(int(suffix)))
12531270
var syntheticPrefix []byte
12541271

1255-
potentialBlockSize := []int{32, 64, 128, 256}
1256-
potentialRestartInterval := []int{1, 4, 8, 16}
1272+
blockSizeCandidates := []int{32, 64, 128, 256}
1273+
restartIntervalCandidates := []int{1, 4, 8, 16}
1274+
filterPolicyCandidates := []FilterPolicy{nil, bloom.FilterPolicy(1), bloom.FilterPolicy(10)}
12571275

12581276
seed := uint64(time.Now().UnixNano())
12591277
rng := rand.New(rand.NewSource(seed))
12601278
mem := vfs.NewMem()
12611279

1262-
blockSize := potentialBlockSize[rng.Intn(len(potentialBlockSize))]
1263-
restartInterval := potentialRestartInterval[rng.Intn(len(potentialRestartInterval))]
1264-
if rng.Intn(1) == 0 {
1280+
blockSize := blockSizeCandidates[rng.Intn(len(blockSizeCandidates))]
1281+
restartInterval := restartIntervalCandidates[rng.Intn(len(restartIntervalCandidates))]
1282+
filterPolicy := filterPolicyCandidates[rng.Intn(len(filterPolicyCandidates))]
1283+
if rng.Intn(10) < 9 {
12651284
randKey := testkeys.Key(ks, rng.Int63n(ks.Count()))
12661285
// Choose from 3 prefix candidates: "_" sorts before all keys, randKey sorts
12671286
// somewhere between all keys, and "~" sorts after all keys
12681287
prefixCandidates := []string{"~", string(randKey) + "_", "_"}
12691288
syntheticPrefix = []byte(prefixCandidates[rng.Intn(len(prefixCandidates))])
1270-
12711289
}
1272-
t.Logf("Configured Block Size %d, Restart Interval %d, Seed %d, Prefix %s", blockSize, restartInterval, seed, string(syntheticPrefix))
1290+
t.Logf("Configured Block Size %d, Restart Interval %d, Seed %d, Prefix %s, Filter policy %v", blockSize, restartInterval, seed, string(syntheticPrefix), filterPolicy)
12731291

12741292
createIter := func(fileName string, syntheticSuffix SyntheticSuffix, syntheticPrefix SyntheticPrefix) (Iterator, func()) {
12751293
f, err := mem.Open(fileName)
12761294
require.NoError(t, err)
1277-
eReader, err := newReader(f, ReaderOptions{Comparer: testkeys.Comparer})
1295+
opts := ReaderOptions{Comparer: testkeys.Comparer}
1296+
if filterPolicy != nil {
1297+
opts.Filters = map[string]FilterPolicy{filterPolicy.Name(): filterPolicy}
1298+
}
1299+
eReader, err := newReader(f, opts)
12781300
require.NoError(t, err)
12791301
iter, err := eReader.newIterWithBlockPropertyFiltersAndContext(
12801302
context.Background(),
@@ -1299,7 +1321,7 @@ func TestRandomizedPrefixSuffixRewriter(t *testing.T) {
12991321
testCaseName = "two-level"
13001322
}
13011323
t.Run(testCaseName, func(t *testing.T) {
1302-
indexBlockSize := 4096
1324+
indexBlockSize := 10 * 1024 * 1024
13031325
if twoLevelIndex {
13041326
indexBlockSize = 1
13051327
}
@@ -1321,6 +1343,7 @@ func TestRandomizedPrefixSuffixRewriter(t *testing.T) {
13211343
BlockSize: blockSize,
13221344
IndexBlockSize: indexBlockSize,
13231345
Comparer: testkeys.Comparer,
1346+
FilterPolicy: filterPolicy,
13241347
})
13251348

13261349
keyIdx := int64(0)
@@ -1366,7 +1389,6 @@ func TestRandomizedPrefixSuffixRewriter(t *testing.T) {
13661389
alsoCheck: func() {
13671390
require.Nil(t, eIter.Error())
13681391
require.Nil(t, iter.Error())
1369-
13701392
},
13711393
}
13721394
for _, call := range w.calls {

0 commit comments

Comments
 (0)