Skip to content

Commit e013297

Browse files
committed
Improve rejection handling in ThreadedActionListener (#87042)
Today if the submission within `ThreadedActionListener#onResponse` is rejected from its threadpool then we call `delegate#onFailure` with the rejection exception on the calling thread. However, if the submission within `ThreadedActionListener#onFailure` is rejected then we just drop the listener and log an error. In most cases completing a listener exceptionally triggers some cleanup which is often fairly lightweight and therefore safe enough to complete on the calling thread. In any case it's generally preferable to complete a listener exceptionally on the wrong thread rather than just dropping it entirely. This commit fixes this and adds a test to verify that `ThreadedActionListener` completes properly even in the face of rejections.
1 parent fbea34c commit e013297

File tree

3 files changed

+115
-3
lines changed

3 files changed

+115
-3
lines changed

server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.action.support;
1010

1111
import org.apache.logging.log4j.Logger;
12-
import org.apache.logging.log4j.message.ParameterizedMessage;
1312
import org.elasticsearch.action.ActionListener;
1413
import org.elasticsearch.action.ActionRunnable;
1514
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -51,6 +50,11 @@ public boolean isForceExecution() {
5150
protected void doRun() {
5251
listener.onResponse(response);
5352
}
53+
54+
@Override
55+
public String toString() {
56+
return ThreadedActionListener.this + "/onResponse";
57+
}
5458
});
5559
}
5660

@@ -63,14 +67,36 @@ public boolean isForceExecution() {
6367
}
6468

6569
@Override
66-
protected void doRun() throws Exception {
70+
protected void doRun() {
6771
delegate.onFailure(e);
6872
}
6973

74+
@Override
75+
public void onRejection(Exception e2) {
76+
e.addSuppressed(e2);
77+
try {
78+
delegate.onFailure(e);
79+
} catch (Exception e3) {
80+
e.addSuppressed(e3);
81+
onFailure(e);
82+
}
83+
}
84+
7085
@Override
7186
public void onFailure(Exception e) {
72-
logger.warn(() -> new ParameterizedMessage("failed to execute failure callback on [{}]", delegate), e);
87+
assert false : e;
88+
logger.error(() -> "failed to execute failure callback on [" + delegate + "]", e);
89+
}
90+
91+
@Override
92+
public String toString() {
93+
return ThreadedActionListener.this + "/onFailure";
7394
}
7495
});
7596
}
97+
98+
@Override
99+
public String toString() {
100+
return "ThreadedActionListener[" + executor + "/" + delegate + "]";
101+
}
76102
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.support;
10+
11+
import org.elasticsearch.ElasticsearchException;
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.core.TimeValue;
15+
import org.elasticsearch.test.ESTestCase;
16+
import org.elasticsearch.threadpool.FixedExecutorBuilder;
17+
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
18+
import org.elasticsearch.threadpool.TestThreadPool;
19+
20+
import java.util.List;
21+
import java.util.Set;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
26+
public class ThreadedActionListenerTests extends ESTestCase {
27+
28+
public void testRejectionHandling() throws InterruptedException {
29+
final var listenerCount = between(1, 1000);
30+
final var countdownLatch = new CountDownLatch(listenerCount);
31+
final var threadPool = new TestThreadPool(
32+
"test",
33+
Settings.EMPTY,
34+
new FixedExecutorBuilder(Settings.EMPTY, "fixed-bounded-queue", between(1, 3), 10, "fbq", randomBoolean()),
35+
new FixedExecutorBuilder(Settings.EMPTY, "fixed-unbounded-queue", between(1, 3), -1, "fnq", randomBoolean()),
36+
new ScalingExecutorBuilder("scaling-drop-if-shutdown", between(1, 3), between(3, 5), TimeValue.timeValueSeconds(1), false),
37+
new ScalingExecutorBuilder("scaling-reject-if-shutdown", between(1, 3), between(3, 5), TimeValue.timeValueSeconds(1), true)
38+
);
39+
final var closeFlag = new AtomicBoolean();
40+
try {
41+
final var pools = randomNonEmptySubsetOf(
42+
List.of("fixed-bounded-queue", "fixed-unbounded-queue", "scaling-drop-if-shutdown", "scaling-reject-if-shutdown")
43+
);
44+
final var shutdownUnsafePools = Set.of("fixed-bounded-queue", "scaling-drop-if-shutdown");
45+
46+
threadPool.generic().execute(() -> {
47+
for (int i = 0; i < listenerCount; i++) {
48+
final var pool = randomFrom(pools);
49+
final var listener = new ThreadedActionListener<Void>(
50+
logger,
51+
threadPool,
52+
pool,
53+
ActionListener.wrap(countdownLatch::countDown),
54+
(pool.equals("fixed-bounded-queue") || pool.startsWith("scaling")) && rarely()
55+
);
56+
synchronized (closeFlag) {
57+
if (closeFlag.get() && shutdownUnsafePools.contains(pool)) {
58+
// closing, so tasks submitted to this pool may just be dropped
59+
countdownLatch.countDown();
60+
} else if (randomBoolean()) {
61+
listener.onResponse(null);
62+
} else {
63+
listener.onFailure(new ElasticsearchException("simulated"));
64+
}
65+
}
66+
Thread.yield();
67+
}
68+
});
69+
} finally {
70+
synchronized (closeFlag) {
71+
assertTrue(closeFlag.compareAndSet(false, true));
72+
threadPool.shutdown();
73+
}
74+
assertTrue(threadPool.awaitTermination(10, TimeUnit.SECONDS));
75+
}
76+
assertTrue(countdownLatch.await(10, TimeUnit.SECONDS));
77+
}
78+
79+
}

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,6 +1208,13 @@ public static <T> List<T> randomSubsetOf(Collection<T> collection) {
12081208
return randomSubsetOf(randomInt(collection.size()), collection);
12091209
}
12101210

1211+
public static <T> List<T> randomNonEmptySubsetOf(Collection<T> collection) {
1212+
if (collection.isEmpty()) {
1213+
throw new IllegalArgumentException("Can't pick non-empty subset of an empty collection");
1214+
}
1215+
return randomSubsetOf(randomIntBetween(1, collection.size()), collection);
1216+
}
1217+
12111218
/**
12121219
* Returns size random values
12131220
*/

0 commit comments

Comments
 (0)