Skip to content

Commit 5ef1c35

Browse files
authored
allocation: create separate limit for frozen tier concurrent rebalance (#135243)
The concurrent rebalance limit prevents too many shard relocations from happening at once. This works well in limiting active shards on the hot tier, so capacity for indexing is reserved. Given that the frozen tier is an archive, capacity separation is less of a concern and more rebalancing activity can occur without service degradation. This change creates a separate variable for specifying the frozen-tier limit. Fixes: ES-11303
1 parent 087021e commit 5ef1c35

File tree

7 files changed

+886
-71
lines changed

7 files changed

+886
-71
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
8888

8989
private int relocatingShards = 0;
9090

91+
private int relocatingFrozenShards = 0;
92+
9193
private final Map<String, Set<String>> attributeValuesByAttribute;
9294
private final Map<String, Recoveries> recoveriesPerNode;
9395

@@ -152,6 +154,9 @@ private RoutingNodes(GlobalRoutingTable routingTable, DiscoveryNodes discoveryNo
152154
assignedShardsAdd(shard);
153155
if (shard.relocating()) {
154156
relocatingShards++;
157+
if (isDedicatedFrozenNode(shard.currentNodeId())) {
158+
relocatingFrozenShards++;
159+
}
155160
ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
156161
addInitialRecovery(targetShardRouting, indexShard.primary);
157162
// LinkedHashMap to preserve order.
@@ -192,6 +197,7 @@ private RoutingNodes(RoutingNodes routingNodes) {
192197
this.inactivePrimaryCount = routingNodes.inactivePrimaryCount;
193198
this.inactiveShardCount = routingNodes.inactiveShardCount;
194199
this.relocatingShards = routingNodes.relocatingShards;
200+
this.relocatingFrozenShards = routingNodes.relocatingFrozenShards;
195201
this.attributeValuesByAttribute = Collections.synchronizedMap(Maps.copyOf(routingNodes.attributeValuesByAttribute, HashSet::new));
196202
this.recoveriesPerNode = Maps.copyOf(routingNodes.recoveriesPerNode, Recoveries::copy);
197203
}
@@ -343,6 +349,18 @@ public int getRelocatingShardCount() {
343349
return relocatingShards;
344350
}
345351

352+
public boolean isDedicatedFrozenNode(String nodeId) {
353+
RoutingNode node = nodesToShards.get(nodeId);
354+
if (node != null && node.node() != null && node.node().isDedicatedFrozenNode()) {
355+
return true;
356+
}
357+
return false;
358+
}
359+
360+
public int getRelocatingFrozenShardCount() {
361+
return relocatingFrozenShards;
362+
}
363+
346364
/**
347365
* Returns all shards that are not in the state UNASSIGNED with the same shard
348366
* ID as the given shard.
@@ -478,6 +496,9 @@ public Tuple<ShardRouting, ShardRouting> relocateShard(
478496
) {
479497
ensureMutable();
480498
relocatingShards++;
499+
if (isDedicatedFrozenNode(nodeId)) {
500+
relocatingFrozenShards++;
501+
}
481502
ShardRouting source = startedShard.relocate(nodeId, expectedShardSize);
482503
ShardRouting target = source.getTargetRelocatingShard();
483504
updateAssigned(startedShard, source);
@@ -726,6 +747,9 @@ private ShardRouting started(ShardRouting shard, long expectedShardSize) {
726747
*/
727748
private ShardRouting cancelRelocation(ShardRouting shard) {
728749
relocatingShards--;
750+
if (isDedicatedFrozenNode(shard.currentNodeId())) {
751+
relocatingFrozenShards--;
752+
}
729753
ShardRouting cancelledShard = shard.cancelRelocation();
730754
updateAssigned(shard, cancelledShard);
731755
return cancelledShard;
@@ -881,6 +905,7 @@ public boolean equals(Object o) {
881905
&& inactivePrimaryCount == that.inactivePrimaryCount
882906
&& inactiveShardCount == that.inactiveShardCount
883907
&& relocatingShards == that.relocatingShards
908+
&& relocatingFrozenShards == that.relocatingFrozenShards
884909
&& nodesToShards.equals(that.nodesToShards)
885910
&& unassignedShards.equals(that.unassignedShards)
886911
&& assignedShards.equals(that.assignedShards)
@@ -898,6 +923,7 @@ public int hashCode() {
898923
inactivePrimaryCount,
899924
inactiveShardCount,
900925
relocatingShards,
926+
relocatingFrozenShards,
901927
attributeValuesByAttribute,
902928
recoveriesPerNode
903929
);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java

Lines changed: 111 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020
/**
2121
* Similar to the {@link ClusterRebalanceAllocationDecider} this
2222
* {@link AllocationDecider} controls the number of currently in-progress
23-
* re-balance (relocation) operations and restricts node allocations if the
24-
* configured threshold is reached. The default number of concurrent rebalance
25-
* operations is set to {@code 2}
23+
* re-balance (shard relocation) operations and restricts node allocations
24+
* if the configured threshold is reached. Frozen and non-frozen shards are
25+
* considered separately. The default number of concurrent rebalance operations
26+
* is set to {@code 2} for non-frozen shards. For frozen shards, the default is
27+
* the same setting as non-frozen shards, until set explicitly.
2628
* <p>
2729
* Re-balance operations can be controlled in real-time via the cluster update API using
28-
* {@code cluster.routing.allocation.cluster_concurrent_rebalance}. Iff this
29-
* setting is set to {@code -1} the number of concurrent re-balance operations
30-
* are unlimited.
30+
* {@code cluster.routing.allocation.cluster_concurrent_rebalance} and
31+
* {@code cluster.routing.allocation.cluster_concurrent_frozen_rebalance}.
32+
* Iff either setting is set to {@code -1} the number of concurrent re-balance operations
33+
* within the setting's category (frozen or non-frozen) are unlimited.
3134
*/
3235
public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
3336

@@ -44,21 +47,91 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
4447
);
4548
private volatile int clusterConcurrentRebalance;
4649

50+
/**
51+
* Same as cluster_concurrent_rebalance, but applies separately to frozen tier shards
52+
*
53+
* Defaults to the same value as normal concurrent rebalance, if unspecified
54+
*/
55+
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING = Setting.intSetting(
56+
"cluster.routing.allocation.cluster_concurrent_frozen_rebalance",
57+
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
58+
-1,
59+
Property.Dynamic,
60+
Property.NodeScope
61+
);
62+
private volatile int clusterConcurrentFrozenRebalance;
63+
4764
public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) {
4865
clusterSettings.initializeAndWatch(
4966
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
5067
this::setClusterConcurrentRebalance
5168
);
52-
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance);
69+
clusterSettings.initializeAndWatch(
70+
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING,
71+
this::setClusterConcurrentFrozenRebalance
72+
);
73+
logger.debug(
74+
"using [cluster_concurrent_rebalance] with [concurrent_rebalance={}, concurrent_frozen_rebalance={}]",
75+
clusterConcurrentRebalance,
76+
clusterConcurrentFrozenRebalance
77+
);
5378
}
5479

5580
private void setClusterConcurrentRebalance(int concurrentRebalance) {
5681
clusterConcurrentRebalance = concurrentRebalance;
5782
}
5883

84+
private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) {
85+
clusterConcurrentFrozenRebalance = concurrentFrozenRebalance;
86+
}
87+
5988
@Override
6089
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
61-
return canRebalance(allocation);
90+
int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount();
91+
if (allocation.routingNodes().isDedicatedFrozenNode(shardRouting.currentNodeId())) {
92+
if (clusterConcurrentFrozenRebalance == -1) {
93+
return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed");
94+
}
95+
if (relocatingFrozenShards >= clusterConcurrentFrozenRebalance) {
96+
return allocation.decision(
97+
Decision.THROTTLE,
98+
NAME,
99+
"reached the limit of concurrently rebalancing frozen shards [%d], cluster setting [%s=%d]",
100+
relocatingFrozenShards,
101+
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(),
102+
clusterConcurrentFrozenRebalance
103+
);
104+
}
105+
return allocation.decision(
106+
Decision.YES,
107+
NAME,
108+
"below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]",
109+
clusterConcurrentFrozenRebalance,
110+
relocatingFrozenShards
111+
);
112+
} else {
113+
int relocatingShards = allocation.routingNodes().getRelocatingShardCount() - relocatingFrozenShards;
114+
if (clusterConcurrentRebalance == -1) {
115+
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
116+
}
117+
if (relocatingShards >= clusterConcurrentRebalance) {
118+
return allocation.decision(
119+
Decision.THROTTLE,
120+
NAME,
121+
"reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]",
122+
relocatingShards,
123+
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(),
124+
clusterConcurrentRebalance
125+
);
126+
}
127+
return allocation.decision(
128+
Decision.YES,
129+
NAME,
130+
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
131+
clusterConcurrentRebalance,
132+
relocatingShards
133+
);
134+
}
62135
}
63136

64137
/**
@@ -68,33 +141,52 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
68141
*/
69142
@Override
70143
public Decision canRebalance(RoutingAllocation allocation) {
144+
int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount();
71145
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
72146
if (allocation.isSimulating() && relocatingShards >= 2) {
73147
// BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2).
74148
// (See https://github.com/elastic/elasticsearch/issues/87279)
75149
// Above allocator is used in DesiredBalanceComputer. Since we do not move actual shard data during calculation
76150
// it is possible to artificially set above setting to 2 to avoid unnecessary moves in desired balance.
151+
// Separately: keep overall limit in simulation to two including frozen shards
77152
return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating");
78153
}
79-
if (clusterConcurrentRebalance == -1) {
80-
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
81-
}
82-
if (relocatingShards >= clusterConcurrentRebalance) {
154+
155+
// separate into frozen/non-frozen counts
156+
relocatingShards = relocatingShards - relocatingFrozenShards;
157+
158+
// either frozen or non-frozen having some allowance before their limit means the allocator has room to rebalance
159+
if (clusterConcurrentRebalance == -1 || relocatingShards < clusterConcurrentRebalance) {
83160
return allocation.decision(
84-
Decision.THROTTLE,
161+
Decision.YES,
85162
NAME,
86-
"reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]",
87-
relocatingShards,
163+
"below threshold [%s=%d] for concurrent rebalances, current rebalance shard count [%d]",
88164
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(),
89-
clusterConcurrentRebalance
165+
clusterConcurrentRebalance,
166+
relocatingShards
167+
);
168+
}
169+
if (clusterConcurrentFrozenRebalance == -1 || relocatingFrozenShards < clusterConcurrentFrozenRebalance) {
170+
return allocation.decision(
171+
Decision.YES,
172+
NAME,
173+
"below threshold [%s=%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]",
174+
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(),
175+
clusterConcurrentFrozenRebalance,
176+
relocatingFrozenShards
90177
);
91178
}
92179
return allocation.decision(
93-
Decision.YES,
180+
Decision.THROTTLE,
94181
NAME,
95-
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
182+
"reached the limit of concurrently rebalancing shards [%d] for concurrent rebalances, cluster setting [%s=%d], "
183+
+ "and [%d] for concurrent frozen rebalances, frozen cluster setting [%s=%d]",
184+
relocatingShards,
185+
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(),
96186
clusterConcurrentRebalance,
97-
relocatingShards
187+
relocatingFrozenShards,
188+
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(),
189+
clusterConcurrentFrozenRebalance
98190
);
99191
}
100192
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ public void apply(Settings value, Settings current, Settings previous) {
240240
BreakerSettings.CIRCUIT_BREAKER_TYPE,
241241
ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING,
242242
ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
243+
ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING,
243244
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
244245
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
245246
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,

0 commit comments

Comments
 (0)