Skip to content

Commit 9f9220c

Browse files
authored
Merge branch 'main' into remove-accidentally-mutes
2 parents e9ebf82 + 9de75e6 commit 9f9220c

File tree

20 files changed

+602
-18
lines changed

20 files changed

+602
-18
lines changed

docs/changelog/127910.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127910
2+
summary: Add Microsoft Graph Delegated Authorization Realm Plugin
3+
area: Authorization
4+
type: enhancement
5+
issues: []

docs/changelog/128025.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 128025
2+
summary: "Set `connection: close` header on shutdown"
3+
area: Network
4+
type: enhancement
5+
issues:
6+
- 127984

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java

Lines changed: 191 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,26 @@
99

1010
package org.elasticsearch.http.netty4;
1111

12+
import io.netty.bootstrap.Bootstrap;
13+
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.channel.ChannelInitializer;
15+
import io.netty.channel.ChannelOption;
16+
import io.netty.channel.EventLoopGroup;
17+
import io.netty.channel.SimpleChannelInboundHandler;
18+
import io.netty.channel.nio.NioEventLoopGroup;
19+
import io.netty.channel.socket.SocketChannel;
20+
import io.netty.handler.codec.http.DefaultFullHttpRequest;
21+
import io.netty.handler.codec.http.HttpClientCodec;
22+
import io.netty.handler.codec.http.HttpMethod;
23+
import io.netty.handler.codec.http.HttpResponse;
24+
import io.netty.handler.codec.http.HttpVersion;
1225
import io.netty.util.ReferenceCounted;
1326

1427
import org.apache.lucene.util.BytesRef;
1528
import org.elasticsearch.ESNetty4IntegTestCase;
29+
import org.elasticsearch.ExceptionsHelper;
1630
import org.elasticsearch.action.ActionListener;
31+
import org.elasticsearch.action.ActionResponse;
1732
import org.elasticsearch.action.support.CountDownActionListener;
1833
import org.elasticsearch.action.support.SubscribableListener;
1934
import org.elasticsearch.client.internal.node.NodeClient;
@@ -29,6 +44,8 @@
2944
import org.elasticsearch.common.settings.SettingsFilter;
3045
import org.elasticsearch.common.unit.ByteSizeUnit;
3146
import org.elasticsearch.common.util.CollectionUtils;
47+
import org.elasticsearch.core.Releasable;
48+
import org.elasticsearch.core.Releasables;
3249
import org.elasticsearch.core.Strings;
3350
import org.elasticsearch.features.NodeFeature;
3451
import org.elasticsearch.http.HttpServerTransport;
@@ -41,29 +58,42 @@
4158
import org.elasticsearch.rest.RestRequest;
4259
import org.elasticsearch.rest.RestResponse;
4360
import org.elasticsearch.rest.RestStatus;
61+
import org.elasticsearch.rest.action.EmptyResponseListener;
4462
import org.elasticsearch.rest.action.RestToXContentListener;
4563
import org.elasticsearch.test.ESIntegTestCase;
64+
import org.elasticsearch.transport.netty4.NettyAllocator;
4665
import org.elasticsearch.xcontent.ToXContentObject;
4766

4867
import java.io.IOException;
68+
import java.util.ArrayList;
4969
import java.util.Arrays;
5070
import java.util.Collection;
71+
import java.util.Collections;
5172
import java.util.List;
73+
import java.util.concurrent.CountDownLatch;
74+
import java.util.concurrent.TimeUnit;
75+
import java.util.concurrent.atomic.AtomicBoolean;
76+
import java.util.concurrent.atomic.AtomicInteger;
5277
import java.util.function.Predicate;
5378
import java.util.function.Supplier;
5479

5580
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
5681
import static org.elasticsearch.rest.RestRequest.Method.GET;
82+
import static org.hamcrest.Matchers.equalTo;
5783
import static org.hamcrest.Matchers.hasSize;
5884
import static org.hamcrest.Matchers.is;
5985
import static org.hamcrest.Matchers.lessThanOrEqualTo;
86+
import static org.hamcrest.Matchers.oneOf;
6087

6188
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
6289
public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
6390

6491
@Override
6592
protected Collection<Class<? extends Plugin>> nodePlugins() {
66-
return CollectionUtils.concatLists(List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class), super.nodePlugins());
93+
return CollectionUtils.concatLists(
94+
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class),
95+
super.nodePlugins()
96+
);
6797
}
6898

6999
private static final int MAX_PIPELINE_EVENTS = 10;
@@ -142,6 +172,115 @@ private void runPipeliningTest(int expectedResponseCount, String... routes) thro
142172
}
143173
}
144174

175+
public void testSetCloseConnectionHeaderWhenShuttingDown() throws IOException {
176+
177+
// This test works using KeepPipeliningPlugin to keep a HTTP connection from becoming idle with a sequence of requests while the
178+
// node shuts down and ensures that these requests start to receive responses with `Connection: close` and that the node does not
179+
// shut down until all requests have received a response.
180+
181+
final var victimNode = internalCluster().startNode();
182+
183+
final var releasables = new ArrayList<Releasable>(3);
184+
try {
185+
final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE);
186+
releasables.add(keepPipeliningRequest::release);
187+
188+
final var enoughResponsesToCloseLatch = new CountDownLatch(between(1, 5));
189+
final var outstandingRequestsCounter = new AtomicInteger();
190+
final var nodeShuttingDown = new AtomicBoolean();
191+
final var stoppedPipelining = new AtomicBoolean();
192+
193+
final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
194+
releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly());
195+
final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
196+
.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
197+
.group(eventLoopGroup)
198+
.handler(new ChannelInitializer<SocketChannel>() {
199+
@Override
200+
protected void initChannel(SocketChannel ch) {
201+
ch.pipeline().addLast(new HttpClientCodec());
202+
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() {
203+
204+
private int closeHeadersToIgnore = between(0, 5);
205+
206+
private boolean ignoreCloseHeader() {
207+
if (closeHeadersToIgnore == 0) {
208+
return false;
209+
} else {
210+
closeHeadersToIgnore -= 1;
211+
return true;
212+
}
213+
}
214+
215+
@Override
216+
protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) {
217+
enoughResponsesToCloseLatch.countDown();
218+
assertThat(
219+
outstandingRequestsCounter.decrementAndGet(),
220+
stoppedPipelining.get() ? oneOf(0, 1) : equalTo(1)
221+
);
222+
223+
if ("close".equals(msg.headers().get("connection")) && ignoreCloseHeader() == false) {
224+
assertTrue(nodeShuttingDown.get());
225+
// send one more request with `?respond_immediately` to stop the pipelining
226+
if (stoppedPipelining.compareAndSet(false, true)) {
227+
assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
228+
ctx.writeAndFlush(
229+
new DefaultFullHttpRequest(
230+
HttpVersion.HTTP_1_1,
231+
HttpMethod.GET,
232+
KeepPipeliningPlugin.ROUTE + "?" + KeepPipeliningPlugin.RESPOND_IMMEDIATELY
233+
)
234+
);
235+
}
236+
} else {
237+
// still pipelining, send another request to trigger the next response
238+
assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
239+
ctx.writeAndFlush(keepPipeliningRequest.retain());
240+
}
241+
}
242+
243+
@Override
244+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
245+
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause));
246+
}
247+
});
248+
}
249+
});
250+
251+
final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, victimNode);
252+
final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address();
253+
254+
// Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE
255+
final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel();
256+
releasables.add(() -> pipeliningChannel.close().syncUninterruptibly());
257+
258+
// Send two pipelined requests so that we start to receive responses
259+
assertTrue(outstandingRequestsCounter.compareAndSet(0, 2));
260+
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
261+
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
262+
263+
// wait until we've started to receive responses
264+
safeAwait(enoughResponsesToCloseLatch);
265+
266+
// Shut down the node
267+
assertTrue(nodeShuttingDown.compareAndSet(false, true));
268+
internalCluster().stopNode(victimNode);
269+
270+
// Wait for the pipelining channel to be closed, indicating that it stopped pipelining (because it received a response with
271+
// `Connection: close`) and allowed the node to shut down
272+
pipeliningChannel.closeFuture().syncUninterruptibly();
273+
274+
// The shutdown did not happen until all requests had had a response.
275+
assertTrue(stoppedPipelining.get());
276+
assertEquals(0, outstandingRequestsCounter.get());
277+
278+
} finally {
279+
Collections.reverse(releasables);
280+
Releasables.close(releasables);
281+
}
282+
}
283+
145284
private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
146285
// check if opaque ids are monotonically increasing
147286
int i = 0;
@@ -203,7 +342,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
203342
}
204343

205344
/**
206-
* Adds an HTTP route that waits for 3 concurrent executions before returning any of them
345+
* Adds an HTTP route that starts to emit a chunked response and then fails before its completion.
207346
*/
208347
public static class ChunkAndFailPlugin extends Plugin implements ActionPlugin {
209348

@@ -285,4 +424,54 @@ public String getResponseContentTypeString() {
285424
});
286425
}
287426
}
427+
428+
/**
429+
* Adds an HTTP route that only responds when starting to process a second request, ensuring that there is always at least one in-flight
430+
* request in the pipeline which keeps a connection from becoming idle.
431+
*/
432+
public static class KeepPipeliningPlugin extends Plugin implements ActionPlugin {
433+
434+
static final String ROUTE = "/_test/keep_pipelining";
435+
static final String RESPOND_IMMEDIATELY = "respond_immediately";
436+
437+
@Override
438+
public Collection<RestHandler> getRestHandlers(
439+
Settings settings,
440+
NamedWriteableRegistry namedWriteableRegistry,
441+
RestController restController,
442+
ClusterSettings clusterSettings,
443+
IndexScopedSettings indexScopedSettings,
444+
SettingsFilter settingsFilter,
445+
IndexNameExpressionResolver indexNameExpressionResolver,
446+
Supplier<DiscoveryNodes> nodesInCluster,
447+
Predicate<NodeFeature> clusterSupportsFeature
448+
) {
449+
return List.of(new BaseRestHandler() {
450+
451+
private SubscribableListener<Void> lastRequestTrigger = new SubscribableListener<>();
452+
453+
@Override
454+
public String getName() {
455+
return ROUTE;
456+
}
457+
458+
@Override
459+
public List<Route> routes() {
460+
return List.of(new Route(GET, ROUTE));
461+
}
462+
463+
@Override
464+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
465+
final var respondImmediately = request.paramAsBoolean(RESPOND_IMMEDIATELY, false);
466+
return channel -> {
467+
// all happens on a single thread in these tests, no need for concurrency protection
468+
final var previousRequestTrigger = lastRequestTrigger;
469+
lastRequestTrigger = respondImmediately ? SubscribableListener.nullSuccess() : new SubscribableListener<>();
470+
lastRequestTrigger.addListener(new EmptyResponseListener(channel).map(ignored -> ActionResponse.Empty.INSTANCE));
471+
previousRequestTrigger.onResponse(null);
472+
};
473+
}
474+
});
475+
}
476+
}
288477
}

muted-tests.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,18 @@ tests:
462462
- class: org.elasticsearch.compute.aggregation.PercentileIntGroupingAggregatorFunctionTests
463463
method: testManyInitialManyPartialFinalRunner
464464
issue: https://github.com/elastic/elasticsearch/issues/128092
465+
- class: org.elasticsearch.packaging.test.DockerTests
466+
method: test042KeystorePermissionsAreCorrect
467+
issue: https://github.com/elastic/elasticsearch/issues/128018
468+
- class: org.elasticsearch.packaging.test.DockerTests
469+
method: test072RunEsAsDifferentUserAndGroup
470+
issue: https://github.com/elastic/elasticsearch/issues/128031
471+
- class: org.elasticsearch.packaging.test.DockerTests
472+
method: test122CanUseDockerLoggingConfig
473+
issue: https://github.com/elastic/elasticsearch/issues/128110
474+
- class: org.elasticsearch.packaging.test.DockerTests
475+
method: test041AmazonCaCertsAreInTheKeystore
476+
issue: https://github.com/elastic/elasticsearch/issues/128006
465477

466478
# Examples:
467479
#
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
apply plugin: "elasticsearch.internal-java-rest-test"
11+
12+
esplugin {
13+
name = "microsoft-graph-authz"
14+
description = "Microsoft Graph Delegated Authorization Realm Plugin"
15+
classname = "org.elasticsearch.xpack.security.authz.microsoft.MicrosoftGraphAuthzPlugin"
16+
extendedPlugins = ["x-pack-security"]
17+
}
18+
19+
dependencies {
20+
compileOnly project(":x-pack:plugin:core")
21+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
import org.elasticsearch.xpack.security.authz.microsoft.MicrosoftGraphAuthzPlugin;
11+
12+
module org.elasticsearch.plugin.security.authz {
13+
requires org.elasticsearch.base;
14+
requires org.elasticsearch.server;
15+
requires org.elasticsearch.xcore;
16+
requires org.elasticsearch.logging;
17+
18+
provides org.elasticsearch.xpack.core.security.SecurityExtension with MicrosoftGraphAuthzPlugin;
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.xpack.security.authz.microsoft;
11+
12+
import org.elasticsearch.common.settings.Setting;
13+
import org.elasticsearch.plugins.Plugin;
14+
import org.elasticsearch.xpack.core.security.SecurityExtension;
15+
import org.elasticsearch.xpack.core.security.authc.Realm;
16+
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
public class MicrosoftGraphAuthzPlugin extends Plugin implements SecurityExtension {
21+
@Override
22+
public Map<String, Realm.Factory> getRealms(SecurityComponents components) {
23+
return Map.of(MicrosoftGraphAuthzRealmSettings.REALM_TYPE, MicrosoftGraphAuthzRealm::new);
24+
}
25+
26+
@Override
27+
public List<Setting<?>> getSettings() {
28+
return MicrosoftGraphAuthzRealmSettings.getSettings();
29+
}
30+
}

0 commit comments

Comments
 (0)