Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/128737.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 128737
summary: React more prompty to task cancellation while waiting for the cluster to
unblock
area: Task Management
type: enhancement
issues:
- 117971
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.Strings.format;

Expand Down Expand Up @@ -104,20 +106,41 @@ private void waitForClusterUnblock(
logger,
clusterService.threadPool().getThreadContext()
);
// We track whether we already notified the listener or started executing the action, to avoid invoking the listener twice.
// Because of that second part, we can not use ActionListener#notifyOnce.
final var waitComplete = new AtomicBoolean(false);
if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(() -> {
if (waitComplete.compareAndSet(false, true) == false) {
return;
}
listener.onFailure(new TaskCancelledException("Task was cancelled"));
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
});
}
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
if (waitComplete.compareAndSet(false, true) == false) {
return;
}
logger.trace("retrying with cluster state version [{}]", state.version());
innerDoExecute(task, request, listener, state);
}

@Override
public void onClusterServiceClose() {
if (waitComplete.compareAndSet(false, true) == false) {
return;
}
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
if (waitComplete.compareAndSet(false, true) == false) {
return;
}
logger.debug(
() -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout),
exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -304,20 +305,41 @@ private void retry(long currentStateVersion, final Throwable failure, final Pred
threadPool.getThreadContext()
);
}
// We track whether we already notified the listener or started executing the action, to avoid invoking the listener twice.
// Because of that second part, we can not use ActionListener#notifyOnce.
final var waitComplete = new AtomicBoolean(false);
if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(() -> {
if (waitComplete.compareAndSet(false, true) == false) {
return;
}
listener.onFailure(new TaskCancelledException("Task was cancelled"));
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
});
}
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
if (waitComplete.compareAndSet(false, true) == false) {
return;
}
logger.trace("retrying with cluster state version [{}]", state.version());
doStart(state);
}

@Override
public void onClusterServiceClose() {
if (waitComplete.compareAndSet(false, true) == false) {
return;
}
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
if (waitComplete.compareAndSet(false, true) == false) {
return;
}
logger.debug(() -> format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout), failure);
listener.onFailure(new MasterNotDiscoveredException(failure));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,23 @@ public void testNonRetryableBlock() {
assertTrue(exception.getCause() instanceof ClusterBlockException);
}

public void testTaskCancelledAfterBlock() {
public void testTaskCancelledImmediately() {
var request = new Request();
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
setState(clusterService, state);

CancellableTask task = new CancellableTask(randomLong(), "test", Action.ACTION_NAME, "", TaskId.EMPTY_TASK_ID, Map.of());
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);

TaskCancelHelper.cancel(task, "test");
assertTrue(listener.isDone());
var exception = assertThrows(ExecutionException.class, listener::get);
assertTrue(exception.getCause() instanceof TaskCancelledException);
}

public void testTaskCancelled() {
var request = new Request();
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
Expand All @@ -128,7 +144,6 @@ public void testTaskCancelledAfterBlock() {
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);

TaskCancelHelper.cancel(task, "test");
assertFalse(listener.isDone());
setState(clusterService, ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
assertTrue(listener.isDone());
var exception = assertThrows(ExecutionException.class, listener::get);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,11 +676,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
}
}, task, request, listener);

final int genericThreads = threadPool.info(ThreadPool.Names.GENERIC).getMax();
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.GENERIC);
final CyclicBarrier barrier = new CyclicBarrier(genericThreads + 1);
final CountDownLatch latch = new CountDownLatch(1);

if (cancelBeforeStart == false) {
assertThat(listener.isDone(), equalTo(false));

Expand All @@ -702,6 +697,39 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
expectThrows(TaskCancelledException.class, listener);
}

/**
* Verify that the listener is invoked immediately when the task is cancelled, instead of waiting for the next ClusterStateObserver run.
*/
public void testTaskCancellationDirectly() {
ClusterBlock block = new ClusterBlock(1, "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
.blocks(ClusterBlocks.builder().addGlobalBlock(block))
.build();

// Update the cluster state with a block so the request waits until it's unblocked
setState(clusterService, stateWithBlock);

TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());

Request request = new Request();
final CancellableTask task = (CancellableTask) taskManager.register("type", "internal:testAction", request);

PlainActionFuture<Response> listener = new PlainActionFuture<>();
ActionTestUtils.execute(new Action("internal:testAction", transportService, clusterService, threadPool) {
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
Set<ClusterBlock> blocks = state.blocks().global();
return blocks.isEmpty() ? null : new ClusterBlockException(blocks);
}
}, task, request, listener);

assertThat(listener.isDone(), equalTo(false));

taskManager.cancel(task, "", () -> {});
assertThat(task.isCancelled(), equalTo(true));
expectThrows(TaskCancelledException.class, listener);
}

public void testTaskCancellationOnceActionItIsDispatchedToMaster() throws Exception {
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());

Expand Down
Loading