Skip to content

Commit c2ce4f8

Browse files
authored
Use applied state after DiskThresholdMonitor reroute (#94916)
Today, when the `DiskThresholdMonitor` triggers a reroute, it supplies a listener which accepts the exact cluster state that the reroute produced. However, other cluster state updates may have been applied between the reroute computation and the completion of this listener, so the cluster state it receives is potentially stale. Moreover, by accepting the exact cluster state that the reroute produced, we require the `BatchedRerouteService` to retain every resulting state for this and all other batched reroute listeners. There's no need for this: we can instead retrieve the last-applied cluster state afresh, and avoid this unnecessary retention of cluster states. Relates #94914
1 parent cf2ae18 commit c2ce4f8

File tree

16 files changed

+40
-39
lines changed

16 files changed

+40
-39
lines changed

docs/changelog/94916.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 94916
2+
summary: Use applied state after `DiskThresholdMonitor` reroute
3+
area: Allocation
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public void onFailure(Exception e) {
161161
}
162162

163163
private static class UpdateDesiredNodesExecutor implements ClusterStateTaskExecutor<UpdateDesiredNodesTask> {
164-
private static final ActionListener<ClusterState> REROUTE_LISTENER = ActionListener.wrap(
164+
private static final ActionListener<Void> REROUTE_LISTENER = ActionListener.wrap(
165165
r -> logger.trace("reroute after desired nodes update completed"),
166166
e -> logger.debug("reroute after desired nodes update failed", e)
167167
);

server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private void reroute(final boolean updateSettingsAcked) {
202202
// completed.
203203
clusterService.getRerouteService().reroute(REROUTE_TASK_SOURCE, Priority.URGENT, new ActionListener<>() {
204204
@Override
205-
public void onResponse(ClusterState clusterState) {
205+
public void onResponse(Void ignored) {
206206
listener.onResponse(
207207
new ClusterUpdateSettingsResponse(
208208
updateSettingsAcked,

server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class BatchedRerouteService implements RerouteService {
4343

4444
private final Object mutex = new Object();
4545
@Nullable // null if no reroute is currently pending
46-
private List<ActionListener<ClusterState>> pendingRerouteListeners;
46+
private List<ActionListener<Void>> pendingRerouteListeners;
4747
private Priority pendingTaskPriority = Priority.LANGUID;
4848

4949
public interface RerouteAction {
@@ -62,12 +62,12 @@ public BatchedRerouteService(ClusterService clusterService, RerouteAction rerout
6262
* Initiates a reroute.
6363
*/
6464
@Override
65-
public final void reroute(String reason, Priority priority, ActionListener<ClusterState> listener) {
66-
final ActionListener<ClusterState> wrappedListener = ContextPreservingActionListener.wrapPreservingContext(
65+
public final void reroute(String reason, Priority priority, ActionListener<Void> listener) {
66+
final ActionListener<Void> wrappedListener = ContextPreservingActionListener.wrapPreservingContext(
6767
listener,
6868
clusterService.getClusterApplierService().threadPool().getThreadContext()
6969
);
70-
final List<ActionListener<ClusterState>> currentListeners;
70+
final List<ActionListener<Void>> currentListeners;
7171
synchronized (mutex) {
7272
if (pendingRerouteListeners != null) {
7373
if (priority.sameOrAfter(pendingTaskPriority)) {
@@ -152,7 +152,7 @@ public void onFailure(Exception e) {
152152

153153
@Override
154154
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
155-
future.addListener(ActionListener.running(() -> ActionListener.onResponse(currentListeners, newState)));
155+
future.addListener(ActionListener.running(() -> ActionListener.onResponse(currentListeners, null)));
156156
}
157157
});
158158
} catch (Exception e) {

server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.cluster.routing;
99

1010
import org.elasticsearch.action.ActionListener;
11-
import org.elasticsearch.cluster.ClusterState;
1211
import org.elasticsearch.common.Priority;
1312

1413
/**
@@ -23,5 +22,5 @@ public interface RerouteService {
2322
* this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then
2423
* the priority of the pending batch is raised to the given priority.
2524
*/
26-
void reroute(String reason, Priority priority, ActionListener<ClusterState> listener);
25+
void reroute(String reason, Priority priority, ActionListener<Void> listener);
2726
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,8 @@ public void onNewInfo(ClusterInfo info) {
317317
rerouteService.reroute(
318318
"disk threshold monitor",
319319
Priority.HIGH,
320-
ActionListener.releaseAfter(ActionListener.runAfter(ActionListener.wrap(reroutedClusterState -> {
321-
320+
ActionListener.releaseAfter(ActionListener.runAfter(ActionListener.wrap(ignored -> {
321+
final var reroutedClusterState = clusterStateSupplier.get();
322322
for (DiskUsage diskUsage : usagesOverHighThreshold) {
323323
final RoutingNode routingNode = reroutedClusterState.getRoutingNodes().node(diskUsage.getNodeId());
324324
final DiskUsage usageIncludingRelocations;

server/src/main/java/org/elasticsearch/snapshots/InternalSnapshotsInfoService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class InternalSnapshotsInfoService implements ClusterStateListener, Snaps
5353

5454
private static final Logger logger = LogManager.getLogger(InternalSnapshotsInfoService.class);
5555

56-
private static final ActionListener<ClusterState> REROUTE_LISTENER = ActionListener.wrap(
56+
private static final ActionListener<Void> REROUTE_LISTENER = ActionListener.wrap(
5757
r -> logger.trace("reroute after snapshot shard size update completed"),
5858
e -> logger.debug("reroute after snapshot shard size update failed", e)
5959
);

server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
4444
private ShardStateAction.ShardStartedClusterStateTaskExecutor executor;
4545

4646
@SuppressWarnings("unused")
47-
private static void neverReroutes(String reason, Priority priority, ActionListener<ClusterState> listener) {
47+
private static void neverReroutes(String reason, Priority priority, ActionListener<Void> listener) {
4848
fail("unexpectedly ran a deferred reroute");
4949
}
5050

server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ private void doTestDoesNotSubmitRerouteTaskTooFrequently(boolean testMaxHeadroom
266266
.nodes(DiscoveryNodes.builder().add(newNormalNode("node1")).add(newNormalNode("node2")))
267267
.build();
268268
AtomicLong currentTime = new AtomicLong();
269-
AtomicReference<ActionListener<ClusterState>> listenerReference = new AtomicReference<>();
269+
AtomicReference<ActionListener<Void>> listenerReference = new AtomicReference<>();
270270
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
271271
Settings.EMPTY,
272272
() -> clusterState,
@@ -316,7 +316,7 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, Releasab
316316
currentTime.addAndGet(randomLongBetween(0, 120000));
317317
monitor.onNewInfo(clusterInfo(allDisksOk));
318318
assertNotNull(listenerReference.get());
319-
listenerReference.getAndSet(null).onResponse(clusterState);
319+
listenerReference.getAndSet(null).onResponse(null);
320320

321321
// should not reroute when all disks are ok and no new info received
322322
currentTime.addAndGet(randomLongBetween(0, 120000));
@@ -327,7 +327,7 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, Releasab
327327
if (randomBoolean()) {
328328
currentTime.addAndGet(randomLongBetween(0, 120000));
329329
monitor.onNewInfo(clusterInfo(oneDiskAboveWatermark));
330-
Optional.ofNullable(listenerReference.getAndSet(null)).ifPresent(l -> l.onResponse(clusterState));
330+
Optional.ofNullable(listenerReference.getAndSet(null)).ifPresent(l -> l.onResponse(null));
331331
}
332332

333333
// however once the reroute interval has elapsed then we must reroute again
@@ -339,7 +339,7 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, Releasab
339339
);
340340
monitor.onNewInfo(clusterInfo(oneDiskAboveWatermark));
341341
assertNotNull(listenerReference.get());
342-
listenerReference.getAndSet(null).onResponse(clusterState);
342+
listenerReference.getAndSet(null).onResponse(null);
343343

344344
if (randomBoolean()) {
345345
// should not re-route again within the reroute interval
@@ -362,15 +362,15 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, Releasab
362362
);
363363
monitor.onNewInfo(clusterInfo(oneDiskAboveWatermark));
364364
assertNotNull(listenerReference.get());
365-
final ActionListener<ClusterState> rerouteListener1 = listenerReference.getAndSet(null);
365+
final ActionListener<Void> rerouteListener1 = listenerReference.getAndSet(null);
366366

367367
// should not re-route again before reroute has completed
368368
currentTime.addAndGet(randomLongBetween(0, 120000));
369369
monitor.onNewInfo(clusterInfo(allDisksOk));
370370
assertNull(listenerReference.get());
371371

372372
// complete reroute
373-
rerouteListener1.onResponse(clusterState);
373+
rerouteListener1.onResponse(null);
374374

375375
if (randomBoolean()) {
376376
// should not re-route again within the reroute interval
@@ -475,7 +475,7 @@ private void doTestAutoReleaseIndices(boolean testMaxHeadroom) {
475475
(reason, priority, listener) -> {
476476
assertNotNull(listener);
477477
assertThat(priority, equalTo(Priority.HIGH));
478-
listener.onResponse(clusterState);
478+
listener.onResponse(null);
479479
}
480480
) {
481481
@Override
@@ -567,7 +567,7 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, Releasable onC
567567
(reason, priority, listener) -> {
568568
assertNotNull(listener);
569569
assertThat(priority, equalTo(Priority.HIGH));
570-
listener.onResponse(clusterStateWithBlocks);
570+
listener.onResponse(null);
571571
}
572572
) {
573573
@Override
@@ -813,7 +813,7 @@ private void doTestNoAutoReleaseOfIndicesOnReplacementNodes(boolean testMaxHeadr
813813
(reason, priority, listener) -> {
814814
assertNotNull(listener);
815815
assertThat(priority, equalTo(Priority.HIGH));
816-
listener.onResponse(currentClusterState.get());
816+
listener.onResponse(null);
817817
}
818818
) {
819819
@Override
@@ -1058,7 +1058,7 @@ public long getAsLong() {
10581058
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
10591059
null,
10601060
timeSupplier,
1061-
(reason, priority, listener) -> listener.onResponse(clusterStateRef.get())
1061+
(reason, priority, listener) -> listener.onResponse(null)
10621062
) {
10631063
@Override
10641064
protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, Releasable onCompletion, boolean readOnly) {

server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void setUp() throws Exception {
9191
threadPool = new TestThreadPool(getTestName());
9292
clusterService = ClusterServiceUtils.createClusterService(threadPool);
9393
repositoriesService = mock(RepositoriesService.class);
94-
rerouteService = (reason, priority, listener) -> listener.onResponse(clusterService.state());
94+
rerouteService = (reason, priority, listener) -> listener.onResponse(null);
9595
}
9696

9797
@After
@@ -109,7 +109,7 @@ public void testSnapshotShardSizes() throws Exception {
109109
final int numberOfShards = randomIntBetween(1, 50);
110110
final CountDownLatch rerouteLatch = new CountDownLatch(numberOfShards);
111111
final RerouteService rerouteService = (reason, priority, listener) -> {
112-
listener.onResponse(clusterService.state());
112+
listener.onResponse(null);
113113
assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L));
114114
rerouteLatch.countDown();
115115
};
@@ -181,7 +181,7 @@ public void testErroneousSnapshotShardSizes() throws Exception {
181181
final CountDown reroutes = new CountDown(maxShardsToCreate);
182182
final RerouteService rerouteService = (reason, priority, listener) -> {
183183
try {
184-
listener.onResponse(clusterService.state());
184+
listener.onResponse(null);
185185
} finally {
186186
if (reroutes.countDown()) {
187187
waitForAllReroutesProcessed.onResponse(null);

0 commit comments

Comments
 (0)