Skip to content

Commit b7922ff

Browse files
DiannaHohenseejoshua-adams-1
authored andcommitted
Implement WriteLoadConstraintDecider#canAllocate (elastic#132041)
The initial version of the write load decider with #canAllocate implemented. Checks whether the new node assignment for a shard would exceed the node's simulated utilization threshold. Closes ES-12564
1 parent af0c58e commit b7922ff

File tree

10 files changed

+509
-15
lines changed

10 files changed

+509
-15
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,9 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
347347
var settings = Settings.builder()
348348
.put(
349349
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
350-
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
350+
randomBoolean()
351+
? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
352+
: WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
351353
)
352354
// Manually control cluster info refreshes
353355
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,9 @@ public void testNodeWriteLoadsArePresent() {
319319
Settings.builder()
320320
.put(
321321
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
322-
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
322+
randomBoolean()
323+
? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
324+
: WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
323325
)
324326
.build()
325327
);
@@ -376,7 +378,9 @@ public void testShardWriteLoadsArePresent() {
376378
Settings.builder()
377379
.put(
378380
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
379-
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
381+
randomBoolean()
382+
? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
383+
: WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
380384
)
381385
.build()
382386
);

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
6767
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
6868
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
69+
import org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDecider;
6970
import org.elasticsearch.cluster.service.ClusterService;
7071
import org.elasticsearch.common.io.stream.NamedWriteable;
7172
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
@@ -446,6 +447,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
446447
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
447448
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
448449
addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
450+
addAllocationDecider(deciders, new WriteLoadConstraintDecider(clusterSettings));
449451
addAllocationDecider(deciders, new NodeReplacementAllocationDecider());
450452
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
451453
addAllocationDecider(deciders, new SameShardAllocationDecider(clusterSettings));

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ void execute() {
219219
logger.trace("starting async refresh");
220220

221221
try (var ignoredRefs = fetchRefs) {
222-
maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED);
222+
maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled.atLeastLowThresholdEnabled());
223223
maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled);
224224
maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled);
225225
maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled);
@@ -262,7 +262,7 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) {
262262
}
263263

264264
private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writeLoadConstraintEnabled) {
265-
if (writeLoadConstraintEnabled != WriteLoadDeciderStatus.DISABLED) {
265+
if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) {
266266
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
267267
fetchNodesUsageStatsForThreadPools();
268268
}
@@ -313,7 +313,7 @@ private void fetchIndicesStats() {
313313
// This returns the shard sizes on disk
314314
indicesStatsRequest.store(true);
315315
}
316-
if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) {
316+
if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) {
317317
// This returns the shard write-loads
318318
indicesStatsRequest.indexing(true);
319319
}

server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,32 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo
8888
.get(ThreadPool.Names.WRITE);
8989
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
9090
writeThreadPoolStats.totalThreadPoolThreads(),
91-
(float) Math.max(
92-
(writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())),
93-
0.0
91+
updateNodeUtilizationWithShardMovements(
92+
writeThreadPoolStats.averageThreadPoolUtilization(),
93+
(float) writeLoadDelta,
94+
writeThreadPoolStats.totalThreadPoolThreads()
9495
),
9596
writeThreadPoolStats.maxThreadPoolQueueLatencyMillis()
9697
);
9798
}
99+
100+
/**
101+
* The {@code nodeUtilization} is the average utilization per thread for some duration of time. The {@code shardWriteLoadDelta} is the
102+
* sum of shards' total execution time. Dividing the shards total execution time by the number of threads provides the average
103+
* utilization of each write thread for those shards. The change in shard load can then be added to the node utilization.
104+
*
105+
* @param nodeUtilization The current node-level write load percent utilization.
106+
* @param shardWriteLoadDelta The change in shard(s) execution time across all threads. This can be positive or negative depending on
107+
* whether shards were moved onto the node or off of the node.
108+
* @param numberOfWriteThreads The number of threads available in the node's write thread pool.
109+
* @return The new node-level write load percent utilization after adding the shard write load delta.
110+
*/
111+
public static float updateNodeUtilizationWithShardMovements(
112+
float nodeUtilization,
113+
float shardWriteLoadDelta,
114+
int numberOfWriteThreads
115+
) {
116+
float newNodeUtilization = nodeUtilization + (shardWriteLoadDelta / numberOfWriteThreads);
117+
return (float) Math.max(newNodeUtilization, 0.0);
118+
}
98119
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,30 @@ public enum WriteLoadDeciderStatus {
3030
*/
3131
DISABLED,
3232
/**
33-
* Only the low-threshold is enabled (write-load will not trigger rebalance)
33+
* Only the low write low threshold, to try to avoid allocating to a node exceeding
34+
* {@link #WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING}. Write-load hot-spot will not trigger rebalancing.
3435
*/
35-
LOW_ONLY,
36+
LOW_THRESHOLD_ONLY,
3637
/**
37-
* The decider is enabled
38+
* All write load decider development work is turned on.
3839
*/
39-
ENABLED
40+
ENABLED;
41+
42+
public boolean fullyEnabled() {
43+
return this == ENABLED;
44+
}
45+
46+
public boolean notFullyEnabled() {
47+
return this != ENABLED;
48+
}
49+
50+
public boolean atLeastLowThresholdEnabled() {
51+
return this != DISABLED;
52+
}
53+
54+
public boolean disabled() {
55+
return this == DISABLED;
56+
}
4057
}
4158

4259
public static final Setting<WriteLoadDeciderStatus> WRITE_LOAD_DECIDER_ENABLED_SETTING = Setting.enumSetting(
@@ -102,10 +119,16 @@ public enum WriteLoadDeciderStatus {
102119

103120
WriteLoadDeciderStatus writeLoadDeciderStatus;
104121
TimeValue writeLoadDeciderRerouteIntervalSetting;
122+
double writeThreadPoolHighUtilizationThresholdSetting;
105123

106-
WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
124+
public WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
107125
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled);
108126
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting);
127+
clusterSettings.initializeAndWatch(
128+
WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
129+
this::setWriteThreadPoolHighUtilizationThresholdSetting
130+
);
131+
109132
};
110133

111134
private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) {
@@ -120,7 +143,15 @@ public TimeValue getWriteLoadDeciderRerouteIntervalSetting() {
120143
return this.writeLoadDeciderRerouteIntervalSetting;
121144
}
122145

146+
public double getWriteThreadPoolHighUtilizationThresholdSetting() {
147+
return this.writeThreadPoolHighUtilizationThresholdSetting;
148+
}
149+
123150
private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) {
124151
this.writeLoadDeciderRerouteIntervalSetting = timeValue;
125152
}
153+
154+
private void setWriteThreadPoolHighUtilizationThresholdSetting(RatioValue percent) {
155+
this.writeThreadPoolHighUtilizationThresholdSetting = percent.getAsRatio();
156+
}
126157
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.decider;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.routing.RoutingNode;
17+
import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
18+
import org.elasticsearch.cluster.routing.ShardRouting;
19+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
20+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
21+
import org.elasticsearch.common.settings.ClusterSettings;
22+
import org.elasticsearch.core.Strings;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
25+
/**
26+
* Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread
27+
* pool usage stats and any candidate shard's write load estimate.
28+
*/
29+
public class WriteLoadConstraintDecider extends AllocationDecider {
30+
private static final Logger logger = LogManager.getLogger(WriteLoadConstraintDecider.class);
31+
32+
public static final String NAME = "write_load";
33+
34+
private final WriteLoadConstraintSettings writeLoadConstraintSettings;
35+
36+
public WriteLoadConstraintDecider(ClusterSettings clusterSettings) {
37+
this.writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings);
38+
}
39+
40+
@Override
41+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
42+
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().disabled()) {
43+
return Decision.single(Decision.Type.YES, NAME, "Decider is disabled");
44+
}
45+
46+
// Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion.
47+
var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
48+
var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId());
49+
if (shardWriteLoad == null || shardWriteLoad == 0) {
50+
return Decision.single(Decision.Type.YES, NAME, "Shard has no estimated write load. Decider takes no action.");
51+
}
52+
53+
var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
54+
var nodeUsageStatsForThreadPools = allNodeUsageStats.get(node.nodeId());
55+
if (nodeUsageStatsForThreadPools == null) {
56+
// No node-level thread pool usage stats were reported for this node. Let's assume this is OK and that the simulator will handle
57+
// setting a node-level write load for this node after this shard is assigned.
58+
return Decision.single(Decision.Type.YES, NAME, "The node has no write load estimate. Decider takes no action.");
59+
}
60+
61+
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false;
62+
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null;
63+
var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
64+
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting();
65+
if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
66+
// The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards.
67+
String explain = Strings.format(
68+
"Node [%s] with write thread pool utilization [%.2f] already exceeds the high utilization threshold of [%f]. Cannot "
69+
+ "allocate shard [%s] to node without risking increased write latencies.",
70+
node.nodeId(),
71+
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
72+
nodeWriteThreadPoolLoadThreshold,
73+
shardRouting.shardId()
74+
);
75+
logger.debug(explain);
76+
return Decision.single(Decision.Type.NO, NAME, explain);
77+
}
78+
79+
if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) {
80+
// The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard.
81+
// This could lead to a hot spot on this node and is undesirable.
82+
String explain = Strings.format(
83+
"The high utilization threshold of [%f] would be exceeded on node [%s] with utilization [%.2f] if shard [%s] with "
84+
+ "estimated additional utilisation [%.5f] (write load [%.5f] / threads [%d]) were assigned to it. Cannot allocate "
85+
+ "shard to node without risking increased write latencies.",
86+
nodeWriteThreadPoolLoadThreshold,
87+
node.nodeId(),
88+
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
89+
shardRouting.shardId(),
90+
shardWriteLoad / nodeWriteThreadPoolStats.totalThreadPoolThreads(),
91+
shardWriteLoad,
92+
nodeWriteThreadPoolStats.totalThreadPoolThreads()
93+
);
94+
logger.debug(explain);
95+
return Decision.single(Decision.Type.NO, NAME, explain);
96+
}
97+
98+
return Decision.YES;
99+
}
100+
101+
@Override
102+
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
103+
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) {
104+
return Decision.single(Decision.Type.YES, NAME, "canRemain() is not enabled");
105+
}
106+
107+
// TODO: implement
108+
109+
return Decision.single(Decision.Type.YES, NAME, "canRemain() is not yet implemented");
110+
}
111+
112+
/**
113+
* Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node.
114+
* Returns the percent thread pool utilization change.
115+
*/
116+
private float calculateShardMovementChange(ThreadPoolUsageStats nodeWriteThreadPoolStats, double shardWriteLoad) {
117+
assert shardWriteLoad > 0;
118+
return ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements(
119+
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
120+
(float) shardWriteLoad,
121+
nodeWriteThreadPoolStats.totalThreadPoolThreads()
122+
);
123+
}
124+
}

server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
3737
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
3838
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
39+
import org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDecider;
3940
import org.elasticsearch.cluster.service.ClusterService;
4041
import org.elasticsearch.common.settings.ClusterSettings;
4142
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -278,6 +279,7 @@ public void testAllocationDeciderOrder() {
278279
SnapshotInProgressAllocationDecider.class,
279280
RestoreInProgressAllocationDecider.class,
280281
NodeShutdownAllocationDecider.class,
282+
WriteLoadConstraintDecider.class,
281283
NodeReplacementAllocationDecider.class,
282284
FilterAllocationDecider.class,
283285
SameShardAllocationDecider.class,

server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ public void testScheduling() {
6262
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
6363
.put(
6464
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
65-
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
65+
randomBoolean()
66+
? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
67+
: WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
6668
);
6769
if (randomBoolean()) {
6870
settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms");

0 commit comments

Comments
 (0)