Skip to content

Commit 32c4284

Browse files
authored
Merge branch 'main' into add_microsoft_graph_plugin
2 parents 2d0977a + 20c02f4 commit 32c4284

File tree

6 files changed

+257
-32
lines changed

6 files changed

+257
-32
lines changed

docs/changelog/128025.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 128025
2+
summary: "Set `connection: close` header on shutdown"
3+
area: Network
4+
type: enhancement
5+
issues:
6+
- 127984

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java

Lines changed: 191 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,26 @@
99

1010
package org.elasticsearch.http.netty4;
1111

12+
import io.netty.bootstrap.Bootstrap;
13+
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.channel.ChannelInitializer;
15+
import io.netty.channel.ChannelOption;
16+
import io.netty.channel.EventLoopGroup;
17+
import io.netty.channel.SimpleChannelInboundHandler;
18+
import io.netty.channel.nio.NioEventLoopGroup;
19+
import io.netty.channel.socket.SocketChannel;
20+
import io.netty.handler.codec.http.DefaultFullHttpRequest;
21+
import io.netty.handler.codec.http.HttpClientCodec;
22+
import io.netty.handler.codec.http.HttpMethod;
23+
import io.netty.handler.codec.http.HttpResponse;
24+
import io.netty.handler.codec.http.HttpVersion;
1225
import io.netty.util.ReferenceCounted;
1326

1427
import org.apache.lucene.util.BytesRef;
1528
import org.elasticsearch.ESNetty4IntegTestCase;
29+
import org.elasticsearch.ExceptionsHelper;
1630
import org.elasticsearch.action.ActionListener;
31+
import org.elasticsearch.action.ActionResponse;
1732
import org.elasticsearch.action.support.CountDownActionListener;
1833
import org.elasticsearch.action.support.SubscribableListener;
1934
import org.elasticsearch.client.internal.node.NodeClient;
@@ -29,6 +44,8 @@
2944
import org.elasticsearch.common.settings.SettingsFilter;
3045
import org.elasticsearch.common.unit.ByteSizeUnit;
3146
import org.elasticsearch.common.util.CollectionUtils;
47+
import org.elasticsearch.core.Releasable;
48+
import org.elasticsearch.core.Releasables;
3249
import org.elasticsearch.core.Strings;
3350
import org.elasticsearch.features.NodeFeature;
3451
import org.elasticsearch.http.HttpServerTransport;
@@ -41,29 +58,42 @@
4158
import org.elasticsearch.rest.RestRequest;
4259
import org.elasticsearch.rest.RestResponse;
4360
import org.elasticsearch.rest.RestStatus;
61+
import org.elasticsearch.rest.action.EmptyResponseListener;
4462
import org.elasticsearch.rest.action.RestToXContentListener;
4563
import org.elasticsearch.test.ESIntegTestCase;
64+
import org.elasticsearch.transport.netty4.NettyAllocator;
4665
import org.elasticsearch.xcontent.ToXContentObject;
4766

4867
import java.io.IOException;
68+
import java.util.ArrayList;
4969
import java.util.Arrays;
5070
import java.util.Collection;
71+
import java.util.Collections;
5172
import java.util.List;
73+
import java.util.concurrent.CountDownLatch;
74+
import java.util.concurrent.TimeUnit;
75+
import java.util.concurrent.atomic.AtomicBoolean;
76+
import java.util.concurrent.atomic.AtomicInteger;
5277
import java.util.function.Predicate;
5378
import java.util.function.Supplier;
5479

5580
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
5681
import static org.elasticsearch.rest.RestRequest.Method.GET;
82+
import static org.hamcrest.Matchers.equalTo;
5783
import static org.hamcrest.Matchers.hasSize;
5884
import static org.hamcrest.Matchers.is;
5985
import static org.hamcrest.Matchers.lessThanOrEqualTo;
86+
import static org.hamcrest.Matchers.oneOf;
6087

6188
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
6289
public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
6390

6491
// Supplies the plugins installed on test nodes: the test-only REST plugins declared in this class, concatenated with
// whatever the superclass provides. This hunk replaces the single-line return with one that also lists KeepPipeliningPlugin.
@Override
6592
protected Collection<Class<? extends Plugin>> nodePlugins() {
66-
return CollectionUtils.concatLists(List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class), super.nodePlugins());
93+
return CollectionUtils.concatLists(
94+
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class),
95+
super.nodePlugins()
96+
);
6797
}
6898

6999
private static final int MAX_PIPELINE_EVENTS = 10;
@@ -142,6 +172,115 @@ private void runPipeliningTest(int expectedResponseCount, String... routes) thro
142172
}
143173
}
144174

175+
/**
 * Drives a continuously busy pipelined HTTP connection against a node, stops that node, and checks that the client
 * sees {@code Connection: close} only after shutdown began and that no request is left unanswered when the channel closes.
 */
public void testSetCloseConnectionHeaderWhenShuttingDown() throws IOException {
176+
177+
// This test works using KeepPipeliningPlugin to keep an HTTP connection from becoming idle with a sequence of requests while the
178+
// node shuts down and ensures that these requests start to receive responses with `Connection: close` and that the node does not
179+
// shut down until all requests have received a response.
180+
181+
final var victimNode = internalCluster().startNode();
182+
183+
// Cleanup actions are appended as resources are created; the finally block reverses the list before closing them.
final var releasables = new ArrayList<Releasable>(3);
184+
try {
185+
// A single request object is reused for every "keep pipelining" send; each send calls retain() first, and this
// releasable drops the reference held by the test itself.
final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE);
186+
releasables.add(keepPipeliningRequest::release);
187+
188+
// Counted down once per response in channelRead0; the main thread waits on it before stopping the node.
final var enoughResponsesToCloseLatch = new CountDownLatch(between(1, 5));
189+
// Requests sent minus responses seen by the client handler.
final var outstandingRequestsCounter = new AtomicInteger();
190+
// Set by the main thread immediately before it asks the cluster to stop the node.
final var nodeShuttingDown = new AtomicBoolean();
191+
// Set by the client handler when it sends the final `respond_immediately` request.
final var stoppedPipelining = new AtomicBoolean();
192+
193+
final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
194+
releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly());
195+
final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
196+
.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
197+
.group(eventLoopGroup)
198+
.handler(new ChannelInitializer<SocketChannel>() {
199+
@Override
200+
protected void initChannel(SocketChannel ch) {
201+
ch.pipeline().addLast(new HttpClientCodec());
202+
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() {
203+
204+
// How many `Connection: close` responses this client will deliberately ignore (and keep pipelining through)
// before it reacts to one.
private int closeHeadersToIgnore = between(0, 5);
205+
206+
// Returns true, consuming one unit of the budget above, while there are still close headers to ignore.
private boolean ignoreCloseHeader() {
207+
if (closeHeadersToIgnore == 0) {
208+
return false;
209+
} else {
210+
closeHeadersToIgnore -= 1;
211+
return true;
212+
}
213+
}
214+
215+
@Override
216+
protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) {
217+
enoughResponsesToCloseLatch.countDown();
218+
// While pipelining there are always two requests in flight, so one remains after this response. Once the
// final request has been sent no replacements follow, so the count may drain to zero.
assertThat(
219+
outstandingRequestsCounter.decrementAndGet(),
220+
stoppedPipelining.get() ? oneOf(0, 1) : equalTo(1)
221+
);
222+
223+
// `&&` short-circuits, so ignoreCloseHeader() only spends its budget on responses that actually carry
// `Connection: close`.
if ("close".equals(msg.headers().get("connection")) && ignoreCloseHeader() == false) {
224+
assertTrue(nodeShuttingDown.get());
225+
// send one more request with `?respond_immediately` to stop the pipelining
226+
if (stoppedPipelining.compareAndSet(false, true)) {
227+
assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
228+
ctx.writeAndFlush(
229+
new DefaultFullHttpRequest(
230+
HttpVersion.HTTP_1_1,
231+
HttpMethod.GET,
232+
KeepPipeliningPlugin.ROUTE + "?" + KeepPipeliningPlugin.RESPOND_IMMEDIATELY
233+
)
234+
);
235+
}
236+
} else {
237+
// still pipelining, send another request to trigger the next response
238+
assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
239+
ctx.writeAndFlush(keepPipeliningRequest.retain());
240+
}
241+
}
242+
243+
@Override
244+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
245+
// Wrap whatever reached the client pipeline in an AssertionError and hand it to ExceptionsHelper.
// NOTE(review): presumably this makes the failure fatal/visible rather than swallowed on the event loop - confirm.
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause));
246+
}
247+
});
248+
}
249+
});
250+
251+
final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, victimNode);
252+
final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address();
253+
254+
// Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE
255+
final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel();
256+
releasables.add(() -> pipeliningChannel.close().syncUninterruptibly());
257+
258+
// Send two pipelined requests so that we start to receive responses
259+
assertTrue(outstandingRequestsCounter.compareAndSet(0, 2));
260+
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
261+
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
262+
263+
// wait until we've started to receive responses
264+
safeAwait(enoughResponsesToCloseLatch);
265+
266+
// Shut down the node
267+
// The flag is flipped before stopNode so the handler's assertTrue(nodeShuttingDown.get()) cannot observe a close
// header that it acts on ahead of the flag being set.
assertTrue(nodeShuttingDown.compareAndSet(false, true));
268+
internalCluster().stopNode(victimNode);
269+
270+
// Wait for the pipelining channel to be closed, indicating that it stopped pipelining (because it received a response with
271+
// `Connection: close`) and allowed the node to shut down
272+
pipeliningChannel.closeFuture().syncUninterruptibly();
273+
274+
// The shutdown did not happen until all requests had had a response.
275+
assertTrue(stoppedPipelining.get());
276+
assertEquals(0, outstandingRequestsCounter.get());
277+
278+
} finally {
279+
// Close in the opposite order to registration: channel first, then the event loop group, then the shared request.
Collections.reverse(releasables);
280+
Releasables.close(releasables);
281+
}
282+
}
283+
145284
private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
146285
// check if opaque ids are monotonically increasing
147286
int i = 0;
@@ -203,7 +342,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
203342
}
204343

205344
/**
206-
* Adds an HTTP route that waits for 3 concurrent executions before returning any of them
345+
* Adds an HTTP route that starts to emit a chunked response and then fails before its completion.
207346
*/
208347
public static class ChunkAndFailPlugin extends Plugin implements ActionPlugin {
209348

@@ -285,4 +424,54 @@ public String getResponseContentTypeString() {
285424
});
286425
}
287426
}
427+
428+
/**
429+
* Adds an HTTP route that only responds when starting to process a second request, ensuring that there is always at least one in-flight
430+
* request in the pipeline which keeps a connection from becoming idle.
431+
*/
432+
public static class KeepPipeliningPlugin extends Plugin implements ActionPlugin {
433+
434+
// Path of the test route registered by this plugin.
static final String ROUTE = "/_test/keep_pipelining";
435+
// Name of the boolean request parameter that makes a request complete without waiting for a successor.
static final String RESPOND_IMMEDIATELY = "respond_immediately";
436+
437+
// Registers a single GET handler on ROUTE; none of the supplied arguments are used.
@Override
438+
public Collection<RestHandler> getRestHandlers(
439+
Settings settings,
440+
NamedWriteableRegistry namedWriteableRegistry,
441+
RestController restController,
442+
ClusterSettings clusterSettings,
443+
IndexScopedSettings indexScopedSettings,
444+
SettingsFilter settingsFilter,
445+
IndexNameExpressionResolver indexNameExpressionResolver,
446+
Supplier<DiscoveryNodes> nodesInCluster,
447+
Predicate<NodeFeature> clusterSupportsFeature
448+
) {
449+
return List.of(new BaseRestHandler() {
450+
451+
// Trigger belonging to the most recently received request; it is completed when the next request arrives.
// The initial instance has no subscribers, so completing it on the first request sends nothing.
private SubscribableListener<Void> lastRequestTrigger = new SubscribableListener<>();
452+
453+
@Override
454+
public String getName() {
455+
return ROUTE;
456+
}
457+
458+
@Override
459+
public List<Route> routes() {
460+
return List.of(new Route(GET, ROUTE));
461+
}
462+
463+
@Override
464+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
465+
// Read the parameter here, outside the returned consumer; defaults to false when absent.
final var respondImmediately = request.paramAsBoolean(RESPOND_IMMEDIATELY, false);
466+
return channel -> {
467+
// all happens on a single thread in these tests, no need for concurrency protection
468+
final var previousRequestTrigger = lastRequestTrigger;
469+
// With respond_immediately the new trigger is created via nullSuccess(), i.e. presumably already complete, so
// the listener added on the next line should fire straight away - confirm against SubscribableListener.
lastRequestTrigger = respondImmediately ? SubscribableListener.nullSuccess() : new SubscribableListener<>();
470+
// This request's (empty) response is sent when its own trigger completes.
lastRequestTrigger.addListener(new EmptyResponseListener(channel).map(ignored -> ActionResponse.Empty.INSTANCE));
471+
// Completing the previous trigger releases the response for the request that arrived before this one.
previousRequestTrigger.onResponse(null);
472+
};
473+
}
474+
});
475+
}
476+
}
288477
}

muted-tests.yml

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -447,36 +447,42 @@ tests:
447447
- class: org.elasticsearch.indices.stats.IndexStatsIT
448448
method: testThrottleStats
449449
issue: https://github.com/elastic/elasticsearch/issues/126359
450-
- class: org.elasticsearch.packaging.test.DockerTests
451-
method: test040JavaUsesTheOsProvidedKeystore
452-
issue: https://github.com/elastic/elasticsearch/issues/127437
450+
- class: org.elasticsearch.search.vectors.IVFKnnFloatVectorQueryTests
451+
method: testRandomWithFilter
452+
issue: https://github.com/elastic/elasticsearch/issues/127963
453+
- class: org.elasticsearch.search.vectors.IVFKnnFloatVectorQueryTests
454+
method: testSearchBoost
455+
issue: https://github.com/elastic/elasticsearch/issues/127969
456+
- class: org.elasticsearch.search.vectors.IVFKnnFloatVectorQueryTests
457+
method: testFindFewer
458+
issue: https://github.com/elastic/elasticsearch/issues/128002
453459
- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityRestIT
454460
method: testTaskCancellation
455461
issue: https://github.com/elastic/elasticsearch/issues/128009
456462
- class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithPartialResultsIT
457463
method: testOneRemoteClusterPartial
458464
issue: https://github.com/elastic/elasticsearch/issues/124055
459-
- class: org.elasticsearch.packaging.test.DockerTests
460-
method: test041AmazonCaCertsAreInTheKeystore
461-
issue: https://github.com/elastic/elasticsearch/issues/128007
462465
- class: org.elasticsearch.compute.aggregation.SampleDoubleAggregatorFunctionTests
463466
method: testSimpleWithCranky
464467
issue: https://github.com/elastic/elasticsearch/issues/128024
465468
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
466469
method: test {lookup-join.MvJoinKeyOnTheLookupIndex ASYNC}
467470
issue: https://github.com/elastic/elasticsearch/issues/128030
471+
- class: org.elasticsearch.compute.aggregation.PercentileIntGroupingAggregatorFunctionTests
472+
method: testManyInitialManyPartialFinalRunner
473+
issue: https://github.com/elastic/elasticsearch/issues/128092
474+
- class: org.elasticsearch.packaging.test.DockerTests
475+
method: test042KeystorePermissionsAreCorrect
476+
issue: https://github.com/elastic/elasticsearch/issues/128018
468477
- class: org.elasticsearch.packaging.test.DockerTests
469478
method: test072RunEsAsDifferentUserAndGroup
470479
issue: https://github.com/elastic/elasticsearch/issues/128031
471480
- class: org.elasticsearch.packaging.test.DockerTests
472-
method: test042KeystorePermissionsAreCorrect
473-
issue: https://github.com/elastic/elasticsearch/issues/128019
481+
method: test122CanUseDockerLoggingConfig
482+
issue: https://github.com/elastic/elasticsearch/issues/128110
474483
- class: org.elasticsearch.packaging.test.DockerTests
475-
method: test073RunEsAsDifferentUserAndGroupWithoutBindMounting
476-
issue: https://github.com/elastic/elasticsearch/issues/128044
477-
- class: org.elasticsearch.compute.aggregation.PercentileIntGroupingAggregatorFunctionTests
478-
method: testManyInitialManyPartialFinalRunner
479-
issue: https://github.com/elastic/elasticsearch/issues/128092
484+
method: test041AmazonCaCertsAreInTheKeystore
485+
issue: https://github.com/elastic/elasticsearch/issues/128006
480486

481487
# Examples:
482488
#

qa/packaging/src/test/java/org/elasticsearch/packaging/util/docker/Docker.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ public class Docker {
7575
public static final int STARTUP_SLEEP_INTERVAL_MILLISECONDS = 1000;
7676
public static final int STARTUP_ATTEMPTS_MAX = 30;
7777

78+
// Main class name searched for in each candidate process's command line.
private static final String ELASTICSEARCH_FULL_CLASSNAME = "org.elasticsearch.bootstrap.Elasticsearch";
79+
// Shell snippet meant to be embedded inside a single-quoted `bash -c '...'` argument (hence the '\'' sequences around the
// awk program, which close and reopen the outer single quotes). It lists the pids whose `ps` command name matches "java",
// reads /proc/<pid>/cmdline with NUL separators turned into spaces, and prints "<pid>: <cmdline>" for every process whose
// command line contains ELASTICSEARCH_FULL_CLASSNAME.
private static final String FIND_ELASTICSEARCH_PROCESS = "for pid in $(ps -eo pid,comm | grep java | awk '\\''{print $1}'\\''); "
80+
+ "do cmdline=$(tr \"\\0\" \" \" < /proc/$pid/cmdline 2>/dev/null); [[ $cmdline == *"
81+
+ ELASTICSEARCH_FULL_CLASSNAME
82+
+ "* ]] && echo \"$pid: $cmdline\"; done";
83+
// The length of the command exceeds what we can use for COLUMNS so we use a pipe to detect the process we're looking for
84+
7885
/**
7986
* Tracks the currently running Docker image. An earlier implementation used a fixed container name,
8087
* but that appeared to cause problems with repeatedly destroying and recreating containers with
@@ -185,11 +192,8 @@ public static void waitForElasticsearchToStart() {
185192
try {
186193
// Give the container enough time for security auto-configuration or a chance to crash out
187194
Thread.sleep(STARTUP_SLEEP_INTERVAL_MILLISECONDS);
188-
189-
// Set COLUMNS so that `ps` doesn't truncate its output
190-
psOutput = dockerShell.run("bash -c 'COLUMNS=4000 ps ax'").stdout();
191-
192-
if (psOutput.contains("org.elasticsearch.bootstrap.Elasticsearch")) {
195+
psOutput = dockerShell.run("bash -c '" + FIND_ELASTICSEARCH_PROCESS + " | wc -l'").stdout();
196+
if (psOutput.contains("1")) {
193197
isElasticsearchRunning = true;
194198
break;
195199
}

0 commit comments

Comments
 (0)