
Commit cca2627: Handle scheduler shutdown in MasterService (#105552)
Today, if `ThreadPool#scheduler` is shut down while a task is being submitted for execution by the `MasterService`, we throw the rejection exception directly to the caller. By the looks of it, most callers don't expect this method to throw anything, so this commit adjusts the behaviour to fail the task instead.

Closes #105549
1 parent: 8133209 · commit: cca2627
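To make the behaviour change concrete, here is a minimal sketch of the pattern in plain JDK terms, independent of the Elasticsearch classes touched below: when scheduling the timeout handler is rejected because the scheduler has shut down, the failure is routed to the task's own failure callback instead of being thrown back at the submitter. The FailureListener and TimeoutAwareQueue names are hypothetical illustrations, not types from this codebase.

    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical stand-in for a task listener with an onFailure callback.
    interface FailureListener {
        void onFailure(Exception e);
    }

    // Hypothetical queue that schedules a timeout handler for each submitted task.
    class TimeoutAwareQueue {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        void submit(Runnable task, FailureListener listener, long timeoutMillis) {
            try {
                // Schedule a handler that fails the task if it does not complete in time.
                scheduler.schedule(() -> listener.onFailure(new Exception("task timed out")), timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // Scheduler already shut down: fail the task via its listener rather than throwing to the caller.
                listener.onFailure(new Exception("could not schedule timeout handler", e));
                return;
            }
            task.run(); // the real MasterService enqueues the task for batched execution instead
        }
    }

The key point mirrors the production change: submission never throws; failures of any kind, including the inability to arm the timeout handler, reach the caller through onFailure.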

3 files changed: +90 −7 lines

server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

Lines changed: 25 additions & 6 deletions
@@ -1470,8 +1470,13 @@ private void completeTask(Exception e) {

         @Override
         public String toString() {
-            return Strings.format("master service timeout handler for [%s][%s] after [%s]", source, taskHolder.get(), timeout);
+            return getTimeoutTaskDescription(source, taskHolder.get(), timeout);
         }
+
+    }
+
+    static String getTimeoutTaskDescription(String source, Object task, TimeValue timeout) {
+        return Strings.format("master service timeout handler for [%s][%s] after [%s]", source, task, timeout);
     }

     /**
@@ -1522,11 +1527,25 @@ public void submitTask(String source, T task, @Nullable TimeValue timeout) {
             final var taskHolder = new AtomicReference<>(task);
             final Scheduler.Cancellable timeoutCancellable;
             if (timeout != null && timeout.millis() > 0) {
-                timeoutCancellable = threadPool.schedule(
-                    new TaskTimeoutHandler<>(timeout, source, taskHolder),
-                    timeout,
-                    threadPool.generic()
-                );
+                try {
+                    timeoutCancellable = threadPool.schedule(
+                        new TaskTimeoutHandler<>(timeout, source, taskHolder),
+                        timeout,
+                        threadPool.generic()
+                    );
+                } catch (Exception e) {
+                    assert e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e;
+                    task.onFailure(
+                        new FailedToCommitClusterStateException(
+                            "could not schedule timeout handler for [%s][%s] on queue [%s]",
+                            e,
+                            source,
+                            task,
+                            name
+                        )
+                    );
+                    return;
+                }
             } else {
                 timeoutCancellable = null;
             }
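From a caller's perspective, the effect of this change is that submitTask no longer throws when the node is shutting down; the scheduling rejection instead arrives through the listener. A rough caller-side fragment, assuming the usual org.elasticsearch imports for the classes named in this diff (not a verbatim excerpt from the codebase):

    // `queue` is a MasterServiceTaskQueue obtained from MasterService#createTaskQueue.
    queue.submitTask("example-source", new ClusterStateTaskListener() {
        @Override
        public void onFailure(Exception e) {
            if (e instanceof FailedToCommitClusterStateException
                && e.getCause() instanceof EsRejectedExecutionException esre
                && esre.isExecutorShutdown()) {
                // The timeout handler could not be scheduled because the node is shutting down;
                // the task was failed here instead of the rejection being thrown out of submitTask.
                return;
            }
            // handle other failures (e.g. publication failures) as before
        }
    }, TimeValue.timeValueSeconds(30));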

server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

Lines changed: 53 additions & 0 deletions
@@ -2121,6 +2121,59 @@ public void onFailure(Exception e) {
         }
     }

+    public void testTimeoutRejectionBehaviourAtSubmission() {
+
+        final var source = randomIdentifier();
+        final var taskDescription = randomIdentifier();
+        final var timeout = TimeValue.timeValueMillis(between(0, 100000));
+
+        final var actionCount = new AtomicInteger();
+        final var deterministicTaskQueue = new DeterministicTaskQueue();
+        final var threadPool =
+            // a threadpool which simulates the rejection of a master service timeout handler, but runs all other tasks as normal
+            deterministicTaskQueue.getThreadPool(r -> {
+                if (r.toString().equals(MasterService.getTimeoutTaskDescription(source, taskDescription, timeout))) {
+                    // assertTrue because this should happen exactly once
+                    assertTrue(actionCount.compareAndSet(0, 1));
+                    throw new EsRejectedExecutionException("simulated rejection", true);
+                } else {
+                    return r;
+                }
+            });
+
+        try (var masterService = createMasterService(true, null, threadPool, new StoppableExecutorServiceWrapper(threadPool.generic()))) {
+            masterService.createTaskQueue(
+                "queue",
+                randomFrom(Priority.values()),
+                batchExecutionContext -> fail(null, "should not execute batch")
+            ).submitTask(source, new ClusterStateTaskListener() {
+                @Override
+                public void onFailure(Exception e) {
+                    if (e instanceof FailedToCommitClusterStateException
+                        && e.getMessage().startsWith("could not schedule timeout handler")
+                        && e.getCause() instanceof EsRejectedExecutionException esre
+                        && esre.isExecutorShutdown()
+                        && esre.getMessage().equals("simulated rejection")) {
+                        // assertTrue because we must receive the exception we synthesized, exactly once, after triggering the rejection
+                        assertTrue(actionCount.compareAndSet(1, 2));
+                    } else {
+                        // fail the test if we get anything else
+                        throw new AssertionError("unexpected exception", e);
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return taskDescription;
+                }
+            }, timeout);
+
+            assertFalse(deterministicTaskQueue.hasRunnableTasks());
+            assertFalse(deterministicTaskQueue.hasDeferredTasks());
+            assertEquals(2, actionCount.get()); // ensures this test doesn't accidentally become trivial: both expected actions happened
+        }
+    }
+
     @TestLogging(reason = "verifying DEBUG logs", value = "org.elasticsearch.cluster.service.MasterService:DEBUG")
     public void testRejectionBehaviourAtCompletion() {
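The test above leans on two things: the timeout handler's description is now produced by the static getTimeoutTaskDescription method, so the test can recognise exactly that runnable, and DeterministicTaskQueue.getThreadPool accepts a function that every submitted runnable is passed through. A simplified, JDK-only sketch of that interception idea (the RejectingWrapper name is made up for illustration; the real test throws EsRejectedExecutionException with the executor-shutdown flag set):

    import java.util.concurrent.RejectedExecutionException;
    import java.util.function.UnaryOperator;

    // Every runnable handed to the (simulated) pool flows through this operator, so one precisely
    // identified task (matched by its toString) can be rejected while all others run normally.
    class RejectingWrapper {
        static UnaryOperator<Runnable> rejectOnly(String descriptionToReject) {
            return r -> {
                if (r.toString().equals(descriptionToReject)) {
                    throw new RejectedExecutionException("simulated rejection");
                }
                return r;
            };
        }
    }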

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 12 additions & 1 deletion
@@ -177,6 +177,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -971,7 +972,17 @@ private ClusterHealthStatus ensureColor(
             // been removed by the master so that the health check applies to the set of nodes we expect to be part of the cluster.
             .waitForNodes(Integer.toString(cluster().size()));

-        final ClusterHealthResponse clusterHealthResponse = clusterAdmin().health(healthRequest).actionGet();
+        final ClusterHealthResponse clusterHealthResponse;
+        try {
+            clusterHealthResponse = clusterAdmin().health(healthRequest).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("interrupted while waiting for health response", e);
+            throw new AssertionError("interrupted while waiting for health response", e);
+        } catch (ExecutionException e) {
+            logger.error("failed to get health response", e);
+            throw new AssertionError("failed to get health response", e);
+        }
         if (clusterHealthResponse.isTimedOut()) {
             final var allocationExplainRef = new AtomicReference<ClusterAllocationExplainResponse>();
             final var clusterStateRef = new AtomicReference<ClusterStateResponse>();
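The test-framework change replaces actionGet() with Future#get so that ensureColor converts both interruption and execution failures of the health request into AssertionErrors, logging them and restoring the interrupt flag on interruption. The same pattern in a minimal, JDK-only sketch (the WaitForResponse name is illustrative):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    // Minimal analogue of the added error handling: wait on a future and convert failures into
    // test assertion errors, restoring the interrupt flag if the wait is interrupted.
    class WaitForResponse {
        static <T> T getOrFail(CompletableFuture<T> future) {
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers before failing
                throw new AssertionError("interrupted while waiting for response", e);
            } catch (ExecutionException e) {
                throw new AssertionError("failed to get response", e);
            }
        }
    }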
