Commit e1c6c3f

Configurable limit on concurrent shard closing (#121267)
Today we limit the number of shards concurrently closed by the `IndicesClusterStateService`, but this limit is currently a function of the CPU count of the node. On nodes with plentiful CPU but poor IO performance we may want to restrict this limit further. This commit exposes the throttling limit as a setting.
1 parent 67c2f41 commit e1c6c3f
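
As a quick illustration of the new knob, here is a minimal sketch of resolving the limit programmatically. It assumes the Elasticsearch server classes are on the classpath; the wrapper class ShardCloseLimitExample is hypothetical, and only the setting key and the CONCURRENT_SHARD_CLOSE_LIMIT constant come from the change below.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;

public class ShardCloseLimitExample {
    public static void main(String[] args) {
        // Cap concurrent shard closing at 2, e.g. on a node with slow disks.
        final Settings nodeSettings = Settings.builder()
            .put("indices.store.max_concurrent_closing_shards", 2)
            .build();

        // Resolves to 2 here; left unset, it defaults to min(node.processors, 10).
        System.out.println(IndicesClusterStateService.CONCURRENT_SHARD_CLOSE_LIMIT.get(nodeSettings));
    }
}

In practice the value would be set in elasticsearch.yml like any other node-scoped setting.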

File tree

3 files changed, +88 -3 lines changed

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
@@ -614,6 +614,7 @@ public void apply(Settings value, Settings current, Settings previous) {
         DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING,
         IndicesClusterStateService.SHARD_LOCK_RETRY_INTERVAL_SETTING,
         IndicesClusterStateService.SHARD_LOCK_RETRY_TIMEOUT_SETTING,
+        IndicesClusterStateService.CONCURRENT_SHARD_CLOSE_LIMIT,
         IngestSettings.GROK_WATCHDOG_INTERVAL,
         IngestSettings.GROK_WATCHDOG_MAX_EXECUTION_TIME,
         TDigestExecutionHint.SETTING,

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 18 additions & 3 deletions
@@ -116,6 +116,18 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
         Setting.Property.NodeScope
     );
 
+    /**
+     * Maximum number of shards to try and close concurrently. Defaults to the smaller of {@code node.processors} and {@code 10}, but can be
+     * set to any positive integer.
+     */
+    public static final Setting<Integer> CONCURRENT_SHARD_CLOSE_LIMIT = Setting.intSetting(
+        "indices.store.max_concurrent_closing_shards",
+        settings -> Integer.toString(Math.min(10, EsExecutors.NODE_PROCESSORS_SETTING.get(settings).roundUp())),
+        1,
+        Integer.MAX_VALUE,
+        Setting.Property.NodeScope
+    );
+
     final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService;
     private final ClusterService clusterService;
     private final ThreadPool threadPool;
@@ -1347,7 +1359,7 @@ enum IndexRemovalReason {
         }
     }
 
-    private static class ShardCloseExecutor implements Executor {
+    static class ShardCloseExecutor implements Executor {
 
         private final ThrottledTaskRunner throttledTaskRunner;
 
@@ -1360,8 +1372,11 @@ private static class ShardCloseExecutor implements Executor {
             // can't close the old ones down fast enough. Maybe we could block or throttle new shards starting while old shards are still
             // shutting down, given that starting new shards is already async. Since this seems unlikely in practice, we opt for the simple
             // approach here.
-            final var maxThreads = Math.max(EsExecutors.NODE_PROCESSORS_SETTING.get(settings).roundUp(), 10);
-            throttledTaskRunner = new ThrottledTaskRunner(IndicesClusterStateService.class.getCanonicalName(), maxThreads, delegate);
+            throttledTaskRunner = new ThrottledTaskRunner(
+                IndicesClusterStateService.class.getCanonicalName(),
+                CONCURRENT_SHARD_CLOSE_LIMIT.get(settings),
+                delegate
+            );
         }
 
         @Override
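
For readers unfamiliar with ThrottledTaskRunner, the following self-contained sketch illustrates the throttling pattern that ShardCloseExecutor delegates to. It is an illustrative stand-in, not Elasticsearch's implementation, and the class name SimpleThrottledExecutor is hypothetical: at most maxConcurrent tasks are handed to the delegate at a time, and each finishing task passes its slot to the next queued one.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Illustrative stand-in for the throttling behaviour: at most maxConcurrent tasks are
// forwarded to the delegate at once; the rest wait in a queue until a running task finishes.
class SimpleThrottledExecutor implements Executor {
    private final Executor delegate;
    private final int maxConcurrent;
    private final Queue<Runnable> queued = new ArrayDeque<>();
    private int running;

    SimpleThrottledExecutor(int maxConcurrent, Executor delegate) {
        this.maxConcurrent = maxConcurrent;
        this.delegate = delegate;
    }

    @Override
    public synchronized void execute(Runnable task) {
        if (running < maxConcurrent) {
            running++;
            delegate.execute(() -> runThenReleaseSlot(task));
        } else {
            queued.add(task); // over the limit: hold the task back for now
        }
    }

    private void runThenReleaseSlot(Runnable task) {
        try {
            task.run();
        } finally {
            final Runnable next;
            synchronized (this) {
                next = queued.poll();
                if (next == null) {
                    running--; // nothing waiting: free the slot
                }
            }
            if (next != null) {
                // hand the slot straight to the next queued task
                delegate.execute(() -> runThenReleaseSlot(next));
            }
        }
    }
}

The real ThrottledTaskRunner constructed in the hunk above follows the same idea, with additional integration into Elasticsearch's task plumbing.
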
server/src/test/java/org/elasticsearch/indices/cluster/ShardCloseExecutorTests.java

Lines changed: 69 additions & 0 deletions

@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.indices.cluster;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ShardCloseExecutorTests extends ESTestCase {
+
+    public void testThrottling() {
+        // This defaults to the number of CPUs of the machine running the tests which could be either side of 10.
+        final var defaultProcessors = EsExecutors.NODE_PROCESSORS_SETTING.get(Settings.EMPTY).roundUp();
+        ensureThrottling(Math.min(10, defaultProcessors), Settings.EMPTY);
+
+        if (10 < defaultProcessors) {
+            ensureThrottling(
+                10,
+                Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), between(10, defaultProcessors - 1)).build()
+            );
+        } // else we cannot run this check, the machine running the tests doesn't have enough CPUs
+
+        if (1 < defaultProcessors) {
+            final var fewProcessors = between(1, Math.min(10, defaultProcessors - 1));
+            ensureThrottling(fewProcessors, Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), fewProcessors).build());
+        } // else we cannot run this check, the machine running the tests has less than 2 whole CPUs (and we already tested the 1 case)
+
+        // but in any case we can override the throttle regardless of its default value
+        final var override = between(1, defaultProcessors * 2);
+        ensureThrottling(
+            override,
+            Settings.builder().put(IndicesClusterStateService.CONCURRENT_SHARD_CLOSE_LIMIT.getKey(), override).build()
+        );
+    }
+
+    private static void ensureThrottling(int expectedLimit, Settings settings) {
+        final var tasksToRun = new ArrayList<Runnable>(expectedLimit + 1);
+        final var executor = new IndicesClusterStateService.ShardCloseExecutor(settings, tasksToRun::add);
+        final var runCount = new AtomicInteger();
+
+        // enqueue one more task than the throttling limit
+        for (int i = 0; i < expectedLimit + 1; i++) {
+            executor.execute(runCount::incrementAndGet);
+        }
+
+        // check that we submitted tasks up to the expected limit, holding back the final task behind the throttle for now
+        assertEquals(expectedLimit, tasksToRun.size());
+
+        // now execute all the tasks one by one
+        for (int i = 0; i < expectedLimit + 1; i++) {
+            assertEquals(i, runCount.get());
+            tasksToRun.get(i).run();
+            assertEquals(i + 1, runCount.get());
+
+            // executing the first task enqueues the final task
+            assertEquals(expectedLimit + 1, tasksToRun.size());
+        }
+    }
+}
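
The same verification trick the test uses — capturing submitted tasks in a plain list and running them by hand — also exercises the simplified SimpleThrottledExecutor sketch shown earlier. The class ThrottleDemo is hypothetical and only demonstrates the pattern:

import java.util.ArrayList;
import java.util.List;

public class ThrottleDemo {
    public static void main(String[] args) {
        final List<Runnable> submitted = new ArrayList<>();
        // Throttle to 2 concurrent tasks; the delegate just records what would have run.
        final SimpleThrottledExecutor throttled = new SimpleThrottledExecutor(2, submitted::add);

        for (int i = 0; i < 3; i++) {
            final int shard = i;
            throttled.execute(() -> System.out.println("closed shard " + shard));
        }

        System.out.println(submitted.size()); // 2: the third task is held behind the throttle

        submitted.get(0).run();               // runs "closed shard 0" and hands its slot onwards
        System.out.println(submitted.size()); // 3: the held-back task has now been submitted
    }
}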
