Skip to content

Commit c07ea21

Browse files
committed
test fix
1 parent 8cacf13 commit c07ea21

File tree

3 files changed

+31
-13
lines changed

3 files changed

+31
-13
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,11 @@ State getState() {
5050
return state;
5151
}
5252

53-
@Override
54-
public void channelActive(ChannelHandlerContext ctx) throws Exception {
55-
autoRead = GatedAutoRead.newGate(ctx.channel());
56-
super.channelActive(ctx);
53+
GatedAutoRead.Gate autoRead(ChannelHandlerContext ctx) {
54+
if (autoRead == null) {
55+
autoRead = GatedAutoRead.newGate(ctx.channel());
56+
}
57+
return autoRead;
5758
}
5859

5960
@SuppressWarnings("fallthrough")
@@ -68,7 +69,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
6869
pending.add(ReferenceCountUtil.retain(httpObject));
6970
requestStart(ctx);
7071
assert state == QUEUEING_DATA;
71-
assert autoRead.get() == false;
72+
assert autoRead(ctx).get() == false;
7273
break;
7374
case QUEUEING_DATA:
7475
pending.add(ReferenceCountUtil.retain(httpObject));
@@ -90,7 +91,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
9091
case DROPPING_DATA_PERMANENTLY:
9192
assert pending.isEmpty();
9293
ReferenceCountUtil.release(httpObject); // consume without enqueuing
93-
autoRead.set(false);
94+
autoRead(ctx).set(false);
9495
break;
9596
}
9697
}
@@ -113,7 +114,7 @@ private void requestStart(ChannelHandlerContext ctx) {
113114
}
114115

115116
state = QUEUEING_DATA;
116-
autoRead.set(false);
117+
autoRead(ctx).set(false);
117118

118119
if (httpRequest == null) {
119120
// this looks like a malformed request and will forward without validation
@@ -156,10 +157,10 @@ public void onFailure(Exception e) {
156157
private void forwardFullRequest(ChannelHandlerContext ctx) {
157158
Transports.assertDefaultThreadContext(threadContext);
158159
assert ctx.channel().eventLoop().inEventLoop();
159-
assert autoRead.get() == false;
160+
assert autoRead(ctx).get() == false;
160161
assert state == QUEUEING_DATA;
161162

162-
autoRead.set(true);
163+
autoRead(ctx).set(true);
163164
boolean fullRequestForwarded = forwardData(ctx, pending);
164165

165166
assert fullRequestForwarded || pending.isEmpty();
@@ -176,7 +177,7 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
176177
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
177178
Transports.assertDefaultThreadContext(threadContext);
178179
assert ctx.channel().eventLoop().inEventLoop();
179-
assert autoRead.get() == false;
180+
assert autoRead(ctx).get() == false;
180181
assert state == QUEUEING_DATA;
181182

182183
HttpObject messageToForward = pending.getFirst();
@@ -187,7 +188,7 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex
187188
}
188189
messageToForward.setDecoderResult(DecoderResult.failure(e));
189190

190-
autoRead.set(true);
191+
autoRead(ctx).set(true);
191192
ctx.fireChannelRead(messageToForward);
192193

193194
assert fullRequestDropped || pending.isEmpty();

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ private void reset() {
7676
};
7777
netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY));
7878
channel.pipeline().addLast(netty4HttpHeaderValidator);
79-
channel.pipeline().fireChannelActive();
8079
}
8180

8281
public void testValidationPausesAndResumesData() {

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
2828
import io.netty.util.concurrent.Future;
2929

30+
import org.elasticsearch.action.ActionListener;
3031
import org.elasticsearch.common.network.NetworkService;
3132
import org.elasticsearch.common.settings.Settings;
3233
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -35,6 +36,7 @@
3536
import org.elasticsearch.http.HttpServerTransport;
3637
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
3738
import org.elasticsearch.rest.RestChannel;
39+
import org.elasticsearch.rest.RestContentAggregator;
3840
import org.elasticsearch.rest.RestRequest;
3941
import org.elasticsearch.rest.RestResponse;
4042
import org.elasticsearch.rest.RestStatus;
@@ -261,7 +263,23 @@ private static class QueuedDispatcher implements HttpServerTransport.Dispatcher
261263

262264
@Override
263265
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
264-
reqQueue.add(new ReqCtx(request, channel, threadContext));
266+
if (request.isStreamedContent()) {
267+
RestContentAggregator.aggregate(request, new ActionListener<>() {
268+
@Override
269+
public void onResponse(RestRequest restRequest) {
270+
reqQueue.add(new ReqCtx(request, channel, threadContext));
271+
}
272+
273+
@Override
274+
public void onFailure(Exception e) {
275+
errQueue.add(new ErrCtx(channel, threadContext, e));
276+
277+
}
278+
});
279+
} else {
280+
reqQueue.add(new ReqCtx(request, channel, threadContext));
281+
}
282+
265283
}
266284

267285
@Override

0 commit comments

Comments
 (0)