@@ -282,6 +282,50 @@ var walFailoverUnhealthyOpThreshold = settings.RegisterDurationSetting(
282282 settings .WithPublic ,
283283)
284284
285+ // This cluster setting controls the baseline compaction concurrency (which
286+ // Pebble can dynamically increase up to the max concurrency; see
287+ // CompactionConcurrencyRange).
288+ //
289+ // When the value of this cluster setting is larger than the max concurrency
290+ // (see below), this cluster setting takes precedence (i.e. the max concurrency
291+ // will also equal this value).
292+ //
293+ // The baseline compaction concurrency is temporarily overridden while
294+ // crdb_internal.set_compaction_concurrency is running (which uses
295+ // SetCompactionConcurrency).
296+ var compactionConcurrencyLower = settings .RegisterIntSetting (
297+ settings .ApplicationLevel ,
298+ "storage.compaction_concurrency" ,
299+ "the baseline number of concurrent compactions" ,
300+ 1 ,
301+ settings .IntWithMinimum (1 ),
302+ )
303+
304+ // The maximum concurrency can be configured via an env var (which allows
305+ // per-node control) and/or this cluster setting (which applies to the entire
306+ // cluster).
307+ //
308+ // The environment variable is COCKROACH_CONCURRENT_COMPACTIONS (we also support
309+ // a deprecated variable, see getMaxConcurrentCompactionsFromEnv()).
310+ //
311+ // In the absence of any configuration, we use min(GOMAXPROCS-1, 3).
312+ //
313+ // If both the env variable and cluster setting are set, we take the maximum of
314+ // the two.
315+ //
316+ // In all cases, the maximum concurrency is temporarily overridden while
317+ // crdb_internal.set_compaction_concurrency is running (which uses
318+ // SetCompactionConcurrency).
319+ var compactionConcurrencyUpper = settings .RegisterIntSetting (
320+ settings .ApplicationLevel ,
321+ "storage.max_compaction_concurrency" ,
322+ "the maximum number of concurrent compactions (0 = default); the default value is " +
323+ "min(3,numCPUs-1) or what the COCKROACH_CONCURRENT_COMPACTIONS env var specifies; " +
324+ "the env var also takes precedence over the cluster setting when the latter is lower" ,
325+ 0 ,
326+ settings .NonNegativeInt ,
327+ )
328+
285329// TODO(ssd): This could be SystemOnly but we currently init pebble
286330// engines for temporary storage. Temporary engines shouldn't really
287331// care about download compactions, but they do currently simply
@@ -360,14 +404,10 @@ func DefaultPebbleOptions() *pebble.Options {
360404 KeySchema : DefaultKeySchema ,
361405 KeySchemas : sstable .MakeKeySchemas (KeySchemas ... ),
362406 // A value of 2 triggers a compaction when there is 1 sub-level.
363- L0CompactionThreshold : 2 ,
364- L0StopWritesThreshold : 1000 ,
365- LBaseMaxBytes : 64 << 20 , // 64 MB
366- Levels : make ([]pebble.LevelOptions , 7 ),
367- // NB: Options.MaxConcurrentCompactions may be "wrapped" in NewPebble to
368- // allow overriding the max at runtime through
369- // Engine.SetCompactionConcurrency.
370- MaxConcurrentCompactions : func () int { return defaultMaxConcurrentCompactions },
407+ L0CompactionThreshold : 2 ,
408+ L0StopWritesThreshold : 1000 ,
409+ LBaseMaxBytes : 64 << 20 , // 64 MB
410+ Levels : make ([]pebble.LevelOptions , 7 ),
371411 MemTableSize : 64 << 20 , // 64 MB
372412 MemTableStopWritesThreshold : 4 ,
373413 Merger : MVCCMerger ,
@@ -676,17 +716,26 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
676716 return getCompressionAlgorithm (ctx , cfg .settings , CompressionAlgorithmStorage )
677717 }
678718 }
679-
719+ // Note: the CompactionConcurrencyRange function will be wrapped below to
720+ // allow overriding the lower and upper values at runtime through
721+ // Engine.SetCompactionConcurrency.
722+ if cfg .opts .CompactionConcurrencyRange == nil {
723+ cfg .opts .CompactionConcurrencyRange = func () (lower , upper int ) {
724+ lower = int (compactionConcurrencyLower .Get (& cfg .settings .SV ))
725+ upper = determineMaxConcurrentCompactions (
726+ defaultMaxConcurrentCompactions ,
727+ envMaxConcurrentCompactions ,
728+ int (compactionConcurrencyUpper .Get (& cfg .settings .SV )),
729+ )
730+ return lower , max (lower , upper )
731+ }
732+ }
680733 if cfg .opts .MaxConcurrentDownloads == nil {
681734 cfg .opts .MaxConcurrentDownloads = func () int {
682735 return int (concurrentDownloadCompactions .Get (& cfg .settings .SV ))
683736 }
684737 }
685738
686- if cfg .opts .MaxConcurrentCompactions == nil {
687- cfg .opts .MaxConcurrentCompactions = func () int { return defaultMaxConcurrentCompactions }
688- }
689-
690739 cfg .opts .EnsureDefaults ()
691740
692741 // The context dance here is done so that we have a clean context without
@@ -791,11 +840,9 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
791840 diskWriteStatsCollector : cfg .DiskWriteStatsCollector ,
792841 }
793842
794- // MaxConcurrentCompactions can be set by multiple sources, but all the
795- // sources will eventually call NewPebble. So, we override
796- // cfg.opts.MaxConcurrentCompactions to a closure which allows ovderriding the
797- // value.
798- cfg .opts .MaxConcurrentCompactions = p .cco .Wrap (cfg .opts .MaxConcurrentCompactions )
843+ // Wrap the CompactionConcurrencyRange function to allow overriding the lower
844+ // and upper values at runtime through Engine.SetCompactionConcurrency.
845+ cfg .opts .CompactionConcurrencyRange = p .cco .Wrap (cfg .opts .CompactionConcurrencyRange )
799846
800847 // NB: The ordering of the event listeners passed to TeeEventListener is
801848 // deliberate. The listener returned by makeMetricEtcEventListener is
@@ -2740,8 +2787,7 @@ func (e *ExceedMaxSizeError) Error() string {
27402787 return fmt .Sprintf ("export size (%d bytes) exceeds max size (%d bytes)" , e .reached , e .maxSize )
27412788}
27422789
2743- // compactionConcurrencyOverride allows overriding the max concurrent
2744- // compactions.
2790+ // compactionConcurrencyOverride allows overriding the compaction concurrency.
27452791type compactionConcurrencyOverride struct {
27462792 override atomic.Uint32
27472793}
@@ -2751,12 +2797,15 @@ func (cco *compactionConcurrencyOverride) Set(value uint32) {
27512797 cco .override .Store (value )
27522798}
27532799
2754- // Wrap a MaxConcurrentCompactions function to take into account the override.
2755- func (cco * compactionConcurrencyOverride ) Wrap (maxConcurrentCompactions func () int ) func () int {
2756- return func () int {
2800+ // Wrap a CompactionConcurrencyRange function to take into account the override.
2801+ func (cco * compactionConcurrencyOverride ) Wrap (
2802+ compactionConcurrencyRange func () (lower , upper int ),
2803+ ) func () (lower , upper int ) {
2804+ return func () (lower , upper int ) {
27572805 if o := cco .override .Load (); o > 0 {
2758- return int (o )
2806+ // We override both the lower and upper limits.
2807+ return int (o ), int (o )
27592808 }
2760- return maxConcurrentCompactions ()
2809+ return compactionConcurrencyRange ()
27612810 }
27622811}
0 commit comments