Skip to content

Commit a13d996

Browse files
committed
storage: fix up compaction concurrency code
1 parent a50097a commit a13d996

File tree

5 files changed

+32
-31
lines changed

5 files changed

+32
-31
lines changed

pkg/storage/metamorphic/options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func randomOptions() *pebble.Options {
127127
opts.MemTableSize = 1 << rngIntRange(rng, 11, 28)
128128
opts.MemTableStopWritesThreshold = int(rngIntRange(rng, 2, 7))
129129
maxConcurrentCompactions := int(rngIntRange(rng, 1, 4))
130-
opts.MaxConcurrentCompactions = func() int { return maxConcurrentCompactions }
130+
opts.CompactionConcurrencyRange = func() (lower, upper int) { return 1, maxConcurrentCompactions }
131131

132132
opts.Cache = pebble.NewCache(1 << rngIntRange(rng, 1, 30))
133133
defer opts.Cache.Unref()

pkg/storage/mvcc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func getDefaultMaxConcurrentCompactions() int {
131131

132132
// l0SubLevelCompactionConcurrency is the sub-level threshold at which to
133133
// allow an increase in compaction concurrency. The maximum is still
134-
// controlled by pebble.Options.MaxConcurrentCompactions. The default of 2
134+
// controlled by pebble.Options.CompactionConcurrencyRange. The default of 2
135135
// allows an additional compaction (so total 1 + 1 = 2 compactions) when the
136136
// sub-level count is 2, and increments concurrency by 1 whenever sub-level
137137
// count increases by 2 (so 1 + 2 = 3 compactions) when sub-level count is 4,

pkg/storage/open.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func RemoteStorageFactory(accessor *cloud.EarlyBootExternalStorageAccessor) Conf
202202
// compactions an Engine will execute.
203203
func MaxConcurrentCompactions(n int) ConfigOption {
204204
return func(cfg *engineConfig) error {
205-
cfg.opts.MaxConcurrentCompactions = func() int { return n }
205+
cfg.opts.CompactionConcurrencyRange = func() (lower, upper int) { return 1, n }
206206
return nil
207207
}
208208
}

pkg/storage/pebble.go

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -360,14 +360,10 @@ func DefaultPebbleOptions() *pebble.Options {
360360
KeySchema: DefaultKeySchema,
361361
KeySchemas: sstable.MakeKeySchemas(KeySchemas...),
362362
// A value of 2 triggers a compaction when there is 1 sub-level.
363-
L0CompactionThreshold: 2,
364-
L0StopWritesThreshold: 1000,
365-
LBaseMaxBytes: 64 << 20, // 64 MB
366-
Levels: make([]pebble.LevelOptions, 7),
367-
// NB: Options.MaxConcurrentCompactions may be "wrapped" in NewPebble to
368-
// allow overriding the max at runtime through
369-
// Engine.SetCompactionConcurrency.
370-
MaxConcurrentCompactions: func() int { return defaultMaxConcurrentCompactions },
363+
L0CompactionThreshold: 2,
364+
L0StopWritesThreshold: 1000,
365+
LBaseMaxBytes: 64 << 20, // 64 MB
366+
Levels: make([]pebble.LevelOptions, 7),
371367
MemTableSize: 64 << 20, // 64 MB
372368
MemTableStopWritesThreshold: 4,
373369
Merger: MVCCMerger,
@@ -676,17 +672,20 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
676672
return getCompressionAlgorithm(ctx, cfg.settings, CompressionAlgorithmStorage)
677673
}
678674
}
679-
675+
// Note: the MaxConcurrentCompactions function will be wrapped below to allow
676+
// overriding dynamically via overriding the max at runtime through
677+
// Engine.SetCompactionConcurrency.
678+
if cfg.opts.CompactionConcurrencyRange == nil {
679+
cfg.opts.CompactionConcurrencyRange = func() (lower, upper int) {
680+
return 1, defaultMaxConcurrentCompactions
681+
}
682+
}
680683
if cfg.opts.MaxConcurrentDownloads == nil {
681684
cfg.opts.MaxConcurrentDownloads = func() int {
682685
return int(concurrentDownloadCompactions.Get(&cfg.settings.SV))
683686
}
684687
}
685688

686-
if cfg.opts.MaxConcurrentCompactions == nil {
687-
cfg.opts.MaxConcurrentCompactions = func() int { return defaultMaxConcurrentCompactions }
688-
}
689-
690689
cfg.opts.EnsureDefaults()
691690

692691
// The context dance here is done so that we have a clean context without
@@ -791,11 +790,9 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
791790
diskWriteStatsCollector: cfg.DiskWriteStatsCollector,
792791
}
793792

794-
// MaxConcurrentCompactions can be set by multiple sources, but all the
795-
// sources will eventually call NewPebble. So, we override
796-
// cfg.opts.MaxConcurrentCompactions to a closure which allows ovderriding the
797-
// value.
798-
cfg.opts.MaxConcurrentCompactions = p.cco.Wrap(cfg.opts.MaxConcurrentCompactions)
793+
// Wrap the MaxConcurrentCompactions function to allow overriding dynamically via
794+
// overriding the max at runtime through Engine.SetCompactionConcurrency.
795+
cfg.opts.CompactionConcurrencyRange = p.cco.Wrap(cfg.opts.CompactionConcurrencyRange)
799796

800797
// NB: The ordering of the event listeners passed to TeeEventListener is
801798
// deliberate. The listener returned by makeMetricEtcEventListener is
@@ -2740,8 +2737,7 @@ func (e *ExceedMaxSizeError) Error() string {
27402737
return fmt.Sprintf("export size (%d bytes) exceeds max size (%d bytes)", e.reached, e.maxSize)
27412738
}
27422739

2743-
// compactionConcurrencyOverride allows overriding the max concurrent
2744-
// compactions.
2740+
// compactionConcurrencyOverride allows overriding the compaction concurrency.
27452741
type compactionConcurrencyOverride struct {
27462742
override atomic.Uint32
27472743
}
@@ -2751,12 +2747,15 @@ func (cco *compactionConcurrencyOverride) Set(value uint32) {
27512747
cco.override.Store(value)
27522748
}
27532749

2754-
// Wrap a MaxConcurrentCompactions function to take into account the override.
2755-
func (cco *compactionConcurrencyOverride) Wrap(maxConcurrentCompactions func() int) func() int {
2756-
return func() int {
2750+
// Wrap a CompactionConcurrencyRange function to take into account the override.
2751+
func (cco *compactionConcurrencyOverride) Wrap(
2752+
compactionConcurrencyRange func() (lower, upper int),
2753+
) func() (lower, upper int) {
2754+
return func() (lower, upper int) {
27572755
if o := cco.override.Load(); o > 0 {
2758-
return int(o)
2756+
// We override both the lower and upper limits.
2757+
return int(o), int(o)
27592758
}
2760-
return maxConcurrentCompactions()
2759+
return compactionConcurrencyRange()
27612760
}
27622761
}

pkg/storage/pebble_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,9 +1675,11 @@ func TestPebbleSetCompactionConcurrency(t *testing.T) {
16751675
require.NoError(t, err)
16761676
defer p.Close()
16771677

1678-
require.Equal(t, 4, p.cfg.opts.MaxConcurrentCompactions())
1678+
require.Equal(t, "1 4", fmt.Sprint(p.cfg.opts.CompactionConcurrencyRange()))
1679+
16791680
p.SetCompactionConcurrency(10)
1680-
require.Equal(t, 10, p.cfg.opts.MaxConcurrentCompactions())
1681+
require.Equal(t, "10 10", fmt.Sprint(p.cfg.opts.CompactionConcurrencyRange()))
1682+
16811683
p.SetCompactionConcurrency(0)
1682-
require.Equal(t, 4, p.cfg.opts.MaxConcurrentCompactions())
1684+
require.Equal(t, "1 4", fmt.Sprint(p.cfg.opts.CompactionConcurrencyRange()))
16831685
}

0 commit comments

Comments
 (0)