Skip to content

Commit c27bc08

Browse files
authored
Handle InternalSendException inline for non-forking handlers (#114375)
When TransportService fails to send a transport action, it can complete the listener's `onFailure` with the `generic` executor. If the listener is a `PlainActionFuture` and also waits to be completed with a `generic` thread, it will trip the `assertCompleteAllowed` assertion. https://github.com/elastic/elasticsearch/blob/fb482f863d5430702b19bd3dd23e9d8652f12ddd/server/src/main/java/org/elasticsearch/transport/TransportService.java#L1062-L1064 With this PR, we no longer fork to the generic thread pool and instead just handle the exeption inline with the current thread. The expectation is that the downstream handler should take care potential stack overflow issues. This is similar to what is done in #109236
1 parent 27ac1b4 commit c27bc08

File tree

3 files changed

+42
-4
lines changed

3 files changed

+42
-4
lines changed

docs/changelog/114375.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 114375
2+
summary: Handle `InternalSendException` inline for non-forking handlers
3+
area: Distributed
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,8 +1059,9 @@ private Executor getInternalSendExceptionExecutor(Executor handlerExecutor) {
10591059
if (lifecycle.stoppedOrClosed()) {
10601060
// too late to try and dispatch anywhere else, let's just use the calling thread
10611061
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
1062-
} else if (handlerExecutor == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
1063-
// if the handler is non-forking then dispatch to GENERIC to avoid a possible stack overflow
1062+
} else if (handlerExecutor == EsExecutors.DIRECT_EXECUTOR_SERVICE && enableStackOverflowAvoidance) {
1063+
// If the handler is non-forking and stack overflow protection is enabled then dispatch to GENERIC
1064+
// Otherwise we let the handler deal with any potential stack overflow (this is the default)
10641065
return threadPool.generic();
10651066
} else {
10661067
return handlerExecutor;

server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,13 @@ public void testInternalSendExceptionForksToHandlerExecutor() {
149149
}
150150
}
151151

152-
public void testInternalSendExceptionForksToGenericIfHandlerDoesNotFork() {
153-
try (var nodeA = new TestNode("node-A")) {
152+
public void testInternalSendExceptionForksToGenericIfHandlerDoesNotForkAndStackOverflowProtectionEnabled() {
153+
try (
154+
var nodeA = new TestNode(
155+
"node-A",
156+
Settings.builder().put(TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE.getKey(), true).build()
157+
)
158+
) {
154159
final var future = new PlainActionFuture<TransportResponse.Empty>();
155160
nodeA.transportService.sendRequest(
156161
nodeA.getThrowingConnection(),
@@ -165,6 +170,33 @@ public void testInternalSendExceptionForksToGenericIfHandlerDoesNotFork() {
165170

166171
assertEquals("simulated exception in sendRequest", getSendRequestException(future, IOException.class).getMessage());
167172
}
173+
assertWarnings(
174+
"[transport.enable_stack_protection] setting was deprecated in Elasticsearch and will be removed in a future release."
175+
);
176+
}
177+
178+
public void testInternalSendExceptionWithNonForkingResponseHandlerCompletesListenerInline() {
179+
try (var nodeA = new TestNode("node-A")) {
180+
final Thread callingThread = Thread.currentThread();
181+
assertEquals(
182+
"simulated exception in sendRequest",
183+
safeAwaitAndUnwrapFailure(
184+
IOException.class,
185+
TransportResponse.Empty.class,
186+
l -> nodeA.transportService.sendRequest(
187+
nodeA.getThrowingConnection(),
188+
TestNode.randomActionName(random()),
189+
new EmptyRequest(),
190+
TransportRequestOptions.EMPTY,
191+
new ActionListenerResponseHandler<>(
192+
ActionListener.runBefore(l, () -> assertSame(callingThread, Thread.currentThread())),
193+
unusedReader(),
194+
EsExecutors.DIRECT_EXECUTOR_SERVICE
195+
)
196+
)
197+
).getMessage()
198+
);
199+
}
168200
}
169201

170202
public void testInternalSendExceptionForcesExecutionOnHandlerExecutor() {

0 commit comments

Comments
 (0)