Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/128737.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 128737
summary: React more prompty to task cancellation while waiting for the cluster to
unblock
area: Task Management
type: enhancement
issues:
- 117971
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.Strings.format;

Expand Down Expand Up @@ -104,20 +106,40 @@ private void waitForClusterUnblock(
logger,
clusterService.threadPool().getThreadContext()
);
// We track whether we already notified the listener of cancellation, to avoid invoking the listener twice.
final var notifiedCancellation = new AtomicBoolean(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming is a bit odd, this gets set to true even if we haven't notified the listener of cancellation. Maybe waitComplete?

I was going to suggest using org.elasticsearch.action.ActionListener#notifyOnce but it's more subtle than that: we want to suppress the cancellation listener before calling innerDoExecute. I think that deserves a comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the variable to notifiedListener. Even though we don't notify the listener directly in onNewClusterState, we do in innerDoExecute. I also updated the comment. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think that's still going to be confusing to some future reader. The flag doesn't indicate we've completed the listener, it indicates that the wait for an appropriate cluster state in TransportLocalClusterStateAction is over and we've started to execute the action, so I think waitComplete would be a better name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, renamed to waitComplete in 3c6aafa.

if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(() -> {
if (notifiedCancellation.compareAndSet(false, true) == false) {
return;
}
listener.onFailure(new TaskCancelledException("Task was cancelled"));
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
});
}
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
if (notifiedCancellation.compareAndSet(false, true) == false) {
return;
}
logger.trace("retrying with cluster state version [{}]", state.version());
innerDoExecute(task, request, listener, state);
}

@Override
public void onClusterServiceClose() {
if (notifiedCancellation.compareAndSet(false, true) == false) {
return;
}
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
if (notifiedCancellation.compareAndSet(false, true) == false) {
return;
}
logger.debug(
() -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout),
exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -304,20 +305,40 @@ private void retry(long currentStateVersion, final Throwable failure, final Pred
threadPool.getThreadContext()
);
}
// We track whether we already notified the listener of cancellation, to avoid invoking the listener twice.
final var notifiedCancellation = new AtomicBoolean(false);
if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(() -> {
if (notifiedCancellation.compareAndSet(false, true) == false) {
return;
}
listener.onFailure(new TaskCancelledException("Task was cancelled"));
logger.trace("task [{}] was cancelled, notifying listener", task.getId());
});
}
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
if (notifiedCancellation.compareAndSet(false, true) == false) {
return;
}
logger.trace("retrying with cluster state version [{}]", state.version());
doStart(state);
}

@Override
public void onClusterServiceClose() {
if (notifiedCancellation.compareAndSet(false, true) == false) {
return;
}
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
if (notifiedCancellation.compareAndSet(false, true) == false) {
return;
}
logger.debug(() -> format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout), failure);
listener.onFailure(new MasterNotDiscoveredException(failure));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,23 @@ public void testNonRetryableBlock() {
assertTrue(exception.getCause() instanceof ClusterBlockException);
}

public void testTaskCancelledAfterBlock() {
public void testTaskCancelledImmediately() {
var request = new Request();
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
setState(clusterService, state);

CancellableTask task = new CancellableTask(randomLong(), "test", Action.ACTION_NAME, "", TaskId.EMPTY_TASK_ID, Map.of());
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);

TaskCancelHelper.cancel(task, "test");
assertTrue(listener.isDone());
var exception = assertThrows(ExecutionException.class, listener::get);
assertTrue(exception.getCause() instanceof TaskCancelledException);
}

public void testTaskCancelled() {
var request = new Request();
ClusterBlock block = new ClusterBlock(randomInt(), "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
var state = ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
Expand All @@ -128,7 +144,6 @@ public void testTaskCancelledAfterBlock() {
ActionTestUtils.execute(new Action(taskManager, clusterService), task, request, listener);

TaskCancelHelper.cancel(task, "test");
assertFalse(listener.isDone());
setState(clusterService, ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
assertTrue(listener.isDone());
var exception = assertThrows(ExecutionException.class, listener::get);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,11 +676,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
}
}, task, request, listener);

final int genericThreads = threadPool.info(ThreadPool.Names.GENERIC).getMax();
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.GENERIC);
final CyclicBarrier barrier = new CyclicBarrier(genericThreads + 1);
final CountDownLatch latch = new CountDownLatch(1);

if (cancelBeforeStart == false) {
assertThat(listener.isDone(), equalTo(false));

Expand All @@ -702,6 +697,39 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
expectThrows(TaskCancelledException.class, listener);
}

/**
* Verify that the listener is invoked immediately when the task is cancelled, instead of waiting for the next ClusterStateObserver run.
*/
public void testTaskCancellationDirectly() {
ClusterBlock block = new ClusterBlock(1, "", true, true, false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
.blocks(ClusterBlocks.builder().addGlobalBlock(block))
.build();

// Update the cluster state with a block so the request waits until it's unblocked
setState(clusterService, stateWithBlock);

TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());

Request request = new Request();
final CancellableTask task = (CancellableTask) taskManager.register("type", "internal:testAction", request);

PlainActionFuture<Response> listener = new PlainActionFuture<>();
ActionTestUtils.execute(new Action("internal:testAction", transportService, clusterService, threadPool) {
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
Set<ClusterBlock> blocks = state.blocks().global();
return blocks.isEmpty() ? null : new ClusterBlockException(blocks);
}
}, task, request, listener);

assertThat(listener.isDone(), equalTo(false));

taskManager.cancel(task, "", () -> {});
assertThat(task.isCancelled(), equalTo(true));
expectThrows(TaskCancelledException.class, listener);
}

public void testTaskCancellationOnceActionItIsDispatchedToMaster() throws Exception {
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());

Expand Down
Loading