Skip to content

Commit edc917d

Browse files
authored
Close channel on stream handler exception (#115505)
In case a stream handler throws uncaught exception, we should close the channel and release associated resources to avoid the channel entering a limbo state. This PR does that. Resolves: ES-9537
1 parent bb6ec6e commit edc917d

File tree

3 files changed

+45
-1
lines changed

3 files changed

+45
-1
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.elasticsearch.test.ESIntegTestCase;
7171
import org.elasticsearch.test.MockLog;
7272
import org.elasticsearch.test.junit.annotations.TestLogging;
73+
import org.elasticsearch.transport.Transports;
7374
import org.elasticsearch.transport.netty4.Netty4Utils;
7475

7576
import java.util.Collection;
@@ -215,6 +216,29 @@ public void testServerCloseConnectionMidStream() throws Exception {
215216
}
216217
}
217218

219+
public void testServerExceptionMidStream() throws Exception {
220+
try (var ctx = setupClientCtx()) {
221+
var opaqueId = opaqueId(0);
222+
223+
// write half of http request
224+
ctx.clientChannel.write(httpRequest(opaqueId, 2 * 1024));
225+
ctx.clientChannel.writeAndFlush(randomContent(1024, false));
226+
227+
// await stream handler is ready and request full content
228+
var handler = ctx.awaitRestChannelAccepted(opaqueId);
229+
assertBusy(() -> assertNotNull(handler.stream.buf()));
230+
assertFalse(handler.streamClosed);
231+
232+
handler.shouldThrowInsideHandleChunk = true;
233+
handler.stream.next();
234+
235+
assertBusy(() -> {
236+
assertNull(handler.stream.buf());
237+
assertTrue(handler.streamClosed);
238+
});
239+
}
240+
}
241+
218242
// ensure that client's socket buffers data when server is not consuming data
219243
public void testClientBackpressure() throws Exception {
220244
try (var ctx = setupClientCtx()) {
@@ -598,6 +622,7 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon
598622
RestChannel channel;
599623
boolean recvLast = false;
600624
volatile boolean streamClosed = false;
625+
volatile boolean shouldThrowInsideHandleChunk = false;
601626

602627
ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
603628
this.opaqueId = opaqueId;
@@ -606,6 +631,12 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon
606631

607632
@Override
608633
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
634+
Transports.assertTransportThread();
635+
if (shouldThrowInsideHandleChunk) {
636+
// Must close the chunk. This is the contract of this method.
637+
chunk.close();
638+
throw new RuntimeException("simulated exception inside handleChunk");
639+
}
609640
recvChunks.add(new Chunk(chunk, isLast));
610641
}
611642

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ public void next() {
7171
if (buf == null) {
7272
channel.read();
7373
} else {
74-
send();
74+
try {
75+
send();
76+
} catch (Exception e) {
77+
channel.pipeline().fireExceptionCaught(e);
78+
}
7579
}
7680
});
7781
}

server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,15 @@ default void close() {}
210210
}
211211

212212
public interface RequestBodyChunkConsumer extends RestChannelConsumer {
213+
214+
/**
215+
* Handle one chunk of the request body. The handler <b>must</b> close the chunk once it is no longer
216+
* needed to avoid leaking.
217+
*
218+
* @param channel The rest channel associated to the request
219+
* @param chunk The chunk of request body that is ready for processing
220+
* @param isLast Whether the chunk is the last one of the request
221+
*/
213222
void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast);
214223

215224
/**

0 commit comments

Comments
 (0)