Skip to content

Commit 47f9a34

Browse files
committed
React more prompty to task cancellation while waiting for the cluster to unblock
Instead of waiting for the next run of the `ClusterStateObserver` (which might be arbitrarily far in the future, but bound by the timeout if one is set), we notify the listener immediately that the task has been cancelled. While doing so, we ensure we invoke the listener only once. Fixes #117971
1 parent 2be74a4 commit 47f9a34

File tree

4 files changed

+91
-7
lines changed

4 files changed

+91
-7
lines changed

server/src/main/java/org/elasticsearch/action/support/local/TransportLocalClusterStateAction.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
import org.elasticsearch.node.NodeClosedException;
2828
import org.elasticsearch.tasks.CancellableTask;
2929
import org.elasticsearch.tasks.Task;
30+
import org.elasticsearch.tasks.TaskCancelledException;
3031
import org.elasticsearch.tasks.TaskManager;
3132

3233
import java.util.concurrent.Executor;
34+
import java.util.concurrent.atomic.AtomicBoolean;
3335

3436
import static org.elasticsearch.common.Strings.format;
3537

@@ -104,20 +106,39 @@ private void waitForClusterUnblock(
104106
logger,
105107
clusterService.threadPool().getThreadContext()
106108
);
109+
final var notifiedCancellation = new AtomicBoolean(false);
110+
if (task instanceof CancellableTask cancellableTask) {
111+
cancellableTask.addListener(() -> {
112+
if (notifiedCancellation.compareAndSet(false, true) == false) {
113+
return;
114+
}
115+
listener.onFailure(new TaskCancelledException("Task was cancelled"));
116+
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
117+
});
118+
}
107119
observer.waitForNextChange(new ClusterStateObserver.Listener() {
108120
@Override
109121
public void onNewClusterState(ClusterState state) {
122+
if (notifiedCancellation.compareAndSet(false, true) == false) {
123+
return;
124+
}
110125
logger.trace("retrying with cluster state version [{}]", state.version());
111126
innerDoExecute(task, request, listener, state);
112127
}
113128

114129
@Override
115130
public void onClusterServiceClose() {
131+
if (notifiedCancellation.compareAndSet(false, true) == false) {
132+
return;
133+
}
116134
listener.onFailure(new NodeClosedException(clusterService.localNode()));
117135
}
118136

119137
@Override
120138
public void onTimeout(TimeValue timeout) {
139+
if (notifiedCancellation.compareAndSet(false, true) == false) {
140+
return;
141+
}
121142
logger.debug(
122143
() -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout),
123144
exception

server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
import java.util.Optional;
4949
import java.util.concurrent.Executor;
50+
import java.util.concurrent.atomic.AtomicBoolean;
5051
import java.util.function.Predicate;
5152

5253
import static org.elasticsearch.core.Strings.format;
@@ -304,20 +305,39 @@ private void retry(long currentStateVersion, final Throwable failure, final Pred
304305
threadPool.getThreadContext()
305306
);
306307
}
308+
final var notifiedCancellation = new AtomicBoolean(false);
309+
if (task instanceof CancellableTask cancellableTask) {
310+
cancellableTask.addListener(() -> {
311+
if (notifiedCancellation.compareAndSet(false, true) == false) {
312+
return;
313+
}
314+
listener.onFailure(new TaskCancelledException("Task was cancelled"));
315+
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
316+
});
317+
}
307318
observer.waitForNextChange(new ClusterStateObserver.Listener() {
308319
@Override
309320
public void onNewClusterState(ClusterState state) {
321+
if (notifiedCancellation.compareAndSet(false, true) == false) {
322+
return;
323+
}
310324
logger.trace("retrying with cluster state version [{}]", state.version());
311325
doStart(state);
312326
}
313327

314328
@Override
315329
public void onClusterServiceClose() {
330+
if (notifiedCancellation.compareAndSet(false, true) == false) {
331+
return;
332+
}
316333
listener.onFailure(new NodeClosedException(clusterService.localNode()));
317334
}
318335

319336
@Override
320337
public void onTimeout(TimeValue timeout) {
338+
if (notifiedCancellation.compareAndSet(false, true) == false) {
339+
return;
340+
}
321341
logger.debug(() -> format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout), failure);
322342
listener.onFailure(new MasterNotDiscoveredException(failure));
323343
}

server/src/test/java/org/elasticsearch/action/support/local/TransportLocalClusterStateActionTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,23 @@ public void testNonRetryableBlock() {
117117
assertTrue(exception.getCause() instanceof ClusterBlockException);
118118
}
119119

120-
public void testTaskCancelledAfterBlock() {
120+
public void testTaskCancelledImmediately() {
121+
var request = new Request();
122+
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
123+
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
124+
setState(clusterService, state);
125+
126+
CancellableTask task = new CancellableTask(randomLong(), "test", Action.ACTION_NAME, "", TaskId.EMPTY_TASK_ID, Map.of());
127+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
128+
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);
129+
130+
TaskCancelHelper.cancel(task, "test");
131+
assertTrue(listener.isDone());
132+
var exception = assertThrows(ExecutionException.class, listener::get);
133+
assertTrue(exception.getCause() instanceof TaskCancelledException);
134+
}
135+
136+
public void testTaskCancelled() {
121137
var request = new Request();
122138
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
123139
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
@@ -128,7 +144,6 @@ public void testTaskCancelledAfterBlock() {
128144
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);
129145

130146
TaskCancelHelper.cancel(task, "test");
131-
assertFalse(listener.isDone());
132147
setState(clusterService, ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
133148
assertTrue(listener.isDone());
134149
var exception = assertThrows(ExecutionException.class, listener::get);

server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -676,11 +676,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
676676
}
677677
}, task, request, listener);
678678

679-
final int genericThreads = threadPool.info(ThreadPool.Names.GENERIC).getMax();
680-
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.GENERIC);
681-
final CyclicBarrier barrier = new CyclicBarrier(genericThreads + 1);
682-
final CountDownLatch latch = new CountDownLatch(1);
683-
684679
if (cancelBeforeStart == false) {
685680
assertThat(listener.isDone(), equalTo(false));
686681

@@ -702,6 +697,39 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
702697
expectThrows(TaskCancelledException.class, listener);
703698
}
704699

700+
/**
701+
* Verify that the listener is invoked immediately when the task is cancelled, instead of waiting for the next ClusterStateObserver run.
702+
*/
703+
public void testTaskCancellationDirectly() {
704+
ClusterBlock block = new ClusterBlock(1, "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
705+
ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
706+
.blocks(ClusterBlocks.builder().addGlobalBlock(block))
707+
.build();
708+
709+
// Update the cluster state with a block so the request waits until it's unblocked
710+
setState(clusterService, stateWithBlock);
711+
712+
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
713+
714+
Request request = new Request();
715+
final CancellableTask task = (CancellableTask) taskManager.register("type", "internal:testAction", request);
716+
717+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
718+
ActionTestUtils.execute(new Action("internal:testAction", transportService, clusterService, threadPool) {
719+
@Override
720+
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
721+
Set<ClusterBlock> blocks = state.blocks().global();
722+
return blocks.isEmpty() ? null : new ClusterBlockException(blocks);
723+
}
724+
}, task, request, listener);
725+
726+
assertThat(listener.isDone(), equalTo(false));
727+
728+
taskManager.cancel(task, "", () -> {});
729+
assertThat(task.isCancelled(), equalTo(true));
730+
expectThrows(TaskCancelledException.class, listener);
731+
}
732+
705733
public void testTaskCancellationOnceActionItIsDispatchedToMaster() throws Exception {
706734
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
707735

0 commit comments

Comments
 (0)