Skip to content

Commit ce198c2

Browse files
nielsbauman authored and Samiul-TheSoccerFan committed
React more promptly to task cancellation while waiting for the cluster to unblock (elastic#128737)
Instead of waiting for the next run of the `ClusterStateObserver` (which might be arbitrarily far in the future, but bound by the timeout if one is set), we notify the listener immediately that the task has been cancelled. While doing so, we ensure we invoke the listener only once. Fixes elastic#117971
1 parent 5fc4573 commit ce198c2

File tree

6 files changed

+122
-7
lines changed

6 files changed

+122
-7
lines changed

docs/changelog/128737.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 128737
2+
summary: React more promptly to task cancellation while waiting for the cluster to
3+
unblock
4+
area: Task Management
5+
type: enhancement
6+
issues:
7+
- 117971

libs/core/src/main/java/org/elasticsearch/core/Predicates.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.core;
1111

12+
import java.util.concurrent.atomic.AtomicBoolean;
13+
import java.util.function.BooleanSupplier;
1214
import java.util.function.Predicate;
1315

1416
/**
@@ -90,4 +92,22 @@ public static <T> Predicate<T> always() {
9092
public static <T> Predicate<T> never() {
9193
return (Predicate<T>) NEVER;
9294
}
95+
96+
private static class OnceTrue extends AtomicBoolean implements BooleanSupplier {
97+
OnceTrue() {
98+
super(true);
99+
}
100+
101+
@Override
102+
public boolean getAsBoolean() {
103+
return getAndSet(false);
104+
}
105+
}
106+
107+
/**
108+
* @return a {@link BooleanSupplier} which supplies {@code true} the first time it is called, and {@code false} subsequently.
109+
*/
110+
public static BooleanSupplier once() {
111+
return new OnceTrue();
112+
}
93113
}

server/src/main/java/org/elasticsearch/action/support/local/TransportLocalClusterStateAction.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.util.concurrent.EsExecutors;
2525
import org.elasticsearch.core.FixForMultiProject;
26+
import org.elasticsearch.core.Predicates;
2627
import org.elasticsearch.core.TimeValue;
2728
import org.elasticsearch.node.NodeClosedException;
2829
import org.elasticsearch.tasks.CancellableTask;
2930
import org.elasticsearch.tasks.Task;
31+
import org.elasticsearch.tasks.TaskCancelledException;
3032
import org.elasticsearch.tasks.TaskManager;
3133

3234
import java.util.concurrent.Executor;
@@ -104,20 +106,41 @@ private void waitForClusterUnblock(
104106
logger,
105107
clusterService.threadPool().getThreadContext()
106108
);
109+
// We track whether we already notified the listener or started executing the action, to avoid invoking the listener twice.
110+
// Because of that second part, we can not use ActionListener#notifyOnce.
111+
final var waitComplete = Predicates.once();
112+
if (task instanceof CancellableTask cancellableTask) {
113+
cancellableTask.addListener(() -> {
114+
if (waitComplete.getAsBoolean() == false) {
115+
return;
116+
}
117+
listener.onFailure(new TaskCancelledException("Task was cancelled"));
118+
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
119+
});
120+
}
107121
observer.waitForNextChange(new ClusterStateObserver.Listener() {
108122
@Override
109123
public void onNewClusterState(ClusterState state) {
124+
if (waitComplete.getAsBoolean() == false) {
125+
return;
126+
}
110127
logger.trace("retrying with cluster state version [{}]", state.version());
111128
innerDoExecute(task, request, listener, state);
112129
}
113130

114131
@Override
115132
public void onClusterServiceClose() {
133+
if (waitComplete.getAsBoolean() == false) {
134+
return;
135+
}
116136
listener.onFailure(new NodeClosedException(clusterService.localNode()));
117137
}
118138

119139
@Override
120140
public void onTimeout(TimeValue timeout) {
141+
if (waitComplete.getAsBoolean() == false) {
142+
return;
143+
}
121144
logger.debug(
122145
() -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout),
123146
exception

server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.io.stream.Writeable;
3131
import org.elasticsearch.common.util.concurrent.EsExecutors;
3232
import org.elasticsearch.core.FixForMultiProject;
33+
import org.elasticsearch.core.Predicates;
3334
import org.elasticsearch.core.TimeValue;
3435
import org.elasticsearch.discovery.MasterNotDiscoveredException;
3536
import org.elasticsearch.gateway.GatewayService;
@@ -304,20 +305,41 @@ private void retry(long currentStateVersion, final Throwable failure, final Pred
304305
threadPool.getThreadContext()
305306
);
306307
}
308+
// We track whether we already notified the listener or started executing the action, to avoid invoking the listener twice.
309+
// Because of that second part, we can not use ActionListener#notifyOnce.
310+
final var waitComplete = Predicates.once();
311+
if (task instanceof CancellableTask cancellableTask) {
312+
cancellableTask.addListener(() -> {
313+
if (waitComplete.getAsBoolean() == false) {
314+
return;
315+
}
316+
listener.onFailure(new TaskCancelledException("Task was cancelled"));
317+
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
318+
});
319+
}
307320
observer.waitForNextChange(new ClusterStateObserver.Listener() {
308321
@Override
309322
public void onNewClusterState(ClusterState state) {
323+
if (waitComplete.getAsBoolean() == false) {
324+
return;
325+
}
310326
logger.trace("retrying with cluster state version [{}]", state.version());
311327
doStart(state);
312328
}
313329

314330
@Override
315331
public void onClusterServiceClose() {
332+
if (waitComplete.getAsBoolean() == false) {
333+
return;
334+
}
316335
listener.onFailure(new NodeClosedException(clusterService.localNode()));
317336
}
318337

319338
@Override
320339
public void onTimeout(TimeValue timeout) {
340+
if (waitComplete.getAsBoolean() == false) {
341+
return;
342+
}
321343
logger.debug(() -> format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout), failure);
322344
listener.onFailure(new MasterNotDiscoveredException(failure));
323345
}

server/src/test/java/org/elasticsearch/action/support/local/TransportLocalClusterStateActionTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,23 @@ public void testNonRetryableBlock() {
117117
assertTrue(exception.getCause() instanceof ClusterBlockException);
118118
}
119119

120-
public void testTaskCancelledAfterBlock() {
120+
public void testTaskCancelledImmediately() {
121+
var request = new Request();
122+
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
123+
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
124+
setState(clusterService, state);
125+
126+
CancellableTask task = new CancellableTask(randomLong(), "test", Action.ACTION_NAME, "", TaskId.EMPTY_TASK_ID, Map.of());
127+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
128+
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);
129+
130+
TaskCancelHelper.cancel(task, "test");
131+
assertTrue(listener.isDone());
132+
var exception = assertThrows(ExecutionException.class, listener::get);
133+
assertTrue(exception.getCause() instanceof TaskCancelledException);
134+
}
135+
136+
public void testTaskCancelled() {
121137
var request = new Request();
122138
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
123139
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
@@ -128,7 +144,6 @@ public void testTaskCancelledAfterBlock() {
128144
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);
129145

130146
TaskCancelHelper.cancel(task, "test");
131-
assertFalse(listener.isDone());
132147
setState(clusterService, ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
133148
assertTrue(listener.isDone());
134149
var exception = assertThrows(ExecutionException.class, listener::get);

server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -676,11 +676,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
676676
}
677677
}, task, request, listener);
678678

679-
final int genericThreads = threadPool.info(ThreadPool.Names.GENERIC).getMax();
680-
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.GENERIC);
681-
final CyclicBarrier barrier = new CyclicBarrier(genericThreads + 1);
682-
final CountDownLatch latch = new CountDownLatch(1);
683-
684679
if (cancelBeforeStart == false) {
685680
assertThat(listener.isDone(), equalTo(false));
686681

@@ -702,6 +697,39 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
702697
expectThrows(TaskCancelledException.class, listener);
703698
}
704699

700+
/**
701+
* Verify that the listener is invoked immediately when the task is cancelled, instead of waiting for the next ClusterStateObserver run.
702+
*/
703+
public void testTaskCancellationDirectly() {
704+
ClusterBlock block = new ClusterBlock(1, "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
705+
ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
706+
.blocks(ClusterBlocks.builder().addGlobalBlock(block))
707+
.build();
708+
709+
// Update the cluster state with a block so the request waits until it's unblocked
710+
setState(clusterService, stateWithBlock);
711+
712+
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
713+
714+
Request request = new Request();
715+
final CancellableTask task = (CancellableTask) taskManager.register("type", "internal:testAction", request);
716+
717+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
718+
ActionTestUtils.execute(new Action("internal:testAction", transportService, clusterService, threadPool) {
719+
@Override
720+
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
721+
Set<ClusterBlock> blocks = state.blocks().global();
722+
return blocks.isEmpty() ? null : new ClusterBlockException(blocks);
723+
}
724+
}, task, request, listener);
725+
726+
assertThat(listener.isDone(), equalTo(false));
727+
728+
taskManager.cancel(task, "", () -> {});
729+
assertThat(task.isCancelled(), equalTo(true));
730+
expectThrows(TaskCancelledException.class, listener);
731+
}
732+
705733
public void testTaskCancellationOnceActionItIsDispatchedToMaster() throws Exception {
706734
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
707735

0 commit comments

Comments
 (0)