Skip to content

Commit a86aacd

Browse files
authored
Integrate threadpool scheduling with AbstractRunnable (#106552)
Today `ThreadPool#scheduleWithFixedDelay` does not interact as expected with `AbstractRunnable`: if the task fails or is rejected then this isn't passed back to the relevant callback, and the task cannot specify that it should be force-executed. This commit fixes that. Backport of #106542 to 7.17
1 parent 59f0e6f commit a86aacd

File tree

3 files changed

+174
-12
lines changed

3 files changed

+174
-12
lines changed

server/src/main/java/org/elasticsearch/threadpool/Scheduler.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ static boolean awaitTermination(
106106
* not be interrupted.
107107
*/
108108
default Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) {
109-
return new ReschedulingRunnable(command, interval, executor, this, (e) -> {}, (e) -> {});
109+
return new ReschedulingRunnable(command, interval, executor, this, e -> {}, e -> {});
110110
}
111111

112112
/**
@@ -217,13 +217,25 @@ public void doRun() {
217217

218218
@Override
219219
public void onFailure(Exception e) {
220-
failureConsumer.accept(e);
220+
try {
221+
if (runnable instanceof AbstractRunnable) {
222+
((AbstractRunnable) runnable).onFailure(e);
223+
}
224+
} finally {
225+
failureConsumer.accept(e);
226+
}
221227
}
222228

223229
@Override
224230
public void onRejection(Exception e) {
225231
run = false;
226-
rejectionConsumer.accept(e);
232+
try {
233+
if (runnable instanceof AbstractRunnable) {
234+
((AbstractRunnable) runnable).onRejection(e);
235+
}
236+
} finally {
237+
rejectionConsumer.accept(e);
238+
}
227239
}
228240

229241
@Override
@@ -238,6 +250,11 @@ public void onAfter() {
238250
}
239251
}
240252

253+
@Override
254+
public boolean isForceExecution() {
255+
return runnable instanceof AbstractRunnable && ((AbstractRunnable) runnable).isForceExecution();
256+
}
257+
241258
@Override
242259
public String toString() {
243260
return "ReschedulingRunnable{" + "runnable=" + runnable + ", interval=" + interval + '}';

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -488,15 +488,13 @@ public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnabl
488488

489489
@Override
490490
public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) {
491-
return new ReschedulingRunnable(command, interval, executor, this, (e) -> {
492-
if (logger.isDebugEnabled()) {
493-
logger.debug(() -> new ParameterizedMessage("scheduled task [{}] was rejected on thread pool [{}]", command, executor), e);
494-
}
495-
},
496-
(e) -> logger.warn(
497-
() -> new ParameterizedMessage("failed to run scheduled task [{}] on thread pool [{}]", command, executor),
498-
e
499-
)
491+
return new ReschedulingRunnable(
492+
command,
493+
interval,
494+
executor,
495+
this,
496+
e -> logger.debug(() -> new ParameterizedMessage("scheduled task [{}] was rejected on thread pool [{}]", command, executor), e),
497+
e -> logger.warn(() -> new ParameterizedMessage("failed to run scheduled task [{}] on thread pool [{}]", command, executor), e)
500498
);
501499
}
502500

server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,21 @@
1111
import org.apache.logging.log4j.Level;
1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.action.ActionRunnable;
15+
import org.elasticsearch.action.support.PlainActionFuture;
1416
import org.elasticsearch.common.logging.Loggers;
1517
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1619
import org.elasticsearch.common.util.concurrent.EsExecutors;
20+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1721
import org.elasticsearch.common.util.concurrent.FutureUtils;
1822
import org.elasticsearch.core.TimeValue;
1923
import org.elasticsearch.test.ESTestCase;
2024
import org.elasticsearch.test.MockLogAppender;
2125

2226
import java.util.concurrent.CountDownLatch;
2327
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.TimeUnit;
2429

2530
import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING;
2631
import static org.elasticsearch.threadpool.ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING;
@@ -304,4 +309,146 @@ public String toString() {
304309
assertTrue(terminate(threadPool));
305310
}
306311
}
312+
313+
public void testScheduledOneShotRejection() {
314+
final String name = "fixed-bounded";
315+
final ThreadPool threadPool = new TestThreadPool(
316+
getTestName(),
317+
new FixedExecutorBuilder(Settings.EMPTY, name, between(1, 5), between(1, 5), "prefix")
318+
);
319+
320+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
321+
final CountDownLatch latch = new CountDownLatch(1);
322+
try {
323+
blockExecution(threadPool.executor(name), latch);
324+
threadPool.schedule(
325+
ActionRunnable.run(future, () -> fail("should not execute")),
326+
TimeValue.timeValueMillis(between(1, 100)),
327+
name
328+
);
329+
330+
expectThrows(EsRejectedExecutionException.class, () -> FutureUtils.get(future, 10, TimeUnit.SECONDS));
331+
} finally {
332+
latch.countDown();
333+
assertTrue(terminate(threadPool));
334+
}
335+
}
336+
337+
public void testScheduledOneShotForceExecution() {
338+
final String name = "fixed-bounded";
339+
final ThreadPool threadPool = new TestThreadPool(
340+
getTestName(),
341+
new FixedExecutorBuilder(Settings.EMPTY, name, between(1, 5), between(1, 5), "prefix")
342+
);
343+
344+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
345+
final CountDownLatch latch = new CountDownLatch(1);
346+
try {
347+
blockExecution(threadPool.executor(name), latch);
348+
threadPool.schedule(forceExecution(ActionRunnable.run(future, () -> {})), TimeValue.timeValueMillis(between(1, 100)), name);
349+
350+
Thread.yield();
351+
assertFalse(future.isDone());
352+
353+
latch.countDown();
354+
FutureUtils.get(future, 10, TimeUnit.SECONDS); // shouldn't throw
355+
} finally {
356+
latch.countDown();
357+
assertTrue(terminate(threadPool));
358+
}
359+
}
360+
361+
public void testScheduledFixedDelayRejection() {
362+
final String name = "fixed-bounded";
363+
final ThreadPool threadPool = new TestThreadPool(
364+
getTestName(),
365+
new FixedExecutorBuilder(Settings.EMPTY, name, between(1, 5), between(1, 5), "prefix")
366+
);
367+
368+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
369+
final CountDownLatch latch = new CountDownLatch(1);
370+
try {
371+
threadPool.scheduleWithFixedDelay(
372+
ActionRunnable.wrap(future, ignored -> Thread.yield()),
373+
TimeValue.timeValueMillis(between(1, 100)),
374+
name
375+
);
376+
377+
while (future.isDone() == false) {
378+
// might not block all threads the first time round if the scheduled runnable is running, so must keep trying
379+
blockExecution(threadPool.executor(name), latch);
380+
}
381+
expectThrows(EsRejectedExecutionException.class, () -> FutureUtils.get(future));
382+
} finally {
383+
latch.countDown();
384+
assertTrue(terminate(threadPool));
385+
}
386+
}
387+
388+
public void testScheduledFixedDelayForceExecution() {
389+
final String name = "fixed-bounded";
390+
final ThreadPool threadPool = new TestThreadPool(
391+
getTestName(),
392+
new FixedExecutorBuilder(Settings.EMPTY, name, between(1, 5), between(1, 5), "prefix")
393+
);
394+
395+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
396+
final CountDownLatch latch = new CountDownLatch(1);
397+
try {
398+
blockExecution(threadPool.executor(name), latch);
399+
400+
threadPool.scheduleWithFixedDelay(
401+
forceExecution(ActionRunnable.run(future, Thread::yield)),
402+
TimeValue.timeValueMillis(between(1, 100)),
403+
name
404+
);
405+
406+
assertFalse(future.isDone());
407+
408+
latch.countDown();
409+
FutureUtils.get(future, 10, TimeUnit.SECONDS); // shouldn't throw
410+
} finally {
411+
latch.countDown();
412+
assertTrue(terminate(threadPool));
413+
}
414+
}
415+
416+
private static AbstractRunnable forceExecution(AbstractRunnable delegate) {
417+
return new AbstractRunnable() {
418+
@Override
419+
public void onFailure(Exception e) {
420+
delegate.onFailure(e);
421+
}
422+
423+
@Override
424+
protected void doRun() {
425+
delegate.run();
426+
}
427+
428+
@Override
429+
public void onRejection(Exception e) {
430+
delegate.onRejection(e);
431+
}
432+
433+
@Override
434+
public void onAfter() {
435+
delegate.onAfter();
436+
}
437+
438+
@Override
439+
public boolean isForceExecution() {
440+
return true;
441+
}
442+
};
443+
}
444+
445+
private static void blockExecution(ExecutorService executor, CountDownLatch latch) {
446+
while (true) {
447+
try {
448+
executor.execute(() -> safeAwait(latch));
449+
} catch (EsRejectedExecutionException e) {
450+
break;
451+
}
452+
}
453+
}
307454
}

0 commit comments

Comments
 (0)