Skip to content

Commit 7d97f77

Browse files
fdesureta
andauthored
Keep track and release Reactor Netty 4 Transport accepted Http Channels during the Node shutdown (#20106)
* Close RestClient after MSearch API calls to reliably close HttpChannels Signed-off-by: Sergei Ustimenko <[email protected]> * Add the CHANGELOG entry Signed-off-by: Sergei Ustimenko <[email protected]> * Keep track of Reactor Netty 4 Transport accepted Http Channels Signed-off-by: Sergei Ustimenko <[email protected]> * Update CHANGELOG Signed-off-by: Sergei Ustimenko <[email protected]> * Add missing visibility modifier to RestCancellableNodeClient#getNumTasks Signed-off-by: Sergei Ustimenko <[email protected]> * Extract channel wrapper into a separate class and extend the get() for proper context access Signed-off-by: Sergei Ustimenko <[email protected]> * Add missing javadocs Signed-off-by: Sergei Ustimenko <[email protected]> * Hook into AbstractHttpServerTransport directly from Reactor Request Consumers Signed-off-by: Sergei Ustimenko <[email protected]> * Make sure the CloseContext is always called even if the channel is disposed already Signed-off-by: Andriy Redko <[email protected]> * Remove extra code Signed-off-by: Sergei Ustimenko <[email protected]> * Add more coverage for streaming and non-streaming netty connections Signed-off-by: Sergei Ustimenko <[email protected]> * Revert visibility modifier of the RestCancellableNodeClient#getNumTasks Signed-off-by: Sergei Ustimenko <[email protected]> --------- Signed-off-by: Sergei Ustimenko <[email protected]> Signed-off-by: Andriy Redko <[email protected]> Co-authored-by: Andriy Redko <[email protected]>
1 parent da18cc6 commit 7d97f77

File tree

12 files changed

+373
-52
lines changed

12 files changed

+373
-52
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
9292
- Fix ClassCastException in FlightClientChannel for requests larger than 16KB ([#20010](https://github.com/opensearch-project/OpenSearch/pull/20010))
9393
- Fix GRPC Bulk ([#19937](https://github.com/opensearch-project/OpenSearch/pull/19937))
9494
- Fix node bootstrap error when enable stream transport and remote cluster state ([#19948](https://github.com/opensearch-project/OpenSearch/pull/19948))
95+
- Keep track and release Reactor Netty 4 Transport accepted Http Channels during the Node shutdown ([#20106](https://github.com/opensearch-project/OpenSearch/pull/20106))
9596
- Fix deletion failure/error of unused index template; case when an index template matches a data stream but has a lower priority. ([#20102](https://github.com/opensearch-project/OpenSearch/pull/20102))
9697
- Fix toBuilder method in EngineConfig to include mergedSegmentTransferTracker([#20105](https://github.com/opensearch-project/OpenSearch/pull/20105))
9798
- Fixed handling of property index in BulkRequest during deserialization ([#20132](https://github.com/opensearch-project/OpenSearch/pull/20132))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport.netty4;
10+
11+
import org.opensearch.OpenSearchNetty4IntegTestCase;
12+
import org.opensearch.client.Request;
13+
import org.opensearch.client.Response;
14+
import org.opensearch.core.common.Strings;
15+
import org.opensearch.core.xcontent.MediaTypeRegistry;
16+
import org.opensearch.index.query.MatchAllQueryBuilder;
17+
import org.opensearch.rest.action.RestCancellableNodeClient;
18+
import org.opensearch.search.builder.SearchSourceBuilder;
19+
import org.opensearch.test.OpenSearchIntegTestCase;
20+
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
21+
import org.opensearch.threadpool.TestThreadPool;
22+
import org.opensearch.threadpool.ThreadPool;
23+
import org.junit.After;
24+
import org.junit.Before;
25+
26+
import java.io.IOException;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.TimeUnit;
29+
30+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
31+
import static org.hamcrest.Matchers.anyOf;
32+
import static org.hamcrest.Matchers.equalTo;
33+
34+
@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
35+
public class Netty4HttpChannelsReleaseIntegTests extends OpenSearchNetty4IntegTestCase {
36+
37+
@Override
38+
protected boolean addMockHttpTransport() {
39+
return false; // enable http
40+
}
41+
42+
private ThreadPool threadPool;
43+
44+
@Before
45+
public void createThreadPool() {
46+
threadPool = new TestThreadPool(getClass().getName());
47+
}
48+
49+
@After
50+
public void stopThreadPool() {
51+
ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS);
52+
}
53+
54+
public void testAcceptedChannelsGetCleanedUpOnTheNodeShutdown() throws InterruptedException {
55+
String testIndex = "test_idx";
56+
assertAcked(client().admin().indices().prepareCreate(testIndex));
57+
58+
int initialHttpChannels = RestCancellableNodeClient.getNumChannels();
59+
int numChannels = randomIntBetween(50, 100);
60+
CountDownLatch countDownLatch = new CountDownLatch(numChannels);
61+
for (int i = 0; i < numChannels; i++) {
62+
threadPool.generic().execute(() -> {
63+
executeRequest(testIndex);
64+
countDownLatch.countDown();
65+
});
66+
}
67+
countDownLatch.await();
68+
69+
// no channels get closed in this test, hence we expect as many channels as we created in the map
70+
assertEquals("All channels remain open", initialHttpChannels + numChannels, RestCancellableNodeClient.getNumChannels());
71+
}
72+
73+
/**
74+
* Execute a Search request against the given index. The Search requests are tracked
75+
* by the RestCancellableNodeClient to verify that channels are released properly.
76+
*
77+
* @param index the index to search against
78+
*/
79+
private static void executeRequest(String index) {
80+
try {
81+
Request request = new Request("GET", "/" + index + "/_search");
82+
SearchSourceBuilder searchSource = new SearchSourceBuilder().query(new MatchAllQueryBuilder());
83+
request.setJsonEntity(Strings.toString(MediaTypeRegistry.JSON, searchSource));
84+
Response response = getRestClient().performRequest(request);
85+
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
86+
} catch (IOException e) {
87+
throw new IllegalStateException("Failed to execute the request", e);
88+
}
89+
}
90+
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.reactor.netty4;
10+
11+
import org.opensearch.OpenSearchReactorNetty4IntegTestCase;
12+
import org.opensearch.client.Request;
13+
import org.opensearch.client.Response;
14+
import org.opensearch.core.common.Strings;
15+
import org.opensearch.core.xcontent.MediaTypeRegistry;
16+
import org.opensearch.index.query.MatchAllQueryBuilder;
17+
import org.opensearch.rest.action.RestCancellableNodeClient;
18+
import org.opensearch.search.builder.SearchSourceBuilder;
19+
import org.opensearch.test.OpenSearchIntegTestCase;
20+
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
21+
import org.opensearch.threadpool.TestThreadPool;
22+
import org.opensearch.threadpool.ThreadPool;
23+
import org.junit.After;
24+
import org.junit.Before;
25+
26+
import java.io.IOException;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.TimeUnit;
29+
30+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
31+
import static org.hamcrest.Matchers.anyOf;
32+
import static org.hamcrest.Matchers.equalTo;
33+
34+
@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
35+
public class ReactorNetty4HttpChannelsReleaseIntegTests extends OpenSearchReactorNetty4IntegTestCase {
36+
37+
@Override
38+
protected boolean addMockHttpTransport() {
39+
return false; // enable http
40+
}
41+
42+
private ThreadPool threadPool;
43+
44+
@Before
45+
public void createThreadPool() {
46+
threadPool = new TestThreadPool(getClass().getName());
47+
}
48+
49+
@After
50+
public void stopThreadPool() {
51+
ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS);
52+
}
53+
54+
public void testAcceptedChannelsGetCleanedUpOnTheNodeShutdown() throws InterruptedException {
55+
String testIndex = "test_idx";
56+
assertAcked(client().admin().indices().prepareCreate(testIndex));
57+
58+
int initialHttpChannels = RestCancellableNodeClient.getNumChannels();
59+
int numChannels = randomIntBetween(50, 100);
60+
CountDownLatch countDownLatch = new CountDownLatch(numChannels);
61+
for (int i = 0; i < numChannels; i++) {
62+
threadPool.generic().execute(() -> {
63+
executeRequest(testIndex);
64+
countDownLatch.countDown();
65+
});
66+
}
67+
countDownLatch.await();
68+
69+
// no channels get closed in this test, hence we expect as many channels as we created in the map
70+
assertEquals("All channels remain open", initialHttpChannels + numChannels, RestCancellableNodeClient.getNumChannels());
71+
}
72+
73+
/**
74+
* Execute a Search request against the given index. The Search requests are tracked
75+
* by the RestCancellableNodeClient to verify that channels are released properly.
76+
*
77+
* @param index the index to search against
78+
*/
79+
private static void executeRequest(String index) {
80+
try {
81+
Request request = new Request("GET", "/" + index + "/_search");
82+
SearchSourceBuilder searchSource = new SearchSourceBuilder().query(new MatchAllQueryBuilder());
83+
request.setJsonEntity(Strings.toString(MediaTypeRegistry.JSON, searchSource));
84+
Response response = getRestClient().performRequest(request);
85+
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
86+
} catch (IOException e) {
87+
throw new IllegalStateException("Failed to execute the request", e);
88+
}
89+
}
90+
91+
}

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,6 @@ public void close() {
4848

4949
@Override
5050
public String toString() {
51-
return "ReactorNetty4HttpChannel{localAddress=" + getLocalAddress() + "}";
51+
return "ReactorNetty4HttpServerChannel{localAddress=" + getLocalAddress() + "}";
5252
}
5353
}

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTranspor
157157

158158
/**
159159
* Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}).
160+
*
160161
* @param settings settings
161162
* @param networkService network service
162163
* @param bigArrays big array allocator
@@ -194,6 +195,7 @@ public ReactorNetty4HttpServerTransport(
194195

195196
/**
196197
* Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}).
198+
*
197199
* @param settings settings
198200
* @param networkService network service
199201
* @param bigArrays big array allocator
@@ -232,6 +234,7 @@ public ReactorNetty4HttpServerTransport(
232234

233235
/**
234236
* Binds the transport engine to the socket address
237+
*
235238
* @param socketAddress socket address to bind to
236239
*/
237240
@Override
@@ -345,8 +348,20 @@ private HttpServer configure(final HttpServer server) throws Exception {
345348
return configured;
346349
}
347350

351+
/**
352+
* An override to be able to keep track of accepted channels by the
353+
* {@link ReactorNetty4NonStreamingRequestConsumer} and {@link ReactorNetty4StreamingRequestConsumer}
354+
*
355+
* @param httpChannel the accepted channel
356+
*/
357+
@Override
358+
public void serverAcceptedChannel(HttpChannel httpChannel) {
359+
super.serverAcceptedChannel(httpChannel);
360+
}
361+
348362
/**
349363
* Handles incoming Reactor Netty request
364+
*
350365
* @param request request instance
351366
* @param response response instances
352367
* @return response publisher
@@ -367,6 +382,7 @@ protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerR
367382
);
368383
if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) {
369384
final ReactorNetty4StreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4StreamingRequestConsumer<>(
385+
this,
370386
request,
371387
response
372388
);
@@ -457,4 +473,5 @@ public void onException(HttpChannel channel, Exception cause) {
457473
super.onException(channel, cause);
458474
}
459475
}
476+
460477
}

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,13 @@ public boolean isOpen() {
4646

4747
@Override
4848
public void close() {
49-
request.withConnection(connection -> connection.channel().close());
49+
request.withConnection(connection -> {
50+
if (closeContext.isDone() == false) {
51+
Netty4Utils.addListener(connection.channel().close(), closeContext);
52+
} else {
53+
connection.channel().close();
54+
}
55+
});
5056
}
5157

5258
@Override

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.opensearch.http.reactor.netty4;
1010

11-
import org.opensearch.http.AbstractHttpServerTransport;
1211
import org.opensearch.http.HttpRequest;
1312

1413
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,12 +29,12 @@ class ReactorNetty4NonStreamingRequestConsumer<T extends HttpContent> implements
3029
private final HttpServerResponse response;
3130
private final CompositeByteBuf content;
3231
private final Publisher<HttpContent> publisher;
33-
private final AbstractHttpServerTransport transport;
32+
private final ReactorNetty4HttpServerTransport transport;
3433
private final AtomicBoolean disposed = new AtomicBoolean(false);
3534
private volatile FluxSink<HttpContent> emitter;
3635

3736
ReactorNetty4NonStreamingRequestConsumer(
38-
AbstractHttpServerTransport transport,
37+
ReactorNetty4HttpServerTransport transport,
3938
HttpServerRequest request,
4039
HttpServerResponse response,
4140
int maxCompositeBufferComponents
@@ -73,6 +72,7 @@ void process(HttpContent in, FluxSink<HttpContent> emitter) {
7372
final HttpRequest r = createRequest(request, content);
7473

7574
try {
75+
transport.serverAcceptedChannel(channel);
7676
transport.incomingRequest(r, channel);
7777
} catch (Exception ex) {
7878
emitter.error(ex);

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,13 @@ public boolean isOpen() {
5656

5757
@Override
5858
public void close() {
59-
request.withConnection(connection -> connection.channel().close());
59+
request.withConnection(connection -> {
60+
if (closeContext.isDone() == false) {
61+
Netty4Utils.addListener(connection.channel().close(), closeContext);
62+
} else {
63+
connection.channel().close();
64+
}
65+
});
6066
}
6167

6268
@Override

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,14 @@ class ReactorNetty4StreamingRequestConsumer<T extends HttpContent> implements Co
2424
private final ReactorNetty4StreamingResponseProducer sender;
2525
private final StreamingHttpChannel httpChannel;
2626

27-
ReactorNetty4StreamingRequestConsumer(HttpServerRequest request, HttpServerResponse response) {
27+
ReactorNetty4StreamingRequestConsumer(
28+
ReactorNetty4HttpServerTransport transport,
29+
HttpServerRequest request,
30+
HttpServerResponse response
31+
) {
2832
this.sender = new ReactorNetty4StreamingResponseProducer();
2933
this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender);
34+
transport.serverAcceptedChannel(httpChannel);
3035
}
3136

3237
@Override

0 commit comments

Comments
 (0)