Skip to content

Commit 69df7fb

Browse files
Long balance computation should not delay new index primary assignment (#115511) (#116316)
A long desired balance computation could delay the assignment of the shards of a newly created index, because the computation first has to finish before the assignments are published and the shards can be assigned. This change adds a new setting that caps the duration of a computation when there are unassigned primary shards. Note that this is similar to how a new cluster state causes early publishing of the desired balance. Closes ES-9616 Co-authored-by: Elastic Machine <[email protected]>
1 parent b7951c5 commit 69df7fb

File tree

8 files changed

+346
-27
lines changed

8 files changed

+346
-27
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ContinuousComputation.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ public void onNewInput(T input) {
4949
}
5050
}
5151

52+
/**
53+
* Enqueues {@code input} if {@code expectedLatestKnownInput} is the latest known input, and returns whether it was enqueued.
54+
* Neither of the parameters can be null.
55+
*/
56+
protected boolean compareAndEnqueue(T expectedLatestKnownInput, T input) {
57+
assert expectedLatestKnownInput != null;
58+
assert input != null;
59+
return enqueuedInput.compareAndSet(Objects.requireNonNull(expectedLatestKnownInput), Objects.requireNonNull(input));
60+
}
61+
5262
/**
5363
* @return {@code false} iff there are no active/enqueued computations
5464
*/
@@ -67,7 +77,7 @@ protected boolean isFresh(T input) {
6777
/**
6878
* Process the given input.
6979
*
70-
* @param input the value that was last received by {@link #onNewInput} before invocation.
80+
* @param input the value that was last received by {@link #onNewInput} or {@link #compareAndEnqueue} before invocation.
7181
*/
7282
protected abstract void processInput(T input);
7383

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,17 @@
2020
*
2121
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
2222
*/
23-
public record DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments) {
23+
public record DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments, ComputationFinishReason finishReason) {
24+
25+
enum ComputationFinishReason {
26+
CONVERGED,
27+
YIELD_TO_NEW_INPUT,
28+
STOP_EARLY
29+
}
30+
31+
public DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments) {
32+
this(lastConvergedIndex, assignments, ComputationFinishReason.CONVERGED);
33+
}
2434

2535
public static final DesiredBalance INITIAL = new DesiredBalance(-1, Map.of());
2636

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Set;
3939
import java.util.TreeMap;
4040
import java.util.TreeSet;
41+
import java.util.function.LongSupplier;
4142
import java.util.function.Predicate;
4243

4344
import static java.util.stream.Collectors.toUnmodifiableSet;
@@ -49,8 +50,8 @@ public class DesiredBalanceComputer {
4950

5051
private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);
5152

52-
private final ThreadPool threadPool;
5353
private final ShardsAllocator delegateAllocator;
54+
private final LongSupplier timeSupplierMillis;
5455

5556
// stats
5657
protected final MeanMetric iterations = new MeanMetric();
@@ -63,12 +64,28 @@ public class DesiredBalanceComputer {
6364
Setting.Property.NodeScope
6465
);
6566

67+
public static final Setting<TimeValue> MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING = Setting.timeSetting(
68+
"cluster.routing.allocation.desired_balance.max_balance_computation_time_during_index_creation",
69+
TimeValue.timeValueSeconds(1),
70+
Setting.Property.Dynamic,
71+
Setting.Property.NodeScope
72+
);
73+
6674
private TimeValue progressLogInterval;
75+
private long maxBalanceComputationTimeDuringIndexCreationMillis;
6776

6877
public DesiredBalanceComputer(ClusterSettings clusterSettings, ThreadPool threadPool, ShardsAllocator delegateAllocator) {
69-
this.threadPool = threadPool;
78+
this(clusterSettings, delegateAllocator, threadPool::relativeTimeInMillis);
79+
}
80+
81+
DesiredBalanceComputer(ClusterSettings clusterSettings, ShardsAllocator delegateAllocator, LongSupplier timeSupplierMillis) {
7082
this.delegateAllocator = delegateAllocator;
83+
this.timeSupplierMillis = timeSupplierMillis;
7184
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
85+
clusterSettings.initializeAndWatch(
86+
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
87+
value -> this.maxBalanceComputationTimeDuringIndexCreationMillis = value.millis()
88+
);
7289
}
7390

7491
public DesiredBalance compute(
@@ -77,7 +94,6 @@ public DesiredBalance compute(
7794
Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
7895
Predicate<DesiredBalanceInput> isFresh
7996
) {
80-
8197
if (logger.isTraceEnabled()) {
8298
logger.trace(
8399
"Recomputing desired balance for [{}]: {}, {}, {}, {}",
@@ -97,9 +113,10 @@ public DesiredBalance compute(
97113
final var changes = routingAllocation.changes();
98114
final var ignoredShards = getIgnoredShardsWithDiscardedAllocationStatus(desiredBalanceInput.ignoredShards());
99115
final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation);
116+
DesiredBalance.ComputationFinishReason finishReason = DesiredBalance.ComputationFinishReason.CONVERGED;
100117

101118
if (routingNodes.size() == 0) {
102-
return new DesiredBalance(desiredBalanceInput.index(), Map.of());
119+
return new DesiredBalance(desiredBalanceInput.index(), Map.of(), finishReason);
103120
}
104121

105122
// we assume that all ongoing recoveries will complete
@@ -263,11 +280,12 @@ public DesiredBalance compute(
263280

264281
final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
265282
final long timeWarningInterval = progressLogInterval.millis();
266-
final long computationStartedTime = threadPool.relativeTimeInMillis();
283+
final long computationStartedTime = timeSupplierMillis.getAsLong();
267284
long nextReportTime = computationStartedTime + timeWarningInterval;
268285

269286
int i = 0;
270287
boolean hasChanges = false;
288+
boolean assignedNewlyCreatedPrimaryShards = false;
271289
while (true) {
272290
if (hasChanges) {
273291
// Not the first iteration, so every remaining unassigned shard has been ignored, perhaps due to throttling. We must bring
@@ -293,6 +311,15 @@ public DesiredBalance compute(
293311
for (final var shardRouting : routingNode) {
294312
if (shardRouting.initializing()) {
295313
hasChanges = true;
314+
if (shardRouting.primary()
315+
&& shardRouting.unassignedInfo() != null
316+
&& shardRouting.unassignedInfo().reason() == UnassignedInfo.Reason.INDEX_CREATED) {
317+
// TODO: we could include more cases that would cause early publishing of desired balance in case of a long
318+
// computation. e.g.:
319+
// - unassigned search replicas in case the shard has no assigned shard replicas
320+
// - other reasons for an unassigned shard such as NEW_INDEX_RESTORED
321+
assignedNewlyCreatedPrimaryShards = true;
322+
}
296323
clusterInfoSimulator.simulateShardStarted(shardRouting);
297324
routingNodes.startShard(shardRouting, changes, 0L);
298325
}
@@ -301,14 +328,14 @@ public DesiredBalance compute(
301328

302329
i++;
303330
final int iterations = i;
304-
final long currentTime = threadPool.relativeTimeInMillis();
331+
final long currentTime = timeSupplierMillis.getAsLong();
305332
final boolean reportByTime = nextReportTime <= currentTime;
306333
final boolean reportByIterationCount = i % iterationCountReportInterval == 0;
307334
if (reportByTime || reportByIterationCount) {
308335
nextReportTime = currentTime + timeWarningInterval;
309336
}
310337

311-
if (hasChanges == false) {
338+
if (hasComputationConverged(hasChanges, i)) {
312339
logger.debug(
313340
"Desired balance computation for [{}] converged after [{}] and [{}] iterations",
314341
desiredBalanceInput.index(),
@@ -324,9 +351,25 @@ public DesiredBalance compute(
324351
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations as newer cluster state received. "
325352
+ "Publishing intermediate desired balance and restarting computation",
326353
desiredBalanceInput.index(),
354+
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
355+
i
356+
);
357+
finishReason = DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT;
358+
break;
359+
}
360+
361+
if (assignedNewlyCreatedPrimaryShards
362+
&& currentTime - computationStartedTime >= maxBalanceComputationTimeDuringIndexCreationMillis) {
363+
logger.info(
364+
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations "
365+
+ "in order to not delay assignment of newly created index shards for more than [{}]. "
366+
+ "Publishing intermediate desired balance and restarting computation",
367+
desiredBalanceInput.index(),
368+
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
327369
i,
328-
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
370+
TimeValue.timeValueMillis(maxBalanceComputationTimeDuringIndexCreationMillis).toString()
329371
);
372+
finishReason = DesiredBalance.ComputationFinishReason.STOP_EARLY;
330373
break;
331374
}
332375

@@ -368,7 +411,12 @@ public DesiredBalance compute(
368411
}
369412

370413
long lastConvergedIndex = hasChanges ? previousDesiredBalance.lastConvergedIndex() : desiredBalanceInput.index();
371-
return new DesiredBalance(lastConvergedIndex, assignments);
414+
return new DesiredBalance(lastConvergedIndex, assignments, finishReason);
415+
}
416+
417+
// visible for testing
418+
boolean hasComputationConverged(boolean hasRoutingChanges, int currentIteration) {
419+
return hasRoutingChanges == false;
372420
}
373421

374422
private static Map<ShardId, ShardAssignment> collectShardAssignments(RoutingNodes routingNodes) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,16 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
134134
)
135135
);
136136
computationsExecuted.inc();
137-
if (isFresh(desiredBalanceInput)) {
137+
138+
if (currentDesiredBalance.finishReason() == DesiredBalance.ComputationFinishReason.STOP_EARLY) {
139+
logger.debug(
140+
"Desired balance computation for [{}] terminated early with partial result, scheduling reconciliation",
141+
index
142+
);
143+
submitReconcileTask(currentDesiredBalance);
144+
var newInput = DesiredBalanceInput.create(indexGenerator.incrementAndGet(), desiredBalanceInput.routingAllocation());
145+
desiredBalanceComputation.compareAndEnqueue(desiredBalanceInput, newInput);
146+
} else if (isFresh(desiredBalanceInput)) {
138147
logger.debug("Desired balance computation for [{}] is completed, scheduling reconciliation", index);
139148
computationsConverged.inc();
140149
submitReconcileTask(currentDesiredBalance);

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ public void apply(Settings value, Settings current, Settings previous) {
219219
DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS,
220220
DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS,
221221
DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING,
222+
DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
222223
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,
223224
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
224225
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ContinuousComputationTests.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.CyclicBarrier;
2222
import java.util.concurrent.Semaphore;
2323
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2425
import java.util.concurrent.atomic.AtomicInteger;
2526
import java.util.concurrent.atomic.AtomicReference;
2627

@@ -73,6 +74,68 @@ protected void processInput(Integer input) {
7374
assertTrue(Arrays.toString(valuePerThread) + " vs " + result.get(), Arrays.stream(valuePerThread).anyMatch(i -> i == result.get()));
7475
}
7576

77+
public void testCompareAndEnqueue() throws Exception {
78+
final var initialInput = new Object();
79+
final var compareAndEnqueueCount = between(1, 10);
80+
final var remaining = new AtomicInteger(compareAndEnqueueCount);
81+
final var computationsExecuted = new AtomicInteger();
82+
final var result = new AtomicReference<>();
83+
final var computation = new ContinuousComputation<>(threadPool.generic()) {
84+
@Override
85+
protected void processInput(Object input) {
86+
result.set(input);
87+
if (remaining.decrementAndGet() >= 0) {
88+
compareAndEnqueue(input, new Object());
89+
}
90+
computationsExecuted.incrementAndGet();
91+
}
92+
};
93+
computation.onNewInput(initialInput);
94+
assertBusy(() -> assertFalse(computation.isActive()));
95+
assertNotEquals(result.get(), initialInput);
96+
assertEquals(computationsExecuted.get(), 1 + compareAndEnqueueCount);
97+
}
98+
99+
public void testCompareAndEnqueueSkipped() throws Exception {
100+
final var barrier = new CyclicBarrier(2);
101+
final var computationsExecuted = new AtomicInteger();
102+
final var initialInput = new Object();
103+
final var conditionalInput = new Object();
104+
final var newInput = new Object();
105+
final var submitConditional = new AtomicBoolean(true);
106+
final var result = new AtomicReference<>();
107+
108+
final var computation = new ContinuousComputation<>(threadPool.generic()) {
109+
@Override
110+
protected void processInput(Object input) {
111+
assertNotEquals(input, conditionalInput);
112+
safeAwait(barrier); // start
113+
safeAwait(barrier); // continue
114+
if (submitConditional.getAndSet(false)) {
115+
compareAndEnqueue(input, conditionalInput);
116+
}
117+
result.set(input);
118+
safeAwait(barrier); // finished
119+
computationsExecuted.incrementAndGet();
120+
}
121+
};
122+
computation.onNewInput(initialInput);
123+
124+
safeAwait(barrier); // start
125+
computation.onNewInput(newInput);
126+
safeAwait(barrier); // continue
127+
safeAwait(barrier); // finished
128+
assertEquals(result.get(), initialInput);
129+
130+
safeAwait(barrier); // start
131+
safeAwait(barrier); // continue
132+
safeAwait(barrier); // finished
133+
134+
assertBusy(() -> assertFalse(computation.isActive()));
135+
assertEquals(result.get(), newInput);
136+
assertEquals(computationsExecuted.get(), 2);
137+
}
138+
76139
public void testSkipsObsoleteValues() throws Exception {
77140
final var barrier = new CyclicBarrier(2);
78141
final Runnable await = () -> safeAwait(barrier);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1210,7 +1210,12 @@ private void checkIterationLogging(int iterations, long eachIterationDuration, M
12101210
var currentTime = new AtomicLong(0L);
12111211
when(mockThreadPool.relativeTimeInMillis()).thenAnswer(invocation -> currentTime.addAndGet(eachIterationDuration));
12121212

1213-
var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), mockThreadPool, new ShardsAllocator() {
1213+
// Some runs of this test try to simulate a long desired balance computation. Setting a high value on the following setting
1214+
// prevents interrupting a long computation.
1215+
var clusterSettings = createBuiltInClusterSettings(
1216+
Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2m").build()
1217+
);
1218+
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, mockThreadPool, new ShardsAllocator() {
12141219
@Override
12151220
public void allocate(RoutingAllocation allocation) {
12161221
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();

0 commit comments

Comments
 (0)