Skip to content

Commit 9df8e28

Browse files
[7.17] Fix lingering license warning header (#108408)
* Backport done * Get port range * Fix code license
1 parent 6042942 commit 9df8e28

File tree

7 files changed

+555
-8
lines changed

7 files changed

+555
-8
lines changed

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/IpFilterRemoteAddressFilter.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.netty.channel.ChannelHandlerContext;
1111
import io.netty.handler.ipfilter.AbstractRemoteAddressFilter;
1212

13+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1314
import org.elasticsearch.xpack.security.transport.filter.IPFilter;
1415

1516
import java.net.InetSocketAddress;
@@ -19,16 +20,19 @@ class IpFilterRemoteAddressFilter extends AbstractRemoteAddressFilter<InetSocket
1920

2021
private final IPFilter filter;
2122
private final String profile;
23+
private final ThreadContext threadContext;
2224

23-
IpFilterRemoteAddressFilter(final IPFilter filter, final String profile) {
25+
IpFilterRemoteAddressFilter(final IPFilter filter, final String profile, final ThreadContext threadcontext) {
2426
this.filter = filter;
2527
this.profile = profile;
28+
this.threadContext = threadcontext;
2629
}
2730

2831
@Override
2932
protected boolean accept(final ChannelHandlerContext ctx, final InetSocketAddress remoteAddress) throws Exception {
30-
// at this stage no auth has happened, so we do not have any principal anyway
31-
return filter.accept(profile, remoteAddress);
33+
// this prevents thread-context changes to propagate beyond the channel accept test, as netty worker threads are reused
34+
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(false)) {
35+
return filter.accept(profile, remoteAddress);
36+
}
3237
}
33-
3438
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ protected void initChannel(Channel ch) throws Exception {
110110
sslEngine.setUseClientMode(false);
111111
ch.pipeline().addFirst("ssl", new SslHandler(sslEngine));
112112
}
113-
ch.pipeline().addFirst("ip_filter", new IpFilterRemoteAddressFilter(ipFilter, IPFilter.HTTP_PROFILE_NAME));
113+
ch.pipeline()
114+
.addFirst(
115+
"ip_filter",
116+
new IpFilterRemoteAddressFilter(ipFilter, IPFilter.HTTP_PROFILE_NAME, threadPool.getThreadContext())
117+
);
114118
}
115119
}
116120
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ protected void initChannel(final Channel ch) throws Exception {
101101

102102
private void maybeAddIPFilter(final Channel ch, final String name) {
103103
if (authenticator != null) {
104-
ch.pipeline().addFirst("ipfilter", new IpFilterRemoteAddressFilter(authenticator, name));
104+
ch.pipeline().addFirst("ipfilter", new IpFilterRemoteAddressFilter(authenticator, name, threadPool.getThreadContext()));
105105
}
106106
}
107107

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/IpFilterRemoteAddressFilterTests.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
import org.elasticsearch.license.MockLicenseState;
1919
import org.elasticsearch.license.TestUtils;
2020
import org.elasticsearch.test.ESTestCase;
21+
import org.elasticsearch.threadpool.TestThreadPool;
22+
import org.elasticsearch.threadpool.ThreadPool;
2123
import org.elasticsearch.transport.Transport;
2224
import org.elasticsearch.xpack.security.Security;
2325
import org.elasticsearch.xpack.security.audit.AuditTrailService;
2426
import org.elasticsearch.xpack.security.transport.filter.IPFilter;
27+
import org.junit.After;
2528
import org.junit.Before;
2629

2730
import java.net.InetAddress;
@@ -36,6 +39,19 @@
3639

3740
public class IpFilterRemoteAddressFilterTests extends ESTestCase {
3841
private IpFilterRemoteAddressFilter handler;
42+
private ThreadPool threadPool;
43+
44+
@Override
45+
@Before
46+
public void setUp() throws Exception {
47+
super.setUp();
48+
this.threadPool = new TestThreadPool(getTestName());
49+
}
50+
51+
@After
52+
public void terminate() throws Exception {
53+
terminate(threadPool);
54+
}
3955

4056
@Before
4157
public void init() throws Exception {
@@ -80,9 +96,9 @@ public void init() throws Exception {
8096
}
8197

8298
if (isHttpEnabled) {
83-
handler = new IpFilterRemoteAddressFilter(ipFilter, IPFilter.HTTP_PROFILE_NAME);
99+
handler = new IpFilterRemoteAddressFilter(ipFilter, IPFilter.HTTP_PROFILE_NAME, threadPool.getThreadContext());
84100
} else {
85-
handler = new IpFilterRemoteAddressFilter(ipFilter, "default");
101+
handler = new IpFilterRemoteAddressFilter(ipFilter, "default", threadPool.getThreadContext());
86102
}
87103
}
88104

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.security.transport.netty4;
9+
10+
import io.netty.bootstrap.Bootstrap;
11+
import io.netty.buffer.ByteBuf;
12+
import io.netty.buffer.Unpooled;
13+
import io.netty.channel.ChannelFuture;
14+
import io.netty.channel.ChannelHandlerContext;
15+
import io.netty.channel.ChannelInitializer;
16+
import io.netty.channel.ChannelOption;
17+
import io.netty.channel.SimpleChannelInboundHandler;
18+
import io.netty.channel.nio.NioEventLoopGroup;
19+
import io.netty.channel.socket.SocketChannel;
20+
import io.netty.handler.codec.http.DefaultFullHttpRequest;
21+
import io.netty.handler.codec.http.FullHttpRequest;
22+
import io.netty.handler.codec.http.FullHttpResponse;
23+
import io.netty.handler.codec.http.HttpContentDecompressor;
24+
import io.netty.handler.codec.http.HttpHeaderNames;
25+
import io.netty.handler.codec.http.HttpMethod;
26+
import io.netty.handler.codec.http.HttpObject;
27+
import io.netty.handler.codec.http.HttpObjectAggregator;
28+
import io.netty.handler.codec.http.HttpRequest;
29+
import io.netty.handler.codec.http.HttpRequestEncoder;
30+
import io.netty.handler.codec.http.HttpResponse;
31+
import io.netty.handler.codec.http.HttpResponseDecoder;
32+
import io.netty.handler.codec.http.HttpVersion;
33+
34+
import org.elasticsearch.common.unit.ByteSizeUnit;
35+
import org.elasticsearch.common.unit.ByteSizeValue;
36+
import org.elasticsearch.core.Tuple;
37+
import org.elasticsearch.tasks.Task;
38+
import org.elasticsearch.transport.NettyAllocator;
39+
40+
import java.io.Closeable;
41+
import java.net.SocketAddress;
42+
import java.nio.charset.StandardCharsets;
43+
import java.util.ArrayList;
44+
import java.util.Collection;
45+
import java.util.Collections;
46+
import java.util.List;
47+
import java.util.concurrent.CountDownLatch;
48+
import java.util.concurrent.TimeUnit;
49+
50+
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
51+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
52+
import static org.junit.Assert.fail;
53+
54+
/**
55+
* Tiny helper to send http requests over netty.
56+
*/
57+
class Netty4HttpClient implements Closeable {
58+
59+
static Collection<String> returnHttpResponseBodies(Collection<FullHttpResponse> responses) {
60+
List<String> list = new ArrayList<>(responses.size());
61+
for (FullHttpResponse response : responses) {
62+
list.add(response.content().toString(StandardCharsets.UTF_8));
63+
}
64+
return list;
65+
}
66+
67+
static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses) {
68+
List<String> list = new ArrayList<>(responses.size());
69+
for (HttpResponse response : responses) {
70+
list.add(response.headers().get(Task.X_OPAQUE_ID_HTTP_HEADER));
71+
}
72+
return list;
73+
}
74+
75+
private final Bootstrap clientBootstrap;
76+
77+
Netty4HttpClient() {
78+
clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
79+
.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
80+
.group(new NioEventLoopGroup(1));
81+
}
82+
83+
public List<FullHttpResponse> get(SocketAddress remoteAddress, String... uris) throws InterruptedException {
84+
List<HttpRequest> requests = new ArrayList<>(uris.length);
85+
for (int i = 0; i < uris.length; i++) {
86+
final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
87+
httpRequest.headers().add(HOST, "localhost");
88+
httpRequest.headers().add("X-Opaque-ID", String.valueOf(i));
89+
httpRequest.headers().add("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
90+
requests.add(httpRequest);
91+
}
92+
return sendRequests(remoteAddress, requests);
93+
}
94+
95+
public final Collection<FullHttpResponse> post(SocketAddress remoteAddress, List<Tuple<String, CharSequence>> urisAndBodies)
96+
throws InterruptedException {
97+
return processRequestsWithBody(HttpMethod.POST, remoteAddress, urisAndBodies);
98+
}
99+
100+
public final FullHttpResponse send(SocketAddress remoteAddress, FullHttpRequest httpRequest) throws InterruptedException {
101+
List<FullHttpResponse> responses = sendRequests(remoteAddress, Collections.singleton(httpRequest));
102+
assert responses.size() == 1 : "expected 1 and only 1 http response";
103+
return responses.get(0);
104+
}
105+
106+
public final Collection<FullHttpResponse> put(SocketAddress remoteAddress, List<Tuple<String, CharSequence>> urisAndBodies)
107+
throws InterruptedException {
108+
return processRequestsWithBody(HttpMethod.PUT, remoteAddress, urisAndBodies);
109+
}
110+
111+
private List<FullHttpResponse> processRequestsWithBody(
112+
HttpMethod method,
113+
SocketAddress remoteAddress,
114+
List<Tuple<String, CharSequence>> urisAndBodies
115+
) throws InterruptedException {
116+
List<HttpRequest> requests = new ArrayList<>(urisAndBodies.size());
117+
for (Tuple<String, CharSequence> uriAndBody : urisAndBodies) {
118+
ByteBuf content = Unpooled.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8);
119+
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uriAndBody.v1(), content);
120+
request.headers().add(HttpHeaderNames.HOST, "localhost");
121+
request.headers().add(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
122+
request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json");
123+
requests.add(request);
124+
}
125+
return sendRequests(remoteAddress, requests);
126+
}
127+
128+
private synchronized List<FullHttpResponse> sendRequests(final SocketAddress remoteAddress, final Collection<HttpRequest> requests)
129+
throws InterruptedException {
130+
final CountDownLatch latch = new CountDownLatch(requests.size());
131+
final List<FullHttpResponse> content = Collections.synchronizedList(new ArrayList<>(requests.size()));
132+
133+
clientBootstrap.handler(new CountDownLatchHandler(latch, content));
134+
135+
ChannelFuture channelFuture = null;
136+
try {
137+
channelFuture = clientBootstrap.connect(remoteAddress);
138+
channelFuture.sync();
139+
140+
for (HttpRequest request : requests) {
141+
channelFuture.channel().writeAndFlush(request);
142+
}
143+
if (latch.await(30L, TimeUnit.SECONDS) == false) {
144+
fail("Failed to get all expected responses.");
145+
}
146+
147+
} finally {
148+
if (channelFuture != null) {
149+
channelFuture.channel().close().sync();
150+
}
151+
}
152+
153+
return content;
154+
}
155+
156+
@Override
157+
public void close() {
158+
clientBootstrap.config().group().shutdownGracefully().awaitUninterruptibly();
159+
}
160+
161+
/**
162+
* helper factory which adds returned data to a list and uses a count down latch to decide when done
163+
*/
164+
private static class CountDownLatchHandler extends ChannelInitializer<SocketChannel> {
165+
166+
private final CountDownLatch latch;
167+
private final Collection<FullHttpResponse> content;
168+
169+
CountDownLatchHandler(final CountDownLatch latch, final Collection<FullHttpResponse> content) {
170+
this.latch = latch;
171+
this.content = content;
172+
}
173+
174+
@Override
175+
protected void initChannel(SocketChannel ch) {
176+
final int maxContentLength = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt();
177+
ch.pipeline().addLast(new HttpResponseDecoder());
178+
ch.pipeline().addLast(new HttpRequestEncoder());
179+
ch.pipeline().addLast(new HttpContentDecompressor());
180+
ch.pipeline().addLast(new HttpObjectAggregator(maxContentLength));
181+
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {
182+
@Override
183+
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
184+
final FullHttpResponse response = (FullHttpResponse) msg;
185+
// We copy the buffer manually to avoid a huge allocation on a pooled allocator. We have
186+
// a test that tracks huge allocations, so we want to avoid them in this test code.
187+
ByteBuf newContent = Unpooled.copiedBuffer(((FullHttpResponse) msg).content());
188+
content.add(response.replace(newContent));
189+
latch.countDown();
190+
}
191+
192+
@Override
193+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
194+
super.exceptionCaught(ctx, cause);
195+
latch.countDown();
196+
}
197+
});
198+
}
199+
200+
}
201+
202+
}

0 commit comments

Comments
 (0)