Skip to content

Commit cae8263

Browse files
committed
cleanup
1 parent c07ea21 commit cae8263

File tree

5 files changed

+33
-34
lines changed

5 files changed

+33
-34
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.http.netty4;
1111

1212
import io.netty.buffer.Unpooled;
13+
import io.netty.channel.Channel;
1314
import io.netty.channel.ChannelHandlerContext;
1415
import io.netty.channel.ChannelInboundHandlerAdapter;
1516
import io.netty.handler.codec.DecoderResult;
@@ -37,26 +38,20 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
3738

3839
private final HttpValidator validator;
3940
private final ThreadContext threadContext;
40-
private GatedAutoRead.Gate autoRead;
41+
private final GatedAutoRead.Gate autoRead;
4142
private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
4243
private State state = WAITING_TO_START;
4344

44-
public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) {
45+
public Netty4HttpHeaderValidator(Channel channel, HttpValidator validator, ThreadContext threadContext) {
4546
this.validator = validator;
4647
this.threadContext = threadContext;
48+
this.autoRead = GatedAutoRead.newGate(channel);
4749
}
4850

4951
State getState() {
5052
return state;
5153
}
5254

53-
GatedAutoRead.Gate autoRead(ChannelHandlerContext ctx) {
54-
if (autoRead == null) {
55-
autoRead = GatedAutoRead.newGate(ctx.channel());
56-
}
57-
return autoRead;
58-
}
59-
6055
@SuppressWarnings("fallthrough")
6156
@Override
6257
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -69,7 +64,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
6964
pending.add(ReferenceCountUtil.retain(httpObject));
7065
requestStart(ctx);
7166
assert state == QUEUEING_DATA;
72-
assert autoRead(ctx).get() == false;
67+
assert autoRead.get() == false;
7368
break;
7469
case QUEUEING_DATA:
7570
pending.add(ReferenceCountUtil.retain(httpObject));
@@ -91,7 +86,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
9186
case DROPPING_DATA_PERMANENTLY:
9287
assert pending.isEmpty();
9388
ReferenceCountUtil.release(httpObject); // consume without enqueuing
94-
autoRead(ctx).set(false);
89+
autoRead.set(false);
9590
break;
9691
}
9792
}
@@ -114,7 +109,7 @@ private void requestStart(ChannelHandlerContext ctx) {
114109
}
115110

116111
state = QUEUEING_DATA;
117-
autoRead(ctx).set(false);
112+
autoRead.set(false);
118113

119114
if (httpRequest == null) {
120115
// this looks like a malformed request and will forward without validation
@@ -157,10 +152,10 @@ public void onFailure(Exception e) {
157152
private void forwardFullRequest(ChannelHandlerContext ctx) {
158153
Transports.assertDefaultThreadContext(threadContext);
159154
assert ctx.channel().eventLoop().inEventLoop();
160-
assert autoRead(ctx).get() == false;
155+
assert autoRead.get() == false;
161156
assert state == QUEUEING_DATA;
162157

163-
autoRead(ctx).set(true);
158+
autoRead.set(true);
164159
boolean fullRequestForwarded = forwardData(ctx, pending);
165160

166161
assert fullRequestForwarded || pending.isEmpty();
@@ -177,7 +172,7 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
177172
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
178173
Transports.assertDefaultThreadContext(threadContext);
179174
assert ctx.channel().eventLoop().inEventLoop();
180-
assert autoRead(ctx).get() == false;
175+
assert autoRead.get() == false;
181176
assert state == QUEUEING_DATA;
182177

183178
HttpObject messageToForward = pending.getFirst();
@@ -188,7 +183,7 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex
188183
}
189184
messageToForward.setDecoderResult(DecoderResult.failure(e));
190185

191-
autoRead(ctx).set(true);
186+
autoRead.set(true);
192187
ctx.fireChannelRead(messageToForward);
193188

194189
assert fullRequestDropped || pending.isEmpty();

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
371371
.addLast(
372372
"header_validator",
373373
HttpHeadersAuthenticatorUtils.getValidatorInboundHandler(
374+
ch,
374375
httpValidator,
375376
transport.getThreadPool().getThreadContext()
376377
)

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/internal/HttpHeadersAuthenticatorUtils.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.http.netty4.internal;
1111

12+
import io.netty.channel.Channel;
1213
import io.netty.handler.codec.http.DefaultHttpRequest;
1314
import io.netty.handler.codec.http.HttpMessage;
1415
import io.netty.handler.codec.http.HttpRequest;
@@ -41,8 +42,8 @@ private HttpHeadersAuthenticatorUtils() {}
4142
* The HTTP headers of the to-be-authenticated {@link HttpRequest} must be wrapped by the special
4243
* {@link HttpHeadersWithAuthenticationContext}, see {@link #wrapAsMessageWithAuthenticationContext(HttpMessage)}.
4344
*/
44-
public static Netty4HttpHeaderValidator getValidatorInboundHandler(HttpValidator validator, ThreadContext threadContext) {
45-
return new Netty4HttpHeaderValidator((httpRequest, channel, listener) -> {
45+
public static Netty4HttpHeaderValidator getValidatorInboundHandler(Channel chan, HttpValidator validator, ThreadContext threadContext) {
46+
return new Netty4HttpHeaderValidator(chan, (httpRequest, channel, listener) -> {
4647
// make sure authentication only runs on properly wrapped "authenticable" headers implementation
4748
if (httpRequest.headers() instanceof HttpHeadersWithAuthenticationContext httpHeadersWithAuthenticationContext) {
4849
validator.validate(httpRequest, channel, ActionListener.wrap(aVoid -> {
@@ -61,7 +62,7 @@ public static Netty4HttpHeaderValidator getValidatorInboundHandler(HttpValidator
6162
/**
6263
* Given a {@link DefaultHttpRequest} argument, this returns a new {@link DefaultHttpRequest} instance that's identical to the
6364
* passed-in one, but the headers of the latter can be authenticated, in the sense that the channel handlers returned by
64-
* {@link #getValidatorInboundHandler(HttpValidator, ThreadContext)} can use this to convey the authentication result context.
65+
* {@link #getValidatorInboundHandler(Channel, HttpValidator, ThreadContext)} can use this to convey the authentication result context.
6566
*/
6667
public static HttpMessage wrapAsMessageWithAuthenticationContext(HttpMessage newlyDecodedMessage) {
6768
assert newlyDecodedMessage instanceof HttpRequest;

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public void testSuccessfulSyncValidationUntamperedThreadContext() throws Excepti
6868
channel.pipeline()
6969
.addLast(
7070
new Netty4HttpHeaderValidator(
71+
channel,
7172
getValidator(EsExecutors.DIRECT_EXECUTOR_SERVICE, isValidationSuccessful, null),
7273
threadPool.getThreadContext()
7374
)
@@ -88,6 +89,7 @@ public void testFailedSyncValidationUntamperedThreadContext() throws Exception {
8889
channel.pipeline()
8990
.addLast(
9091
new Netty4HttpHeaderValidator(
92+
channel,
9193
getValidator(EsExecutors.DIRECT_EXECUTOR_SERVICE, isValidationSuccessful, null),
9294
threadPool.getThreadContext()
9395
)
@@ -109,6 +111,7 @@ public void testSuccessfulAsyncValidationUntamperedThreadContext() throws Except
109111
channel.pipeline()
110112
.addLast(
111113
new Netty4HttpHeaderValidator(
114+
channel,
112115
// use a different executor/thread for the validator
113116
getValidator(threadPool.executor(ThreadPool.Names.MANAGEMENT), isValidationSuccessful, validationDone),
114117
threadPool.getThreadContext()
@@ -131,6 +134,7 @@ public void testUnsuccessfulAsyncValidationUntamperedThreadContext() throws Exce
131134
channel.pipeline()
132135
.addLast(
133136
new Netty4HttpHeaderValidator(
137+
channel,
134138
// use a different executor/thread for the validator
135139
getValidator(threadPool.executor(ThreadPool.Names.MANAGEMENT), isValidationSuccessful, validationDone),
136140
threadPool.getThreadContext()
@@ -147,20 +151,18 @@ public void testUnsuccessfulAsyncValidationUntamperedThreadContext() throws Exce
147151
}
148152

149153
private HttpValidator getValidator(ExecutorService executorService, AtomicBoolean success, Semaphore validationDone) {
150-
return (httpRequest, channel, listener) -> {
151-
executorService.submit(() -> {
152-
tamperThreadContext();
153-
if (success.get()) {
154-
listener.onResponse(null);
155-
} else {
156-
listener.onFailure(new Exception("Validation failure"));
157-
}
158-
if (validationDone != null) {
159-
validationDone.release();
160-
}
161-
});
162-
};
163-
};
154+
return (httpRequest, channel, listener) -> executorService.submit(() -> {
155+
tamperThreadContext();
156+
if (success.get()) {
157+
listener.onResponse(null);
158+
} else {
159+
listener.onFailure(new Exception("Validation failure"));
160+
}
161+
if (validationDone != null) {
162+
validationDone.release();
163+
}
164+
});
165+
}
164166

165167
private void tamperThreadContext() {
166168
boolean tampered = false;

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private void reset() {
7474
}
7575
listener.set(validationCompleteListener);
7676
};
77-
netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY));
77+
netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(channel, validator, new ThreadContext(Settings.EMPTY));
7878
channel.pipeline().addLast(netty4HttpHeaderValidator);
7979
}
8080

0 commit comments

Comments
 (0)