Skip to content

Commit f162d34

Browse files
mhl-bywangd
andauthored
Close channel on stream handler exception (elastic#115505) (elastic#115879)
In case a stream handler throws uncaught exception, we should close the channel and release associated resources to avoid the channel entering a limbo state. This PR does that. Resolves: ES-9537 Co-authored-by: Yang Wang <[email protected]>
1 parent 904b926 commit f162d34

File tree

3 files changed

+45
-1
lines changed

3 files changed

+45
-1
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.elasticsearch.test.ESIntegTestCase;
7171
import org.elasticsearch.test.MockLog;
7272
import org.elasticsearch.test.junit.annotations.TestLogging;
73+
import org.elasticsearch.transport.Transports;
7374
import org.elasticsearch.transport.netty4.Netty4Utils;
7475

7576
import java.util.Collection;
@@ -211,6 +212,29 @@ public void testServerCloseConnectionMidStream() throws Exception {
211212
}
212213
}
213214

215+
public void testServerExceptionMidStream() throws Exception {
216+
try (var ctx = setupClientCtx()) {
217+
var opaqueId = opaqueId(0);
218+
219+
// write half of http request
220+
ctx.clientChannel.write(httpRequest(opaqueId, 2 * 1024));
221+
ctx.clientChannel.writeAndFlush(randomContent(1024, false));
222+
223+
// await stream handler is ready and request full content
224+
var handler = ctx.awaitRestChannelAccepted(opaqueId);
225+
assertBusy(() -> assertNotNull(handler.stream.buf()));
226+
assertFalse(handler.streamClosed);
227+
228+
handler.shouldThrowInsideHandleChunk = true;
229+
handler.stream.next();
230+
231+
assertBusy(() -> {
232+
assertNull(handler.stream.buf());
233+
assertTrue(handler.streamClosed);
234+
});
235+
}
236+
}
237+
214238
// ensure that client's socket buffers data when server is not consuming data
215239
public void testClientBackpressure() throws Exception {
216240
try (var ctx = setupClientCtx()) {
@@ -594,6 +618,7 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon
594618
RestChannel channel;
595619
boolean recvLast = false;
596620
volatile boolean streamClosed = false;
621+
volatile boolean shouldThrowInsideHandleChunk = false;
597622

598623
ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
599624
this.opaqueId = opaqueId;
@@ -602,6 +627,12 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon
602627

603628
@Override
604629
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
630+
Transports.assertTransportThread();
631+
if (shouldThrowInsideHandleChunk) {
632+
// Must close the chunk. This is the contract of this method.
633+
chunk.close();
634+
throw new RuntimeException("simulated exception inside handleChunk");
635+
}
605636
recvChunks.add(new Chunk(chunk, isLast));
606637
}
607638

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ public void next() {
7171
if (buf == null) {
7272
channel.read();
7373
} else {
74-
send();
74+
try {
75+
send();
76+
} catch (Exception e) {
77+
channel.pipeline().fireExceptionCaught(e);
78+
}
7579
}
7680
});
7781
}

server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,15 @@ default void close() {}
217217
}
218218

219219
public interface RequestBodyChunkConsumer extends RestChannelConsumer {
220+
221+
/**
222+
* Handle one chunk of the request body. The handler <b>must</b> close the chunk once it is no longer
223+
* needed to avoid leaking.
224+
*
225+
* @param channel The rest channel associated to the request
226+
* @param chunk The chunk of request body that is ready for processing
227+
* @param isLast Whether the chunk is the last one of the request
228+
*/
220229
void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast);
221230

222231
/**

0 commit comments

Comments
 (0)