|
38 | 38 | import org.elasticsearch.common.component.Lifecycle; |
39 | 39 | import org.elasticsearch.common.settings.ClusterSettings; |
40 | 40 | import org.elasticsearch.common.settings.Settings; |
| 41 | +import org.elasticsearch.common.util.concurrent.AbstractRunnable; |
41 | 42 | import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; |
42 | 43 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
43 | 44 | import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; |
@@ -2099,7 +2100,7 @@ public void onFailure(Exception e) { |
2099 | 2100 | } |
2100 | 2101 | } |
2101 | 2102 |
|
2102 | | - public void testRejectionBehaviour() { |
| 2103 | + public void testRejectionBehaviourAtSubmission() { |
2103 | 2104 |
|
2104 | 2105 | final var deterministicTaskQueue = new DeterministicTaskQueue(); |
2105 | 2106 | final var threadPool = deterministicTaskQueue.getThreadPool(); |
@@ -2167,6 +2168,81 @@ public void onFailure(Exception e) { |
2167 | 2168 | } |
2168 | 2169 | } |
2169 | 2170 |
|
| 2171 | + @TestLogging(reason = "verifying DEBUG logs", value = "org.elasticsearch.cluster.service.MasterService:DEBUG") |
| 2172 | + public void testRejectionBehaviourAtCompletion() { |
| 2173 | + |
| 2174 | + final var deterministicTaskQueue = new DeterministicTaskQueue(); |
| 2175 | + final var threadPool = deterministicTaskQueue.getThreadPool(); |
| 2176 | + final var threadPoolExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) { |
| 2177 | + |
| 2178 | + boolean executedTask = false; |
| 2179 | + |
| 2180 | + @Override |
| 2181 | + public void execute(Runnable command) { |
| 2182 | + if (command instanceof AbstractRunnable abstractRunnable) { |
| 2183 | + if (executedTask) { |
| 2184 | + abstractRunnable.onRejection(new EsRejectedExecutionException("simulated", true)); |
| 2185 | + } else { |
| 2186 | + executedTask = true; |
| 2187 | + super.execute(command); |
| 2188 | + } |
| 2189 | + } else { |
| 2190 | + fail("not an AbstractRunnable: " + command); |
| 2191 | + } |
| 2192 | + } |
| 2193 | + }; |
| 2194 | + |
| 2195 | + final var appender = new MockLogAppender(); |
| 2196 | + appender.addExpectation( |
| 2197 | + new MockLogAppender.UnseenEventExpectation("warning", MasterService.class.getCanonicalName(), Level.WARN, "*") |
| 2198 | + ); |
| 2199 | + appender.addExpectation( |
| 2200 | + new MockLogAppender.SeenEventExpectation( |
| 2201 | + "debug", |
| 2202 | + MasterService.class.getCanonicalName(), |
| 2203 | + Level.DEBUG, |
| 2204 | + "shut down during publication of cluster state version*" |
| 2205 | + ) |
| 2206 | + ); |
| 2207 | + |
| 2208 | + try ( |
| 2209 | + var ignored = appender.capturing(MasterService.class); |
| 2210 | + var masterService = createMasterService(true, null, threadPool, threadPoolExecutor) |
| 2211 | + ) { |
| 2212 | + |
| 2213 | + final var testHeader = "test-header"; |
| 2214 | + |
| 2215 | + class TestTask implements ClusterStateTaskListener { |
| 2216 | + private final String expectedHeader = threadPool.getThreadContext().getHeader(testHeader); |
| 2217 | + |
| 2218 | + @Override |
| 2219 | + public void onFailure(Exception e) { |
| 2220 | + // post-publication rejections are currently just dropped, see https://github.com/elastic/elasticsearch/issues/94930 |
| 2221 | + throw new AssertionError("unexpected exception", e); |
| 2222 | + } |
| 2223 | + } |
| 2224 | + |
| 2225 | + final var queue = masterService.createTaskQueue("queue", randomFrom(Priority.values()), batchExecutionContext -> { |
| 2226 | + for (var taskContext : batchExecutionContext.taskContexts()) { |
| 2227 | + taskContext.success(() -> fail("should not succeed")); |
| 2228 | + } |
| 2229 | + return ClusterState.builder(batchExecutionContext.initialState()).build(); |
| 2230 | + }); |
| 2231 | + |
| 2232 | + try (var ignoredContext = threadPool.getThreadContext().stashContext()) { |
| 2233 | + threadPool.getThreadContext().putHeader(testHeader, randomAlphaOfLength(10)); |
| 2234 | + queue.submitTask("batched", new TestTask(), null); |
| 2235 | + } |
| 2236 | + |
| 2237 | + threadPool.getThreadContext().markAsSystemContext(); |
| 2238 | + deterministicTaskQueue.runAllTasks(); |
| 2239 | + assertFalse(deterministicTaskQueue.hasRunnableTasks()); |
| 2240 | + assertFalse(deterministicTaskQueue.hasDeferredTasks()); |
| 2241 | + |
| 2242 | + appender.assertAllExpectationsMatched(); |
| 2243 | + } |
| 2244 | + } |
| 2245 | + |
2170 | 2246 | public void testLifecycleBehaviour() { |
2171 | 2247 |
|
2172 | 2248 | final var deterministicTaskQueue = new DeterministicTaskQueue(); |
|
0 commit comments