Skip to content

Commit e8985d3

Browse files
committed
storage: cleanup default max concurrent compactions code
Refactor this code so we statically initialize the default value. Epic: none Release note: None
1 parent 5feb203 commit e8985d3

File tree

4 files changed

+53
-46
lines changed

4 files changed

+53
-46
lines changed

pkg/kv/kvserver/stores_server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,9 @@ func (is Server) ScanStorageInternalKeys(
191191
// SetCompactionConcurrency implements PerStoreServer. It changes the compaction
192192
// concurrency of a store. While SetCompactionConcurrency is safe for concurrent
193193
// use, it adds uncertainty about the compaction concurrency actually set on
194-
// the store. It also adds uncertainty about the compaction concurrency set on
195-
// the store once the request is cancelled.
194+
// the store. We do guarantee that once all SetCompactionConcurrency requests
195+
// are finished (cancelled), the override is removed and the original
196+
// concurrency is restored.
196197
func (is Server) SetCompactionConcurrency(
197198
ctx context.Context, req *CompactionConcurrencyRequest,
198199
) (*CompactionConcurrencyResponse, error) {
@@ -201,7 +202,8 @@ func (is Server) SetCompactionConcurrency(
201202
func(ctx context.Context, s *Store) error {
202203
s.TODOEngine().SetCompactionConcurrency(req.CompactionConcurrency)
203204

204-
// Wait for cancellation, and once cancelled, reset the compaction concurrency.
205+
// Wait for cancellation, and once cancelled, reset the compaction
206+
// concurrency.
205207
<-ctx.Done()
206208
s.TODOEngine().SetCompactionConcurrency(0)
207209
return nil

pkg/storage/mvcc.go

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -101,46 +101,32 @@ var TargetBytesPerLockConflictError = settings.RegisterIntSetting(
101101
settings.WithName("storage.mvcc.target_bytes_per_lock_conflict_error"),
102102
)
103103

104-
// getMaxConcurrentCompactions wraps the maxConcurrentCompactions env var in a
105-
// func that may be installed on Options.MaxConcurrentCompactions. It also
106-
// imposes a floor on the max, so that an engine is always created with at least
107-
// 1 slot for a compactions.
108-
//
109-
// NB: This function inspects the environment every time it's called. This is
110-
// okay, because Engine construction in NewPebble will invoke it and store the
111-
// value on the Engine itself.
112-
func getMaxConcurrentCompactions() int {
113-
n := envutil.EnvOrDefaultInt(
114-
"COCKROACH_CONCURRENT_COMPACTIONS", func() int {
115-
// The old COCKROACH_ROCKSDB_CONCURRENCY environment variable was never
116-
// documented, but customers were told about it and use today in
117-
// production. We don't want to break them, so if the new env var
118-
// is unset but COCKROACH_ROCKSDB_CONCURRENCY is set, use the old env
119-
// var's value. This old env var has a wart in that it's expressed as a
120-
// number of concurrency slots to make available to both flushes and
121-
// compactions (a vestige of the corresponding RocksDB option's
122-
// mechanics). We need to adjust it to be in terms of just compaction
123-
// concurrency by subtracting the flushing routine's dedicated slot.
124-
//
125-
// TODO(jackson): Should envutil expose its `getEnv` internal func for
126-
// cases like this where we actually want to know whether it's present
127-
// or not; not just fallback to a default?
128-
if oldV := envutil.EnvOrDefaultInt("COCKROACH_ROCKSDB_CONCURRENCY", 0); oldV > 0 {
129-
return oldV - 1
130-
}
131-
132-
// By default use up to min(numCPU-1, 3) threads for background
133-
// compactions per store (reserving the final process for flushes).
134-
const max = 3
135-
if n := runtime.GOMAXPROCS(0); n-1 < max {
136-
return n - 1
137-
}
138-
return max
139-
}())
140-
if n < 1 {
141-
return 1
142-
}
143-
return n
104+
var defaultMaxConcurrentCompactions = getDefaultMaxConcurrentCompactions()
105+
106+
func getDefaultMaxConcurrentCompactions() int {
107+
if v := envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_COMPACTIONS", 0); v > 0 {
108+
return v
109+
}
110+
// The old COCKROACH_ROCKSDB_CONCURRENCY environment variable was never
111+
// documented, but customers were told about it and use today in
112+
// production. We don't want to break them, so if the new env var
113+
// is unset but COCKROACH_ROCKSDB_CONCURRENCY is set, use the old env
114+
// var's value. This old env var has a wart in that it's expressed as a
115+
// number of concurrency slots to make available to both flushes and
116+
// compactions (a vestige of the corresponding RocksDB option's
117+
// mechanics). We need to adjust it to be in terms of just compaction
118+
// concurrency by subtracting the flushing routine's dedicated slot.
119+
if oldV := envutil.EnvOrDefaultInt("COCKROACH_ROCKSDB_CONCURRENCY", 0); oldV > 0 {
120+
return max(oldV-1, 1)
121+
}
122+
123+
// By default use up to min(GOMAXPROCS-1, 3) threads for background
124+
// compactions per store (reserving the final process for flushes).
125+
const upperLimit = 3
126+
if n := runtime.GOMAXPROCS(0); n-1 < upperLimit {
127+
return max(n-1, 1)
128+
}
129+
return upperLimit
144130
}
145131

146132
// l0SubLevelCompactionConcurrency is the sub-level threshold at which to

pkg/storage/pebble.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ func DefaultPebbleOptions() *pebble.Options {
367367
// NB: Options.MaxConcurrentCompactions may be "wrapped" in NewPebble to
368368
// allow overriding the max at runtime through
369369
// Engine.SetCompactionConcurrency.
370-
MaxConcurrentCompactions: getMaxConcurrentCompactions,
370+
MaxConcurrentCompactions: func() int { return defaultMaxConcurrentCompactions },
371371
MemTableSize: 64 << 20, // 64 MB
372372
MemTableStopWritesThreshold: 4,
373373
Merger: MVCCMerger,
@@ -683,7 +683,9 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
683683
}
684684
}
685685

686-
cfg.opts.MaxConcurrentCompactions = getMaxConcurrentCompactions
686+
if cfg.opts.MaxConcurrentCompactions == nil {
687+
cfg.opts.MaxConcurrentCompactions = func() int { return defaultMaxConcurrentCompactions }
688+
}
687689

688690
cfg.opts.EnsureDefaults()
689691

pkg/storage/pebble_test.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1567,7 +1567,7 @@ func TestCompactionConcurrencyEnvVars(t *testing.T) {
15671567
} else {
15681568
defer envutil.TestSetEnv(t, "COCKROACH_CONCURRENT_COMPACTIONS", tc.cockroachConcurrency)()
15691569
}
1570-
require.Equal(t, tc.want, getMaxConcurrentCompactions())
1570+
require.Equal(t, tc.want, getDefaultMaxConcurrentCompactions())
15711571
})
15721572
}
15731573
}
@@ -1664,3 +1664,20 @@ func TestPebbleLoggingSlowReads(t *testing.T) {
16641664
require.Less(t, 0, slowCount)
16651665
})
16661666
}
1667+
1668+
func TestPebbleSetCompactionConcurrency(t *testing.T) {
1669+
defer leaktest.AfterTest(t)()
1670+
defer log.Scope(t).Close(t)
1671+
ctx := context.Background()
1672+
1673+
settings := cluster.MakeTestingClusterSettings()
1674+
p, err := Open(ctx, InMemory(), settings, CacheSize(1<<20 /* 1 MiB */), MaxConcurrentCompactions(4))
1675+
require.NoError(t, err)
1676+
defer p.Close()
1677+
1678+
require.Equal(t, 4, p.cfg.opts.MaxConcurrentCompactions())
1679+
p.SetCompactionConcurrency(10)
1680+
require.Equal(t, 10, p.cfg.opts.MaxConcurrentCompactions())
1681+
p.SetCompactionConcurrency(0)
1682+
require.Equal(t, 4, p.cfg.opts.MaxConcurrentCompactions())
1683+
}

0 commit comments

Comments
 (0)