Skip to content

Commit ed42fb4

Browse files
committed
db: don't load large filter blocks for flushable ingests
We have seen cases where a very large file is ingested as a flushable, and reads stall while loading its very large bloom filter block. This PR sets a size limit of 64KB on the bloom filter blocks that are loaded for flushable ingests. We achieve this by extending `manifest.Level` so that it can represent flushable ingests as a "pseudo-level", and by changing the `useFilterBlock` flag used for new iterator creation to a `filterBlockSizeLimit` value. Fixes #3787
1 parent ca2b210 commit ed42fb4

20 files changed

+251
-103
lines changed

data_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,23 @@ func runBatchDefineCmd(d *datadriven.TestData, b *Batch) error {
443443
return errors.Errorf("%s expects 2 arguments", parts[0])
444444
}
445445
err = b.Set([]byte(parts[1]), parseValue(parts[2]), nil)
446+
447+
case "set-multiple":
448+
if len(parts) != 3 {
449+
return errors.Errorf("%s expects 2 arguments (n and prefix)", parts[0])
450+
}
451+
n, err := strconv.ParseUint(parts[1], 10, 32)
452+
if err != nil {
453+
return err
454+
}
455+
for i := uint64(0); i < n; i++ {
456+
key := fmt.Sprintf("%s-%05d", parts[2], i)
457+
val := fmt.Sprintf("val-%05d", i)
458+
if err := b.Set([]byte(key), []byte(val), nil); err != nil {
459+
return err
460+
}
461+
}
462+
446463
case "del":
447464
if len(parts) != 2 {
448465
return errors.Errorf("%s expects 1 argument", parts[0])

external_iterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
160160
seqNum--
161161
pointIter, err = r.NewIterWithBlockPropertyFiltersAndContextEtc(
162162
ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
163-
false, /* useFilterBlock */
163+
sstable.NeverUseFilterBlock,
164164
&it.stats.InternalStats, it.opts.CategoryAndQoS, nil,
165165
sstable.TrivialReaderProvider{Reader: r})
166166
if err != nil {

flushable.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,8 @@ func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
214214
if o != nil {
215215
opts = *o
216216
}
217-
// TODO(bananabrick): The manifest.Level in newLevelIter is only used for
218-
// logging. Update the manifest.Level encoding to account for levels which
219-
// aren't truly levels in the lsm. Right now, the encoding only supports
220-
// L0 sublevels, and the rest of the levels in the lsm.
221217
return newLevelIter(
222-
context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.Level(0),
218+
context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.FlushableIngestLevel(),
223219
internalIterOpts{},
224220
)
225221
}
@@ -252,7 +248,7 @@ func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIter
252248
return keyspanimpl.NewLevelIter(
253249
context.TODO(),
254250
keyspan.SpanIterOptions{}, s.comparer.Compare,
255-
s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
251+
s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestLevel(),
256252
manifest.KeyTypePoint,
257253
)
258254
}
@@ -266,7 +262,7 @@ func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIter
266262
return keyspanimpl.NewLevelIter(
267263
context.TODO(),
268264
keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
269-
s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
265+
s.slice.Iter(), manifest.FlushableIngestLevel(), manifest.KeyTypeRange,
270266
)
271267
}
272268

ingest_test.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cockroachdb/datadriven"
2626
"github.com/cockroachdb/errors"
2727
"github.com/cockroachdb/errors/oserror"
28+
"github.com/cockroachdb/pebble/bloom"
2829
"github.com/cockroachdb/pebble/internal/base"
2930
"github.com/cockroachdb/pebble/internal/keyspan"
3031
"github.com/cockroachdb/pebble/internal/manifest"
@@ -427,12 +428,17 @@ func TestOverlappingIngestedSSTs(t *testing.T) {
427428
closed = false
428429
blockFlush = false
429430
)
431+
cache := NewCache(0)
430432
defer func() {
431433
if !closed {
432434
require.NoError(t, d.Close())
433435
}
436+
cache.Unref()
434437
}()
435-
438+
var fsLog struct {
439+
sync.Mutex
440+
buf bytes.Buffer
441+
}
436442
reset := func(strictMem bool) {
437443
if d != nil && !closed {
438444
require.NoError(t, d.Close())
@@ -447,14 +453,27 @@ func TestOverlappingIngestedSSTs(t *testing.T) {
447453

448454
require.NoError(t, mem.MkdirAll("ext", 0755))
449455
opts = (&Options{
450-
FS: mem,
456+
FS: vfs.WithLogging(mem, func(format string, args ...interface{}) {
457+
fsLog.Lock()
458+
defer fsLog.Unlock()
459+
fmt.Fprintf(&fsLog.buf, format+"\n", args...)
460+
}),
461+
Cache: cache,
451462
MemTableStopWritesThreshold: 4,
452463
L0CompactionThreshold: 100,
453464
L0StopWritesThreshold: 100,
454465
DebugCheck: DebugCheckLevels,
455466
FormatMajorVersion: internalFormatNewest,
456467
Logger: testLogger{t},
457468
}).WithFSDefaults()
469+
if testing.Verbose() {
470+
lel := MakeLoggingEventListener(DefaultLogger)
471+
opts.EventListener = &lel
472+
}
473+
opts.EnsureDefaults()
474+
// Some of the tests require bloom filters.
475+
opts.Levels[0].FilterPolicy = bloom.FilterPolicy(10)
476+
458477
// Disable automatic compactions because otherwise we'll race with
459478
// delete-only compactions triggered by ingesting range tombstones.
460479
opts.DisableAutomaticCompactions = true
@@ -476,7 +495,18 @@ func TestOverlappingIngestedSSTs(t *testing.T) {
476495
}
477496
reset(false)
478497

479-
datadriven.RunTest(t, "testdata/flushable_ingest", func(t *testing.T, td *datadriven.TestData) string {
498+
datadriven.RunTest(t, "testdata/flushable_ingest", func(t *testing.T, td *datadriven.TestData) (result string) {
499+
if td.HasArg("with-fs-logging") {
500+
fsLog.Lock()
501+
fsLog.buf.Reset()
502+
fsLog.Unlock()
503+
defer func() {
504+
fsLog.Lock()
505+
defer fsLog.Unlock()
506+
result = fsLog.buf.String() + result
507+
}()
508+
}
509+
480510
switch td.Cmd {
481511
case "reset":
482512
reset(td.HasArg("strictMem"))

internal/manifest/level.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,33 @@ package manifest
77
import "fmt"
88

99
const (
10-
// 3 bits are necessary to represent level values from 0-6.
10+
// 3 bits are necessary to represent level values from 0-6 or 7 for flushable
11+
// ingests.
1112
levelBits = 3
1213
levelMask = (1 << levelBits) - 1
1314
// invalidSublevel denotes an invalid or non-applicable sublevel.
14-
invalidSublevel = -1
15+
invalidSublevel = -1
16+
flushableIngestLevelValue = 7
1517
)
1618

17-
// Level encodes a level and optional sublevel for use in log and error
18-
// messages. The encoding has the property that Level(0) ==
19-
// L0Sublevel(invalidSublevel).
19+
// Level encodes a level and optional sublevel. It can also represent the
20+
// conceptual layer of flushable ingests as "level" -1.
21+
//
22+
// The encoding has the property that Level(0) == L0Sublevel(invalidSublevel).
2023
type Level uint32
2124

2225
func makeLevel(level, sublevel int) Level {
2326
return Level(((sublevel + 1) << levelBits) | level)
2427
}
2528

26-
// LevelToInt returns the int representation of a Level
29+
// LevelToInt returns the int representation of a Level. Returns -1 if the Level
30+
// refers to the flushable ingests pseudo-level.
2731
func LevelToInt(l Level) int {
28-
return int(l) & levelMask
32+
l &= levelMask
33+
if l == flushableIngestLevelValue {
34+
return -1
35+
}
36+
return int(l)
2937
}
3038

3139
// L0Sublevel returns a Level representing the specified L0 sublevel.
@@ -36,11 +44,26 @@ func L0Sublevel(sublevel int) Level {
3644
return makeLevel(0, sublevel)
3745
}
3846

47+
// FlushableIngestLevel returns a Level that represents the flushable ingests
48+
// pseudo-level.
49+
func FlushableIngestLevel() Level {
50+
return makeLevel(flushableIngestLevelValue, invalidSublevel)
51+
}
52+
53+
// FlushableIngestLevel returns true if l represents the flushable ingests
54+
// pseudo-level.
55+
func (l Level) FlushableIngestLevel() bool {
56+
return LevelToInt(l) == -1
57+
}
58+
3959
func (l Level) String() string {
4060
level := int(l) & levelMask
4161
sublevel := (int(l) >> levelBits) - 1
4262
if sublevel != invalidSublevel {
4363
return fmt.Sprintf("L%d.%d", level, sublevel)
4464
}
65+
if level == flushableIngestLevelValue {
66+
return "flushable-ingest"
67+
}
4568
return fmt.Sprintf("L%d", level)
4669
}

internal/manifest/level_test.go

Lines changed: 14 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,59 +5,34 @@
55
package manifest
66

77
import (
8-
"fmt"
98
"testing"
109

1110
"github.com/stretchr/testify/require"
1211
)
1312

1413
func TestLevel(t *testing.T) {
1514
testCases := []struct {
16-
level int
15+
level Level
1716
expected string
1817
}{
19-
{0, "L0"},
20-
{1, "L1"},
21-
{2, "L2"},
22-
{3, "L3"},
23-
{4, "L4"},
24-
{5, "L5"},
25-
{6, "L6"},
26-
{7, "L7"},
27-
}
18+
{Level(0), "L0"},
19+
{Level(1), "L1"},
20+
{Level(2), "L2"},
21+
{Level(3), "L3"},
22+
{Level(4), "L4"},
23+
{Level(5), "L5"},
24+
{Level(6), "L6"},
2825

29-
for _, c := range testCases {
30-
t.Run("", func(t *testing.T) {
31-
s := Level(c.level).String()
32-
require.EqualValues(t, c.expected, s)
33-
})
34-
}
35-
}
26+
{L0Sublevel(0), "L0.0"},
27+
{L0Sublevel(1), "L0.1"},
28+
{L0Sublevel(2), "L0.2"},
3629

37-
func TestL0Sublevel(t *testing.T) {
38-
testCases := []struct {
39-
level int
40-
sublevel int
41-
expected string
42-
}{
43-
{0, 0, "L0.0"},
44-
{0, 1, "L0.1"},
45-
{0, 2, "L0.2"},
46-
{0, 1000, "L0.1000"},
47-
{0, -1, "invalid L0 sublevel: -1"},
48-
{0, -2, "invalid L0 sublevel: -2"},
30+
{FlushableIngestLevel(), "flushable-ingest"},
4931
}
5032

5133
for _, c := range testCases {
52-
t.Run("", func(t *testing.T) {
53-
s := func() (result string) {
54-
defer func() {
55-
if r := recover(); r != nil {
56-
result = fmt.Sprint(r)
57-
}
58-
}()
59-
return L0Sublevel(c.sublevel).String()
60-
}()
34+
t.Run(c.expected, func(t *testing.T) {
35+
s := c.level.String()
6136
require.EqualValues(t, c.expected, s)
6237
})
6338
}

iterator.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -834,7 +834,10 @@ func (i *Iterator) sampleRead() {
834834
return
835835
}
836836
if len(mi.levels) > 1 {
837-
mi.ForEachLevelIter(func(li *levelIter) bool {
837+
mi.ForEachLevelIter(func(li *levelIter) (done bool) {
838+
if li.level.FlushableIngestLevel() {
839+
return false
840+
}
838841
l := manifest.LevelToInt(li.level)
839842
if f := li.iterFile; f != nil {
840843
var containsKey bool

level_iter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (lt *levelIterTest) newIters(
175175
if kinds.Point() {
176176
iter, err := lt.readers[file.FileNum].NewIterWithBlockPropertyFiltersAndContextEtc(
177177
ctx, transforms,
178-
opts.LowerBound, opts.UpperBound, nil, true /* useFilterBlock */, iio.stats, sstable.CategoryAndQoS{},
178+
opts.LowerBound, opts.UpperBound, nil, sstable.AlwaysUseFilterBlock, iio.stats, sstable.CategoryAndQoS{},
179179
nil, sstable.TrivialReaderProvider{Reader: lt.readers[file.FileNum]})
180180
if err != nil {
181181
return iterSet{}, errors.CombineErrors(err, set.CloseAll())

merging_iter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func TestMergingIterCornerCases(t *testing.T) {
177177
if kinds.Point() {
178178
set.point, err = r.NewIterWithBlockPropertyFilters(
179179
sstable.NoTransforms,
180-
opts.GetLowerBound(), opts.GetUpperBound(), nil, true /* useFilterBlock */, iio.stats,
180+
opts.GetLowerBound(), opts.GetUpperBound(), nil, sstable.AlwaysUseFilterBlock, iio.stats,
181181
sstable.CategoryAndQoS{}, nil, sstable.TrivialReaderProvider{Reader: r})
182182
if err != nil {
183183
return iterSet{}, errors.CombineErrors(err, set.CloseAll())

sstable/block_property_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -972,7 +972,7 @@ func TestBlockProperties(t *testing.T) {
972972
return "filter excludes entire table"
973973
}
974974
iter, err := r.NewIterWithBlockPropertyFilters(
975-
NoTransforms, lower, upper, filterer, false /* useFilterBlock */, &stats,
975+
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats,
976976
CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r})
977977
if err != nil {
978978
return err.Error()
@@ -1054,7 +1054,7 @@ func TestBlockProperties_BoundLimited(t *testing.T) {
10541054
return "filter excludes entire table"
10551055
}
10561056
iter, err := r.NewIterWithBlockPropertyFilters(
1057-
NoTransforms, lower, upper, filterer, false /* useFilterBlock */, &stats,
1057+
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats,
10581058
CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r})
10591059
if err != nil {
10601060
return err.Error()

0 commit comments

Comments
 (0)