Skip to content

Commit 5feb203

Browse files
committed
pebble: separate compaction concurrency override logic
Separate the logic that allows temporarily overriding compaction concurrency. Epic: none Release note: None
1 parent 9a70b62 commit 5feb203

File tree

3 files changed

+38
-30
lines changed

3 files changed

+38
-30
lines changed

pkg/kv/kvserver/stores_server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,11 @@ func (is Server) SetCompactionConcurrency(
199199
resp := &CompactionConcurrencyResponse{}
200200
err := is.execStoreCommand(ctx, req.StoreRequestHeader,
201201
func(ctx context.Context, s *Store) error {
202-
prevConcurrency := s.TODOEngine().SetCompactionConcurrency(req.CompactionConcurrency)
202+
s.TODOEngine().SetCompactionConcurrency(req.CompactionConcurrency)
203203

204204
// Wait for cancellation, and once cancelled, reset the compaction concurrency.
205205
<-ctx.Done()
206-
s.TODOEngine().SetCompactionConcurrency(prevConcurrency)
206+
s.TODOEngine().SetCompactionConcurrency(0)
207207
return nil
208208
})
209209
return resp, err

pkg/storage/engine.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,9 +1102,9 @@ type Engine interface {
11021102
// version that it must maintain compatibility with.
11031103
SetMinVersion(version roachpb.Version) error
11041104

1105-
// SetCompactionConcurrency is used to set the engine's compaction
1106-
// concurrency. It returns the previous compaction concurrency.
1107-
SetCompactionConcurrency(n uint64) uint64
1105+
// SetCompactionConcurrency is used to override the engine's max compaction
1106+
// concurrency. A value of 0 removes any existing override.
1107+
SetCompactionConcurrency(n uint64)
11081108

11091109
// SetStoreID informs the engine of the store ID, once it is known.
11101110
// Used to show the store ID in logs and to initialize the shared object

pkg/storage/pebble.go

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func DefaultPebbleOptions() *pebble.Options {
364364
L0StopWritesThreshold: 1000,
365365
LBaseMaxBytes: 64 << 20, // 64 MB
366366
Levels: make([]pebble.LevelOptions, 7),
367-
// NB: Options.MaxConcurrentCompactions may be overidden in NewPebble to
367+
// NB: Options.MaxConcurrentCompactions may be "wrapped" in NewPebble to
368368
// allow overriding the max at runtime through
369369
// Engine.SetCompactionConcurrency.
370370
MaxConcurrentCompactions: getMaxConcurrentCompactions,
@@ -501,24 +501,15 @@ type engineConfig struct {
501501

502502
// Pebble is a wrapper around a Pebble database instance.
503503
type Pebble struct {
504-
atomic struct {
505-
// compactionConcurrency is the current compaction concurrency set on
506-
// the Pebble store. The compactionConcurrency option in the Pebble
507-
// Options struct is a closure which will return
508-
// Pebble.atomic.compactionConcurrency.
509-
//
510-
// This mechanism allows us to change the Pebble compactionConcurrency
511-
// on the fly without restarting Pebble.
512-
compactionConcurrency uint64
513-
}
514-
515504
cfg engineConfig
516505
db *pebble.DB
517506
closed bool
518507
auxDir string
519508
ballastPath string
520509
properties roachpb.StoreProperties
521510

511+
cco compactionConcurrencyOverride
512+
522513
// Stats updated by pebble.EventListener invocations, and returned in
523514
// GetMetrics. Updated and retrieved atomically.
524515
writeStallCount int64
@@ -575,10 +566,10 @@ var _ Engine = &Pebble{}
575566
// WorkloadCollectorEnabled specifies if the workload collector will be enabled
576567
var WorkloadCollectorEnabled = envutil.EnvOrDefaultBool("COCKROACH_STORAGE_WORKLOAD_COLLECTOR", false)
577568

578-
// SetCompactionConcurrency will return the previous compaction concurrency.
579-
func (p *Pebble) SetCompactionConcurrency(n uint64) uint64 {
580-
prevConcurrency := atomic.SwapUint64(&p.atomic.compactionConcurrency, n)
581-
return prevConcurrency
569+
// SetCompactionConcurrency is used to override the engine's max compaction
570+
// concurrency. A value of 0 removes any existing override.
571+
func (p *Pebble) SetCompactionConcurrency(n uint64) {
572+
p.cco.Set(uint32(n))
582573
}
583574

584575
// RegisterDiskSlowCallback registers a callback that will be run when a write
@@ -692,6 +683,8 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
692683
}
693684
}
694685

686+
cfg.opts.MaxConcurrentCompactions = getMaxConcurrentCompactions
687+
695688
cfg.opts.EnsureDefaults()
696689

697690
// The context dance here is done so that we have a clean context without
@@ -798,15 +791,9 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
798791

799792
// MaxConcurrentCompactions can be set by multiple sources, but all the
800793
// sources will eventually call NewPebble. So, we override
801-
// cfg.opts.MaxConcurrentCompactions to a closure which will return
802-
// Pebble.atomic.compactionConcurrency. This will allow us to both honor
803-
// the compactions concurrency which has already been set and allow us
804-
// to update the compactionConcurrency on the fly by changing the
805-
// Pebble.atomic.compactionConcurrency variable.
806-
p.atomic.compactionConcurrency = uint64(cfg.opts.MaxConcurrentCompactions())
807-
cfg.opts.MaxConcurrentCompactions = func() int {
808-
return int(atomic.LoadUint64(&p.atomic.compactionConcurrency))
809-
}
794+
// cfg.opts.MaxConcurrentCompactions to a closure which allows ovderriding the
795+
// value.
796+
cfg.opts.MaxConcurrentCompactions = p.cco.Wrap(cfg.opts.MaxConcurrentCompactions)
810797

811798
// NB: The ordering of the event listeners passed to TeeEventListener is
812799
// deliberate. The listener returned by makeMetricEtcEventListener is
@@ -2750,3 +2737,24 @@ var _ error = &ExceedMaxSizeError{}
27502737
func (e *ExceedMaxSizeError) Error() string {
27512738
return fmt.Sprintf("export size (%d bytes) exceeds max size (%d bytes)", e.reached, e.maxSize)
27522739
}
2740+
2741+
// compactionConcurrencyOverride allows overriding the max concurrent
2742+
// compactions.
2743+
type compactionConcurrencyOverride struct {
2744+
override atomic.Uint32
2745+
}
2746+
2747+
// Set the override value. If the value is 0, the override is cancelled.
2748+
func (cco *compactionConcurrencyOverride) Set(value uint32) {
2749+
cco.override.Store(value)
2750+
}
2751+
2752+
// Wrap a MaxConcurrentCompactions function to take into account the override.
2753+
func (cco *compactionConcurrencyOverride) Wrap(maxConcurrentCompactions func() int) func() int {
2754+
return func() int {
2755+
if o := cco.override.Load(); o > 0 {
2756+
return int(o)
2757+
}
2758+
return maxConcurrentCompactions()
2759+
}
2760+
}

0 commit comments

Comments
 (0)