Skip to content

Commit 9b066ee

Browse files
committed
React more prompty to task cancellation while waiting for the cluster to unblock
Instead of waiting for the next run of the `ClusterStateObserver` (which might be arbitrarily far in the future, but bound by the timeout if one is set), we notify the listener immediately that the task has been cancelled. While doing so, we ensure we invoke the listener only once. Fixes #117971
1 parent 8312613 commit 9b066ee

File tree

4 files changed

+93
-7
lines changed

4 files changed

+93
-7
lines changed

server/src/main/java/org/elasticsearch/action/support/local/TransportLocalClusterStateAction.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
import org.elasticsearch.node.NodeClosedException;
2828
import org.elasticsearch.tasks.CancellableTask;
2929
import org.elasticsearch.tasks.Task;
30+
import org.elasticsearch.tasks.TaskCancelledException;
3031
import org.elasticsearch.tasks.TaskManager;
3132

3233
import java.util.concurrent.Executor;
34+
import java.util.concurrent.atomic.AtomicBoolean;
3335

3436
import static org.elasticsearch.common.Strings.format;
3537

@@ -104,20 +106,40 @@ private void waitForClusterUnblock(
104106
logger,
105107
clusterService.threadPool().getThreadContext()
106108
);
109+
// We track whether we already notified the listener of cancellation, to avoid invoking the listener twice.
110+
final var notifiedCancellation = new AtomicBoolean(false);
111+
if (task instanceof CancellableTask cancellableTask) {
112+
cancellableTask.addListener(() -> {
113+
if (notifiedCancellation.compareAndSet(false, true) == false) {
114+
return;
115+
}
116+
listener.onFailure(new TaskCancelledException("Task was cancelled"));
117+
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
118+
});
119+
}
107120
observer.waitForNextChange(new ClusterStateObserver.Listener() {
108121
@Override
109122
public void onNewClusterState(ClusterState state) {
123+
if (notifiedCancellation.compareAndSet(false, true) == false) {
124+
return;
125+
}
110126
logger.trace("retrying with cluster state version [{}]", state.version());
111127
innerDoExecute(task, request, listener, state);
112128
}
113129

114130
@Override
115131
public void onClusterServiceClose() {
132+
if (notifiedCancellation.compareAndSet(false, true) == false) {
133+
return;
134+
}
116135
listener.onFailure(new NodeClosedException(clusterService.localNode()));
117136
}
118137

119138
@Override
120139
public void onTimeout(TimeValue timeout) {
140+
if (notifiedCancellation.compareAndSet(false, true) == false) {
141+
return;
142+
}
121143
logger.debug(
122144
() -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout),
123145
exception

server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
import java.util.Optional;
4949
import java.util.concurrent.Executor;
50+
import java.util.concurrent.atomic.AtomicBoolean;
5051
import java.util.function.Predicate;
5152

5253
import static org.elasticsearch.core.Strings.format;
@@ -304,20 +305,40 @@ private void retry(long currentStateVersion, final Throwable failure, final Pred
304305
threadPool.getThreadContext()
305306
);
306307
}
308+
// We track whether we already notified the listener of cancellation, to avoid invoking the listener twice.
309+
final var notifiedCancellation = new AtomicBoolean(false);
310+
if (task instanceof CancellableTask cancellableTask) {
311+
cancellableTask.addListener(() -> {
312+
if (notifiedCancellation.compareAndSet(false, true) == false) {
313+
return;
314+
}
315+
listener.onFailure(new TaskCancelledException("Task was cancelled"));
316+
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
317+
});
318+
}
307319
observer.waitForNextChange(new ClusterStateObserver.Listener() {
308320
@Override
309321
public void onNewClusterState(ClusterState state) {
322+
if (notifiedCancellation.compareAndSet(false, true) == false) {
323+
return;
324+
}
310325
logger.trace("retrying with cluster state version [{}]", state.version());
311326
doStart(state);
312327
}
313328

314329
@Override
315330
public void onClusterServiceClose() {
331+
if (notifiedCancellation.compareAndSet(false, true) == false) {
332+
return;
333+
}
316334
listener.onFailure(new NodeClosedException(clusterService.localNode()));
317335
}
318336

319337
@Override
320338
public void onTimeout(TimeValue timeout) {
339+
if (notifiedCancellation.compareAndSet(false, true) == false) {
340+
return;
341+
}
321342
logger.debug(() -> format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout), failure);
322343
listener.onFailure(new MasterNotDiscoveredException(failure));
323344
}

server/src/test/java/org/elasticsearch/action/support/local/TransportLocalClusterStateActionTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,23 @@ public void testNonRetryableBlock() {
117117
assertTrue(exception.getCause() instanceof ClusterBlockException);
118118
}
119119

120-
public void testTaskCancelledAfterBlock() {
120+
public void testTaskCancelledImmediately() {
121+
var request = new Request();
122+
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
123+
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
124+
setState(clusterService, state);
125+
126+
CancellableTask task = new CancellableTask(randomLong(), "test", Action.ACTION_NAME, "", TaskId.EMPTY_TASK_ID, Map.of());
127+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
128+
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);
129+
130+
TaskCancelHelper.cancel(task, "test");
131+
assertTrue(listener.isDone());
132+
var exception = assertThrows(ExecutionException.class, listener::get);
133+
assertTrue(exception.getCause() instanceof TaskCancelledException);
134+
}
135+
136+
public void testTaskCancelled() {
121137
var request = new Request();
122138
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
123139
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
@@ -128,7 +144,6 @@ public void testTaskCancelledAfterBlock() {
128144
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);
129145

130146
TaskCancelHelper.cancel(task, "test");
131-
assertFalse(listener.isDone());
132147
setState(clusterService, ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
133148
assertTrue(listener.isDone());
134149
var exception = assertThrows(ExecutionException.class, listener::get);

server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -676,11 +676,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
676676
}
677677
}, task, request, listener);
678678

679-
final int genericThreads = threadPool.info(ThreadPool.Names.GENERIC).getMax();
680-
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.GENERIC);
681-
final CyclicBarrier barrier = new CyclicBarrier(genericThreads + 1);
682-
final CountDownLatch latch = new CountDownLatch(1);
683-
684679
if (cancelBeforeStart == false) {
685680
assertThat(listener.isDone(), equalTo(false));
686681

@@ -702,6 +697,39 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
702697
expectThrows(TaskCancelledException.class, listener);
703698
}
704699

700+
/**
701+
* Verify that the listener is invoked immediately when the task is cancelled, instead of waiting for the next ClusterStateObserver run.
702+
*/
703+
public void testTaskCancellationDirectly() {
704+
ClusterBlock block = new ClusterBlock(1, "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
705+
ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
706+
.blocks(ClusterBlocks.builder().addGlobalBlock(block))
707+
.build();
708+
709+
// Update the cluster state with a block so the request waits until it's unblocked
710+
setState(clusterService, stateWithBlock);
711+
712+
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
713+
714+
Request request = new Request();
715+
final CancellableTask task = (CancellableTask) taskManager.register("type", "internal:testAction", request);
716+
717+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
718+
ActionTestUtils.execute(new Action("internal:testAction", transportService, clusterService, threadPool) {
719+
@Override
720+
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
721+
Set<ClusterBlock> blocks = state.blocks().global();
722+
return blocks.isEmpty() ? null : new ClusterBlockException(blocks);
723+
}
724+
}, task, request, listener);
725+
726+
assertThat(listener.isDone(), equalTo(false));
727+
728+
taskManager.cancel(task, "", () -> {});
729+
assertThat(task.isCancelled(), equalTo(true));
730+
expectThrows(TaskCancelledException.class, listener);
731+
}
732+
705733
public void testTaskCancellationOnceActionItIsDispatchedToMaster() throws Exception {
706734
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
707735

0 commit comments

Comments
 (0)