Skip to content

Commit 6addf18

Browse files
authored
[8.12] Fix leaked HTTP response sent after close (#105293) (#105305)
* Fix leaked HTTP response sent after close (#105293) Today a `HttpResponse` is always released via a `ChannelPromise` which means the release happens on a network thread. However, it's possible we try and send a `HttpResponse` after the node has got far enough through shutdown that it doesn't have any running network threads left, which means the response just leaks. This is no big deal in production, it becomes irrelevant when the process exits, but in tests we start and stop many nodes within the same process so mustn't leak anything. At this point in shutdown, all HTTP channels are now closed, so it's sufficient to check whether the channel is open first, and to fail the listener on the calling thread if not. That's what this commit does. Closes #104651 * Fix backport
1 parent c83df53 commit 6addf18

File tree

3 files changed

+75
-1
lines changed

3 files changed

+75
-1
lines changed

docs/changelog/105293.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 105293
2+
summary: Fix leaked HTTP response sent after close
3+
area: Network
4+
type: bug
5+
issues:
6+
- 104651

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.net.InetSocketAddress;
2020
import java.net.SocketAddress;
21+
import java.nio.channels.ClosedChannelException;
2122

2223
public class Netty4HttpChannel implements HttpChannel {
2324

@@ -31,7 +32,13 @@ public class Netty4HttpChannel implements HttpChannel {
3132

3233
@Override
3334
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
34-
channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel));
35+
if (isOpen()) {
36+
channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel));
37+
} else {
38+
// No need to dispatch to the event loop just to fail this listener; moreover the channel might be closed because the whole
39+
// node is shutting down, in which case the event loop might not exist any more so the channel promise cannot be completed.
40+
listener.onFailure(new ClosedChannelException());
41+
}
3542
}
3643

3744
@Override

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,16 @@
3939
import io.netty.handler.codec.http.HttpUtil;
4040
import io.netty.handler.codec.http.HttpVersion;
4141

42+
import org.apache.http.HttpHost;
4243
import org.apache.lucene.util.SetOnce;
4344
import org.elasticsearch.ElasticsearchException;
4445
import org.elasticsearch.ElasticsearchSecurityException;
4546
import org.elasticsearch.ElasticsearchWrapperException;
47+
import org.elasticsearch.action.ActionListener;
48+
import org.elasticsearch.action.support.ActionTestUtils;
49+
import org.elasticsearch.action.support.SubscribableListener;
50+
import org.elasticsearch.client.Request;
51+
import org.elasticsearch.client.RestClient;
4652
import org.elasticsearch.common.bytes.BytesArray;
4753
import org.elasticsearch.common.collect.Iterators;
4854
import org.elasticsearch.common.network.NetworkAddress;
@@ -956,6 +962,61 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
956962
}
957963
}
958964

965+
public void testRespondAfterClose() throws Exception {
966+
final String url = "/thing";
967+
final CountDownLatch responseReleasedLatch = new CountDownLatch(1);
968+
final SubscribableListener<Void> transportClosedFuture = new SubscribableListener<>();
969+
final CountDownLatch handlingRequestLatch = new CountDownLatch(1);
970+
971+
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
972+
@Override
973+
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
974+
assertEquals(request.uri(), url);
975+
final var response = RestResponse.chunked(
976+
OK,
977+
ChunkedRestResponseBody.fromTextChunks(
978+
RestResponse.TEXT_CONTENT_TYPE,
979+
Collections.emptyIterator(),
980+
responseReleasedLatch::countDown
981+
)
982+
);
983+
transportClosedFuture.addListener(ActionListener.running(() -> channel.sendResponse(response)));
984+
handlingRequestLatch.countDown();
985+
}
986+
987+
@Override
988+
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
989+
fail(cause, "--> Unexpected bad request [%s]", FakeRestRequest.requestToString(channel.request()));
990+
}
991+
};
992+
993+
try (
994+
Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
995+
Settings.EMPTY,
996+
networkService,
997+
threadPool,
998+
xContentRegistry(),
999+
dispatcher,
1000+
clusterSettings,
1001+
new SharedGroupFactory(Settings.EMPTY),
1002+
Tracer.NOOP,
1003+
TLSConfig.noTLS(),
1004+
null,
1005+
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
1006+
)
1007+
) {
1008+
transport.start();
1009+
final var address = randomFrom(transport.boundAddress().boundAddresses()).address();
1010+
try (var client = RestClient.builder(new HttpHost(address.getAddress(), address.getPort())).build()) {
1011+
client.performRequestAsync(new Request("GET", url), ActionTestUtils.wrapAsRestResponseListener(ActionListener.noop()));
1012+
safeAwait(handlingRequestLatch);
1013+
transport.close();
1014+
transportClosedFuture.onResponse(null);
1015+
safeAwait(responseReleasedLatch);
1016+
}
1017+
}
1018+
}
1019+
9591020
private Netty4HttpServerTransport getTestNetty4HttpServerTransport(
9601021
HttpServerTransport.Dispatcher dispatcher,
9611022
HttpValidator httpValidator,

0 commit comments

Comments
 (0)