Skip to content

Commit b599f4a

Browse files
authored
Merge branch 'main' into cps-move-project-routing-info
2 parents 39c88aa + afd10f4 commit b599f4a

File tree

6 files changed

+262
-39
lines changed

6 files changed

+262
-39
lines changed

muted-tests.yml

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -549,12 +549,6 @@ tests:
549549
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
550550
method: test {p0=transform/transforms_start_stop/Test stop transform with force and wait_for_checkpoint true}
551551
issue: https://github.com/elastic/elasticsearch/issues/135135
552-
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
553-
method: testSearchableSnapshotUpgrade {p0=[9.2.0, 9.2.0, 9.2.0]}
554-
issue: https://github.com/elastic/elasticsearch/issues/135150
555-
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
556-
method: testMountSearchableSnapshot {p0=[9.2.0, 9.2.0, 9.2.0]}
557-
issue: https://github.com/elastic/elasticsearch/issues/135151
558552
- class: org.elasticsearch.discovery.ClusterDisruptionIT
559553
method: testAckedIndexing
560554
issue: https://github.com/elastic/elasticsearch/issues/117024
@@ -591,27 +585,12 @@ tests:
591585
- class: org.elasticsearch.xpack.esql.expression.function.scalar.score.DecayTests
592586
method: "testEvaluateBlockWithoutNulls {TestCase=<date_nanos>, <date_nanos>, <time_duration>, <_source> #12}"
593587
issue: https://github.com/elastic/elasticsearch/issues/135394
594-
- class: org.elasticsearch.upgrades.DataStreamsUpgradeIT
595-
method: testDataStreamValidationDoesNotBreakUpgrade
596-
issue: https://github.com/elastic/elasticsearch/issues/135406
597-
- class: org.elasticsearch.upgrades.IndexingIT
598-
method: testIndexing
599-
issue: https://github.com/elastic/elasticsearch/issues/135407
600-
- class: org.elasticsearch.upgrades.QueryableBuiltInRolesUpgradeIT
601-
method: testBuiltInRolesSyncedOnClusterUpgrade
602-
issue: https://github.com/elastic/elasticsearch/issues/135194
603588
- class: org.elasticsearch.gradle.TestClustersPluginFuncTest
604589
method: override jdk usage via ES_JAVA_HOME for known jdk os incompatibilities
605590
issue: https://github.com/elastic/elasticsearch/issues/135413
606591
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
607592
method: test {csv-spec:spatial_shapes.ConvertCartesianShapeFromStringParseError}
608593
issue: https://github.com/elastic/elasticsearch/issues/135455
609-
- class: org.elasticsearch.upgrades.SearchableSnapshotsRollingUpgradeIT
610-
method: testBlobStoreCacheWithPartialCopyInMixedVersions
611-
issue: https://github.com/elastic/elasticsearch/issues/135473
612-
- class: org.elasticsearch.upgrades.SearchableSnapshotsRollingUpgradeIT
613-
method: testBlobStoreCacheWithFullCopyInMixedVersions
614-
issue: https://github.com/elastic/elasticsearch/issues/135474
615594
- class: org.elasticsearch.xpack.esql.qa.multi_node.GenerativeIT
616595
method: test
617596
issue: https://github.com/elastic/elasticsearch/issues/134407
@@ -636,6 +615,9 @@ tests:
636615
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT
637616
method: testAggTooManyMvLongs
638617
issue: https://github.com/elastic/elasticsearch/issues/135585
618+
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
619+
method: test
620+
issue: https://github.com/elastic/elasticsearch/issues/134407
639621

640622
# Examples:
641623
#

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 110 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -140,25 +140,83 @@ public BalancedShardsAllocator(
140140

141141
@Override
142142
public void allocate(RoutingAllocation allocation) {
143+
assert allocation.isSimulating() == false || balancerSettings.completeEarlyOnShardAssignmentChange()
144+
: "inconsistent states: isSimulating ["
145+
+ allocation.isSimulating()
146+
+ "] vs completeEarlyOnShardAssignmentChange ["
147+
+ balancerSettings.completeEarlyOnShardAssignmentChange()
148+
+ "]";
143149
if (allocation.metadata().hasAnyIndices()) {
144150
// must not use licensed features when just starting up
145151
writeLoadForecaster.refreshLicense();
146152
}
147153

148154
assert allocation.ignoreDisable() == false;
155+
assert allocation.isSimulating() == false || allocation.routingNodes().hasInactiveShards() == false
156+
: "expect no initializing shard, but got " + allocation.routingNodes();
157+
// TODO: ES-12943 cannot assert the following because shards moved by commands are not simulated promptly in DesiredBalanceComputer
158+
// assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
159+
// : "expect no relocating shard, but got " + allocation.routingNodes();
149160

150161
if (allocation.routingNodes().size() == 0) {
151162
failAllocationOfNewPrimaries(allocation);
152163
return;
153164
}
154165
final BalancingWeights balancingWeights = balancingWeightsFactory.create();
155-
final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights);
156-
balancer.allocateUnassigned();
157-
balancer.moveShards();
158-
balancer.balance();
166+
final Balancer balancer = new Balancer(
167+
writeLoadForecaster,
168+
allocation,
169+
balancerSettings.getThreshold(),
170+
balancingWeights,
171+
balancerSettings.completeEarlyOnShardAssignmentChange()
172+
);
173+
174+
boolean shardAssigned = false, shardMoved = false, shardBalanced = false;
175+
try {
176+
shardAssigned = balancer.allocateUnassigned();
177+
if (shardAssigned && balancerSettings.completeEarlyOnShardAssignmentChange()) {
178+
return;
179+
}
159180

160-
// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
161-
collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation);
181+
shardMoved = balancer.moveShards();
182+
if (shardMoved && balancerSettings.completeEarlyOnShardAssignmentChange()) {
183+
return;
184+
}
185+
186+
shardBalanced = balancer.balance();
187+
} finally {
188+
if (logger.isDebugEnabled()) {
189+
logger.debug(
190+
"shards assigned: {}, shards moved: {}, shards balanced: {}, "
191+
+ "routingNodes hasInactiveShards [{}], relocation count [{}]",
192+
shardAssigned,
193+
shardMoved,
194+
shardBalanced,
195+
allocation.routingNodes().hasInactiveShards(),
196+
allocation.routingNodes().getRelocatingShardCount()
197+
);
198+
}
199+
assert assertShardAssignmentChanges(allocation, shardAssigned, shardMoved, shardBalanced);
200+
// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
201+
collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation);
202+
}
203+
}
204+
205+
private boolean assertShardAssignmentChanges(
206+
RoutingAllocation allocation,
207+
boolean shardAssigned,
208+
boolean shardMoved,
209+
boolean shardBalanced
210+
) {
211+
if (allocation.isSimulating() == false) {
212+
return true;
213+
}
214+
assert shardAssigned == false || allocation.routingNodes().hasInactiveShards()
215+
: "expect initializing shard, but got " + allocation.routingNodes();
216+
217+
assert (shardMoved == false && shardBalanced == false) || allocation.routingNodes().getRelocatingShardCount() > 0
218+
: "expect relocating shard, but got " + allocation.routingNodes();
219+
return true;
162220
}
163221

164222
private void collectAndRecordNodeWeightStats(Balancer balancer, BalancingWeights balancingWeights, RoutingAllocation allocation) {
@@ -188,7 +246,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
188246
writeLoadForecaster,
189247
allocation,
190248
balancerSettings.getThreshold(),
191-
balancingWeightsFactory.create()
249+
balancingWeightsFactory.create(),
250+
balancerSettings.completeEarlyOnShardAssignmentChange()
192251
);
193252
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
194253
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
@@ -248,12 +307,14 @@ public static class Balancer {
248307
private final Map<String, ModelNode> nodes;
249308
private final BalancingWeights balancingWeights;
250309
private final NodeSorters nodeSorters;
310+
private final boolean completeEarlyOnShardAssignmentChange;
251311

252312
private Balancer(
253313
WriteLoadForecaster writeLoadForecaster,
254314
RoutingAllocation allocation,
255315
float threshold,
256-
BalancingWeights balancingWeights
316+
BalancingWeights balancingWeights,
317+
boolean completeEarlyOnShardAssignmentChange
257318
) {
258319
this.writeLoadForecaster = writeLoadForecaster;
259320
this.allocation = allocation;
@@ -266,6 +327,7 @@ private Balancer(
266327
nodes = Collections.unmodifiableMap(buildModelFromAssigned());
267328
this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this);
268329
this.balancingWeights = balancingWeights;
330+
this.completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange;
269331
}
270332

271333
private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) {
@@ -358,7 +420,7 @@ private IndexMetadata indexMetadata(ProjectIndex index) {
358420
* Balances the nodes on the cluster model according to the weight function.
359421
* The actual balancing is delegated to {@link #balanceByWeights(NodeSorter)}
360422
*/
361-
private void balance() {
423+
private boolean balance() {
362424
if (logger.isTraceEnabled()) {
363425
logger.trace("Start balancing cluster");
364426
}
@@ -371,21 +433,27 @@ private void balance() {
371433
* Therefore we only do a rebalance if we have fetched all information.
372434
*/
373435
logger.debug("skipping rebalance due to in-flight shard/store fetches");
374-
return;
436+
return false;
375437
}
376438
if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
377439
logger.trace("skipping rebalance as it is disabled");
378-
return;
440+
return false;
379441
}
380442

443+
boolean shardBalanced = false;
381444
// Balance each partition
382445
for (NodeSorter nodeSorter : nodeSorters) {
383446
if (nodeSorter.modelNodes.length < 2) { /* skip if we only have one node */
384447
logger.trace("skipping rebalance as the partition has single node only");
385448
continue;
386449
}
387-
balanceByWeights(nodeSorter);
450+
shardBalanced |= balanceByWeights(nodeSorter);
451+
// TODO: We could choose to account shardBalanced separately for each partition since they do not overlap.
452+
if (shardBalanced && completeEarlyOnShardAssignmentChange) {
453+
return true;
454+
}
388455
}
456+
return shardBalanced;
389457
}
390458

391459
/**
@@ -531,7 +599,8 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin
531599
* only, or in other words relocations that move the weight delta closer
532600
* to {@code 0.0}
533601
*/
534-
private void balanceByWeights(NodeSorter sorter) {
602+
private boolean balanceByWeights(NodeSorter sorter) {
603+
boolean shardBalanced = false;
535604
final AllocationDeciders deciders = allocation.deciders();
536605
final ModelNode[] modelNodes = sorter.modelNodes;
537606
final float[] weights = sorter.weights;
@@ -630,6 +699,18 @@ private void balanceByWeights(NodeSorter sorter) {
630699
sorter.sort(0, relevantNodes);
631700
lowIdx = 0;
632701
highIdx = relevantNodes - 1;
702+
703+
if (routingNodes.getRelocatingShardCount() > 0) {
704+
// ES-12955: Check routingNodes.getRelocatingShardCount() > 0 in case the first relocation is a THROTTLE.
705+
// This should rarely happen since in most cases, we don't throttle unless there is an existing relocation.
706+
// But it can happen in production for frozen indices when the cache is still being prepared. It can also
707+
// happen in tests because we have decider like RandomAllocationDecider that can randomly return THROTTLE
708+
// when there is no existing relocation.
709+
shardBalanced = true;
710+
}
711+
if (completeEarlyOnShardAssignmentChange && shardBalanced) {
712+
return true;
713+
}
633714
continue;
634715
}
635716
}
@@ -651,6 +732,7 @@ private void balanceByWeights(NodeSorter sorter) {
651732
}
652733
}
653734
}
735+
return shardBalanced;
654736
}
655737

656738
/**
@@ -721,7 +803,8 @@ protected int comparePivot(int j) {
721803
* shard is created with an incremented version in the state
722804
* {@link ShardRoutingState#INITIALIZING}.
723805
*/
724-
public void moveShards() {
806+
public boolean moveShards() {
807+
boolean shardMoved = false;
725808
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
726809
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
727810
// offloading the shards.
@@ -745,10 +828,15 @@ public void moveShards() {
745828
if (logger.isTraceEnabled()) {
746829
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
747830
}
831+
shardMoved = true;
832+
if (completeEarlyOnShardAssignmentChange) {
833+
return true;
834+
}
748835
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
749836
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
750837
}
751838
}
839+
return shardMoved;
752840
}
753841

754842
/**
@@ -888,14 +976,14 @@ private Map<String, ModelNode> buildModelFromAssigned() {
888976
* Allocates all given shards on the minimal eligible node for the shards index
889977
* with respect to the weight function. All given shards must be unassigned.
890978
*/
891-
private void allocateUnassigned() {
979+
private boolean allocateUnassigned() {
892980
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
893981
assert nodes.isEmpty() == false;
894982
if (logger.isTraceEnabled()) {
895983
logger.trace("Start allocating unassigned shards");
896984
}
897985
if (unassigned.isEmpty()) {
898-
return;
986+
return false;
899987
}
900988

901989
/*
@@ -932,6 +1020,7 @@ private void allocateUnassigned() {
9321020
int secondaryLength = 0;
9331021
int primaryLength = primary.length;
9341022
ArrayUtil.timSort(primary, comparator);
1023+
boolean shardAssignmentChanged = false;
9351024
do {
9361025
for (int i = 0; i < primaryLength; i++) {
9371026
ShardRouting shard = primary[i];
@@ -949,6 +1038,7 @@ private void allocateUnassigned() {
9491038

9501039
final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
9511040
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
1041+
shardAssignmentChanged = true;
9521042
minNode.addShard(index, shard);
9531043
if (shard.primary() == false) {
9541044
// copy over the same replica shards to the secondary array so they will get allocated
@@ -972,6 +1062,9 @@ private void allocateUnassigned() {
9721062
assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
9731063
final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
9741064
minNode.addShard(projectIndex(shard), shard.initialize(minNode.getNodeId(), null, shardSize));
1065+
// If we see a throttle decision in simulation, there must be other shards that got assigned before it.
1066+
assert allocation.isSimulating() == false || shardAssignmentChanged
1067+
: "shard " + shard + " was throttled but no other shards were assigned";
9751068
} else {
9761069
if (logger.isTraceEnabled()) {
9771070
logger.trace("No Node found to assign shard [{}]", shard);
@@ -994,6 +1087,7 @@ private void allocateUnassigned() {
9941087
secondaryLength = 0;
9951088
} while (primaryLength > 0);
9961089
// clear everything we have either added it or moved to ignoreUnassigned
1090+
return shardAssignmentChanged;
9971091
}
9981092

9991093
private ProjectIndex projectIndex(ShardRouting shardRouting) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

12+
import org.elasticsearch.cluster.ClusterModule;
1213
import org.elasticsearch.common.settings.ClusterSettings;
1314
import org.elasticsearch.common.settings.Settings;
1415

@@ -26,6 +27,7 @@ public class BalancerSettings {
2627
private volatile float writeLoadBalanceFactor;
2728
private volatile float diskUsageBalanceFactor;
2829
private volatile float threshold;
30+
private final boolean completeEarlyOnShardAssignmentChange;
2931

3032
public BalancerSettings(Settings settings) {
3133
this(ClusterSettings.createBuiltInClusterSettings(settings));
@@ -37,6 +39,9 @@ public BalancerSettings(ClusterSettings clusterSettings) {
3739
clusterSettings.initializeAndWatch(WRITE_LOAD_BALANCE_FACTOR_SETTING, value -> this.writeLoadBalanceFactor = value);
3840
clusterSettings.initializeAndWatch(DISK_USAGE_BALANCE_FACTOR_SETTING, value -> this.diskUsageBalanceFactor = value);
3941
clusterSettings.initializeAndWatch(THRESHOLD_SETTING, value -> this.threshold = value);
42+
this.completeEarlyOnShardAssignmentChange = ClusterModule.DESIRED_BALANCE_ALLOCATOR.equals(
43+
clusterSettings.get(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING)
44+
);
4045
}
4146

4247
/**
@@ -67,4 +72,8 @@ public float getDiskUsageBalanceFactor() {
6772
public float getThreshold() {
6873
return threshold;
6974
}
75+
76+
public boolean completeEarlyOnShardAssignmentChange() {
77+
return completeEarlyOnShardAssignmentChange;
78+
}
7079
}

0 commit comments

Comments
 (0)