Skip to content

Commit ee2d35e

Browse files
authored
Dispatch TransportSingleShardAction response handling (#113630) (#113930)
We shouldn't be handling these responses on the transport worker. Closes #110408
1 parent 8eca9f9 commit ee2d35e

File tree

3 files changed

+114
-72
lines changed

3 files changed

+114
-72
lines changed

modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.test.ESIntegTestCase;
2424
import org.elasticsearch.test.junit.annotations.TestLogging;
2525
import org.elasticsearch.threadpool.ThreadPool;
26+
import org.elasticsearch.threadpool.ThreadPoolStats;
2627

2728
import java.util.Arrays;
2829
import java.util.Collection;
@@ -121,8 +122,32 @@ private void assertThreadPoolsBlocked() {
121122
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get()
122123
);
123124
assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable"));
124-
var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get());
125-
assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable"));
125+
126+
final var getFuture = client().prepareGet(USER_INDEX, "id").execute();
127+
// response handling is force-executed on GET pool, so we must
128+
// (a) wait for that task to be enqueued, expanding the queue beyond its configured limit, and
129+
// (b) check for the exception in the background
130+
131+
try {
132+
assertTrue(waitUntil(() -> {
133+
if (getFuture.isDone()) {
134+
return true;
135+
}
136+
for (ThreadPool threadPool : internalCluster().getInstances(ThreadPool.class)) {
137+
for (ThreadPoolStats.Stats stats : threadPool.stats().stats()) {
138+
if (stats.name().equals(ThreadPool.Names.GET) && stats.queue() > 1) {
139+
return true;
140+
}
141+
}
142+
}
143+
return false;
144+
}));
145+
} catch (Exception e) {
146+
fail(e);
147+
}
148+
149+
new Thread(() -> expectThrows(EsRejectedExecutionException.class, () -> getFuture.actionGet(SAFE_AWAIT_TIMEOUT))).start();
150+
126151
// intentionally commented out this test until https://github.com/elastic/elasticsearch/issues/97916 is fixed
127152
// var e3 = expectThrows(
128153
// SearchPhaseExecutionException.class,

0 commit comments

Comments
 (0)