Skip to content

Commit 9658822

Browse files
committed
storage: add cluster settings for compaction concurrency
Add two cluster settings that control the compaction concurrency limits: `storage.compaction_concurrency` and `storage.max_compaction_concurrency`. Note that the `COCKROACH_CONCURRENT_COMPACTIONS` env var is still supported: when it is used together with the cluster setting, we take the max of the two values. Fixes: #144963 Release note: None
1 parent a13d996 commit 9658822

File tree

3 files changed

+155
-62
lines changed

3 files changed

+155
-62
lines changed

pkg/storage/mvcc.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,21 @@ var TargetBytesPerLockConflictError = settings.RegisterIntSetting(
103103

104104
var defaultMaxConcurrentCompactions = getDefaultMaxConcurrentCompactions()
105105

106+
// By default, we use up to min(GOMAXPROCS-1, 3) threads for background
107+
// compactions per store (reserving the final process for flushes).
106108
func getDefaultMaxConcurrentCompactions() int {
109+
const def = 3
110+
if n := runtime.GOMAXPROCS(0); n-1 < def {
111+
return max(n-1, 1)
112+
}
113+
return def
114+
}
115+
116+
// envMaxConcurrentCompactions is not zero if this node has an env var override
117+
// for the concurrency.
118+
var envMaxConcurrentCompactions = getMaxConcurrentCompactionsFromEnv()
119+
120+
func getMaxConcurrentCompactionsFromEnv() int {
107121
if v := envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_COMPACTIONS", 0); v > 0 {
108122
return v
109123
}
@@ -119,14 +133,26 @@ func getDefaultMaxConcurrentCompactions() int {
119133
if oldV := envutil.EnvOrDefaultInt("COCKROACH_ROCKSDB_CONCURRENCY", 0); oldV > 0 {
120134
return max(oldV-1, 1)
121135
}
136+
return 0
137+
}
122138

123-
// By default use up to min(GOMAXPROCS-1, 3) threads for background
124-
// compactions per store (reserving the final process for flushes).
125-
const upperLimit = 3
126-
if n := runtime.GOMAXPROCS(0); n-1 < upperLimit {
127-
return max(n-1, 1)
139+
// determineMaxConcurrentCompactions determines the upper limit on compaction
140+
// concurrency.
141+
//
142+
// Normally, we use the default limit of min(3, numCPU-1). This limit can be
143+
// changed via an environment variable or via a cluster setting. If both of
144+
// those are used, the maximum of them is taken.
145+
func determineMaxConcurrentCompactions(defaultValue int, envValue int, clusterSetting int) int {
146+
if envValue > 0 {
147+
if clusterSetting > 0 {
148+
return max(envValue, clusterSetting)
149+
}
150+
return envValue
151+
}
152+
if clusterSetting > 0 {
153+
return clusterSetting
128154
}
129-
return upperLimit
155+
return defaultValue
130156
}
131157

132158
// l0SubLevelCompactionConcurrency is the sub-level threshold at which to

pkg/storage/pebble.go

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,50 @@ var walFailoverUnhealthyOpThreshold = settings.RegisterDurationSetting(
282282
settings.WithPublic,
283283
)
284284

285+
// This cluster setting controls the baseline compaction concurrency (which
286+
// Pebble can dynamically increase up to the max concurrency; see
287+
// CompactionConcurrencyRange).
288+
//
289+
// When the value of this cluster setting is larger than the max concurrency
290+
// (see below), this cluster setting takes precedence (i.e. the max concurrency
291+
// will also equal this value).
292+
//
293+
// The baseline compaction concurrency is temporarily overridden while
294+
// crdb_internal.set_compaction_concurrency is running (which uses
295+
// SetCompactionConcurrency).
296+
var compactionConcurrencyLower = settings.RegisterIntSetting(
297+
settings.ApplicationLevel,
298+
"storage.compaction_concurrency",
299+
"the baseline number of concurrent compactions",
300+
1,
301+
settings.IntWithMinimum(1),
302+
)
303+
304+
// The maximum concurrency can be configured via an env var (which allows
305+
// per-node control) and/or this cluster setting (which applies to the entire
306+
// cluster).
307+
//
308+
// The environment variable is COCKROACH_CONCURRENT_COMPACTIONS (we also support
309+
// a deprecated variable, see getMaxConcurrentCompactionsFromEnv()).
310+
//
311+
// In the absence of any configuration, we use min(GOMAXPROCS-1, 3).
312+
//
313+
// If both the env variable and cluster setting are set, we take the maximum of
314+
// the two.
315+
//
316+
// In all cases, the maximum concurrency is temporarily overridden while
317+
// crdb_internal.set_compaction_concurrency is running (which uses
318+
// SetCompactionConcurrency).
319+
var compactionConcurrencyUpper = settings.RegisterIntSetting(
320+
settings.ApplicationLevel,
321+
"storage.max_compaction_concurrency",
322+
"the maximum number of concurrent compactions (0 = default); the default value is "+
323+
"min(3,numCPUs-1) or what the COCKROACH_CONCURRENT_COMPACTIONS env var specifies; "+
324+
"the env var also takes precedence over the cluster setting when the latter is lower",
325+
0,
326+
settings.NonNegativeInt,
327+
)
328+
285329
// TODO(ssd): This could be SystemOnly but we currently init pebble
286330
// engines for temporary storage. Temporary engines shouldn't really
287331
// care about download compactions, but they do currently simply
@@ -672,12 +716,18 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
672716
return getCompressionAlgorithm(ctx, cfg.settings, CompressionAlgorithmStorage)
673717
}
674718
}
675-
// Note: the MaxConcurrentCompactions function will be wrapped below to allow
676-
// overriding dynamically via overriding the max at runtime through
719+
// Note: the CompactionConcurrencyRange function will be wrapped below to
720+
// allow overriding the lower and upper values at runtime through
677721
// Engine.SetCompactionConcurrency.
678722
if cfg.opts.CompactionConcurrencyRange == nil {
679723
cfg.opts.CompactionConcurrencyRange = func() (lower, upper int) {
680-
return 1, defaultMaxConcurrentCompactions
724+
lower = int(compactionConcurrencyLower.Get(&cfg.settings.SV))
725+
upper = determineMaxConcurrentCompactions(
726+
defaultMaxConcurrentCompactions,
727+
envMaxConcurrentCompactions,
728+
int(compactionConcurrencyUpper.Get(&cfg.settings.SV)),
729+
)
730+
return lower, max(lower, upper)
681731
}
682732
}
683733
if cfg.opts.MaxConcurrentDownloads == nil {
@@ -790,8 +840,8 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
790840
diskWriteStatsCollector: cfg.DiskWriteStatsCollector,
791841
}
792842

793-
// Wrap the MaxConcurrentCompactions function to allow overriding dynamically via
794-
// overriding the max at runtime through Engine.SetCompactionConcurrency.
843+
// Wrap the CompactionConcurrencyRange function to allow overriding the lower
844+
// and upper values at runtime through Engine.SetCompactionConcurrency.
795845
cfg.opts.CompactionConcurrencyRange = p.cco.Wrap(cfg.opts.CompactionConcurrencyRange)
796846

797847
// NB: The ordering of the event listeners passed to TeeEventListener is

pkg/storage/pebble_test.go

Lines changed: 68 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1505,69 +1505,86 @@ func TestCompactionConcurrencyEnvVars(t *testing.T) {
15051505
defer leaktest.AfterTest(t)()
15061506
defer log.Scope(t).Close(t)
15071507

1508-
maxProcsBefore := runtime.GOMAXPROCS(0)
1509-
defer runtime.GOMAXPROCS(maxProcsBefore)
1510-
15111508
type testCase struct {
1512-
maxProcs int
1513-
rocksDBConcurrency string
1514-
cockroachConcurrency string
1515-
want int
1509+
cockroachConcurrentCompactions string
1510+
cockroachRocksDBConcurrency string
1511+
want int
15161512
}
15171513
cases := []testCase{
1518-
// Defaults
1519-
{32, "", "", 3},
1520-
{4, "", "", 3},
1521-
{3, "", "", 2},
1522-
{2, "", "", 1},
1523-
{1, "", "", 1},
1524-
// Old COCKROACH_ROCKSDB_CONCURRENCY env var is set. The user-provided
1525-
// value includes 1 slot for flushes, so resulting compaction
1526-
// concurrency is n-1.
1527-
{32, "4", "", 3},
1528-
{4, "4", "", 3},
1529-
{2, "4", "", 3},
1530-
{1, "4", "", 3},
1531-
{32, "8", "", 7},
1532-
{4, "8", "", 7},
1533-
{2, "8", "", 7},
1534-
{1, "8", "", 7},
1535-
// New COCKROACH_CONCURRENT_COMPACTIONS env var is set.
1536-
{32, "", "4", 4},
1537-
{4, "", "4", 4},
1538-
{2, "", "4", 4},
1539-
{1, "", "4", 4},
1540-
{32, "", "8", 8},
1541-
{4, "", "8", 8},
1542-
{2, "", "8", 8},
1543-
{1, "", "8", 8},
1544-
// Both settings are set; COCKROACH_CONCURRENT_COMPACTIONS supersedes
1545-
// COCKROACH_ROCKSDB_CONCURRENCY.
1546-
{32, "8", "4", 4},
1547-
{4, "1", "4", 4},
1548-
{2, "2", "4", 4},
1549-
{1, "5", "4", 4},
1550-
{32, "1", "8", 8},
1551-
{4, "2", "8", 8},
1552-
{2, "4", "8", 8},
1553-
{1, "1", "8", 8},
1514+
// Default.
1515+
{want: 0},
1516+
{cockroachConcurrentCompactions: "5", want: 5},
1517+
{cockroachConcurrentCompactions: "1", want: 1},
1518+
{cockroachConcurrentCompactions: "0", want: 0},
1519+
// COCKROACH_ROCKSDB_CONCURRENCY is deprecated but we still support it. Note
1520+
// that for this env var, the compaction concurrency is the value minus 1.
1521+
{cockroachRocksDBConcurrency: "5", want: 4},
1522+
{cockroachRocksDBConcurrency: "2", want: 1},
1523+
{cockroachRocksDBConcurrency: "1", want: 1},
1524+
{cockroachRocksDBConcurrency: "0", want: 0},
1525+
// The non-deprecated var takes precedence.
1526+
{cockroachConcurrentCompactions: "10", cockroachRocksDBConcurrency: "5", want: 10},
15541527
}
15551528
for _, tc := range cases {
1556-
t.Run(fmt.Sprintf("GOMAXPROCS=%d,old=%q,new=%q", tc.maxProcs, tc.rocksDBConcurrency, tc.cockroachConcurrency),
1529+
t.Run(fmt.Sprintf("old=%q,new=%q", tc.cockroachRocksDBConcurrency, tc.cockroachConcurrentCompactions),
15571530
func(t *testing.T) {
1558-
runtime.GOMAXPROCS(tc.maxProcs)
1559-
1560-
if tc.rocksDBConcurrency == "" {
1531+
if tc.cockroachRocksDBConcurrency == "" {
15611532
defer envutil.TestUnsetEnv(t, "COCKROACH_ROCKSDB_CONCURRENCY")()
15621533
} else {
1563-
defer envutil.TestSetEnv(t, "COCKROACH_ROCKSDB_CONCURRENCY", tc.rocksDBConcurrency)()
1534+
defer envutil.TestSetEnv(t, "COCKROACH_ROCKSDB_CONCURRENCY", tc.cockroachRocksDBConcurrency)()
15641535
}
1565-
if tc.cockroachConcurrency == "" {
1536+
if tc.cockroachConcurrentCompactions == "" {
15661537
defer envutil.TestUnsetEnv(t, "COCKROACH_CONCURRENT_COMPACTIONS")()
15671538
} else {
1568-
defer envutil.TestSetEnv(t, "COCKROACH_CONCURRENT_COMPACTIONS", tc.cockroachConcurrency)()
1539+
defer envutil.TestSetEnv(t, "COCKROACH_CONCURRENT_COMPACTIONS", tc.cockroachConcurrentCompactions)()
15691540
}
1570-
require.Equal(t, tc.want, getDefaultMaxConcurrentCompactions())
1541+
require.Equal(t, tc.want, getMaxConcurrentCompactionsFromEnv())
1542+
})
1543+
}
1544+
}
1545+
1546+
func TestDetermineMaxConcurrentCompactions(t *testing.T) {
1547+
defer leaktest.AfterTest(t)()
1548+
defer log.Scope(t).Close(t)
1549+
1550+
maxProcsBefore := runtime.GOMAXPROCS(0)
1551+
defer runtime.GOMAXPROCS(maxProcsBefore)
1552+
1553+
type testCase struct {
1554+
maxProcs int
1555+
envValue int
1556+
clusterSetting int
1557+
want int
1558+
}
1559+
cases := []testCase{
1560+
// Defaults.
1561+
{maxProcs: 32, want: 3},
1562+
{maxProcs: 8, want: 3},
1563+
{maxProcs: 4, want: 3},
1564+
{maxProcs: 2, want: 1},
1565+
{maxProcs: 1, want: 1},
1566+
// Env value override.
1567+
{maxProcs: 32, envValue: 10, want: 10},
1568+
{maxProcs: 1, envValue: 10, want: 10},
1569+
// Cluster setting override.
1570+
{maxProcs: 32, clusterSetting: 10, want: 10},
1571+
{maxProcs: 1, clusterSetting: 10, want: 10},
1572+
// Env value and cluster setting override.
1573+
{maxProcs: 32, envValue: 15, clusterSetting: 10, want: 15},
1574+
{maxProcs: 32, envValue: 10, clusterSetting: 15, want: 15},
1575+
{maxProcs: 1, envValue: 15, clusterSetting: 10, want: 15},
1576+
{maxProcs: 1, envValue: 10, clusterSetting: 15, want: 15},
1577+
}
1578+
for _, tc := range cases {
1579+
t.Run(fmt.Sprintf("GOMAXPROCS=%d,env=%d,setting=%d", tc.maxProcs, tc.envValue, tc.clusterSetting),
1580+
func(t *testing.T) {
1581+
runtime.GOMAXPROCS(tc.maxProcs)
1582+
actual := determineMaxConcurrentCompactions(
1583+
getDefaultMaxConcurrentCompactions(),
1584+
tc.envValue,
1585+
tc.clusterSetting,
1586+
)
1587+
require.Equal(t, tc.want, actual)
15711588
})
15721589
}
15731590
}

0 commit comments

Comments
 (0)