Skip to content

Commit 1901c71

Browse files
authored
Re-add http stream content size handler (#121095)
1 parent 74f9178 commit 1901c71

File tree

5 files changed

+461
-81
lines changed

5 files changed

+461
-81
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.elasticsearch.common.util.CollectionUtils;
5555
import org.elasticsearch.features.NodeFeature;
5656
import org.elasticsearch.http.HttpBodyTracer;
57-
import org.elasticsearch.http.HttpHandlingSettings;
5857
import org.elasticsearch.http.HttpServerTransport;
5958
import org.elasticsearch.http.HttpTransportSettings;
6059
import org.elasticsearch.plugins.ActionPlugin;
@@ -93,10 +92,15 @@
9392
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
9493
public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
9594

95+
private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);
96+
9697
@Override
9798
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
9899
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
99-
builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), ByteSizeValue.of(50, ByteSizeUnit.MB));
100+
builder.put(
101+
HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(),
102+
ByteSizeValue.of(MAX_CONTENT_LENGTH, ByteSizeUnit.BYTES)
103+
);
100104
return builder.build();
101105
}
102106

@@ -135,7 +139,7 @@ public void testReceiveAllChunks() throws Exception {
135139
var opaqueId = opaqueId(reqNo);
136140

137141
// this dataset will be compared with one on server side
138-
var dataSize = randomIntBetween(1024, maxContentLength());
142+
var dataSize = randomIntBetween(1024, MAX_CONTENT_LENGTH);
139143
var sendData = Unpooled.wrappedBuffer(randomByteArrayOfLength(dataSize));
140144
sendData.retain();
141145
ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, sendData));
@@ -243,7 +247,7 @@ public void testServerExceptionMidStream() throws Exception {
243247
public void testClientBackpressure() throws Exception {
244248
try (var ctx = setupClientCtx()) {
245249
var opaqueId = opaqueId(0);
246-
var payloadSize = maxContentLength();
250+
var payloadSize = MAX_CONTENT_LENGTH;
247251
var totalParts = 10;
248252
var partSize = payloadSize / totalParts;
249253
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
@@ -285,7 +289,7 @@ public void test100Continue() throws Exception {
285289
try (var ctx = setupClientCtx()) {
286290
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
287291
var id = opaqueId(reqNo);
288-
var acceptableContentLength = randomIntBetween(0, maxContentLength());
292+
var acceptableContentLength = randomIntBetween(0, MAX_CONTENT_LENGTH);
289293

290294
// send request header and await 100-continue
291295
var req = httpRequest(id, acceptableContentLength);
@@ -317,7 +321,7 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
317321
try (var ctx = setupClientCtx()) {
318322
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
319323
var id = opaqueId(reqNo);
320-
var oversized = maxContentLength() + 1;
324+
var oversized = MAX_CONTENT_LENGTH + 1;
321325

322326
// send request header and await 413 too large
323327
var req = httpRequest(id, oversized);
@@ -333,32 +337,28 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
333337
}
334338
}
335339

336-
// ensures that oversized chunked encoded request has no limits at http layer
337-
// rest handler is responsible for oversized requests
338-
public void testOversizedChunkedEncodingNoLimits() throws Exception {
340+
// ensures that oversized chunked encoded request has maxContentLength limit and returns 413
341+
public void testOversizedChunkedEncoding() throws Exception {
339342
try (var ctx = setupClientCtx()) {
340-
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
341-
var id = opaqueId(reqNo);
342-
var contentSize = maxContentLength() + 1;
343-
var content = randomByteArrayOfLength(contentSize);
344-
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
345-
var chunkedIs = new ChunkedStream(is);
346-
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
347-
var req = httpRequest(id, 0);
348-
HttpUtil.setTransferEncodingChunked(req, true);
349-
350-
ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
351-
ctx.clientChannel.writeAndFlush(req);
352-
ctx.clientChannel.writeAndFlush(httpChunkedIs);
353-
var handler = ctx.awaitRestChannelAccepted(id);
354-
var consumed = handler.readAllBytes();
355-
assertEquals(contentSize, consumed);
356-
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
357-
358-
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
359-
assertEquals(HttpResponseStatus.OK, resp.status());
360-
resp.release();
361-
}
343+
var id = opaqueId(0);
344+
var contentSize = MAX_CONTENT_LENGTH + 1;
345+
var content = randomByteArrayOfLength(contentSize);
346+
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
347+
var chunkedIs = new ChunkedStream(is);
348+
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
349+
var req = httpRequest(id, 0);
350+
HttpUtil.setTransferEncodingChunked(req, true);
351+
352+
ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
353+
ctx.clientChannel.writeAndFlush(req);
354+
ctx.clientChannel.writeAndFlush(httpChunkedIs);
355+
var handler = ctx.awaitRestChannelAccepted(id);
356+
var consumed = handler.readAllBytes();
357+
assertTrue(consumed <= MAX_CONTENT_LENGTH);
358+
359+
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
360+
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
361+
resp.release();
362362
}
363363
}
364364

@@ -369,7 +369,7 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
369369
try (var ctx = setupClientCtx()) {
370370
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
371371
var id = opaqueId(reqNo);
372-
var contentSize = randomIntBetween(0, maxContentLength());
372+
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
373373
var req = httpRequest(id, contentSize);
374374
var content = randomContent(contentSize, true);
375375

@@ -405,7 +405,7 @@ public void testHttpClientStats() throws Exception {
405405

406406
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
407407
var id = opaqueId(reqNo);
408-
var contentSize = randomIntBetween(0, maxContentLength());
408+
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
409409
totalBytesSent += contentSize;
410410
ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
411411
ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
@@ -485,10 +485,6 @@ private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exceptio
485485
}
486486
}
487487

488-
private int maxContentLength() {
489-
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
490-
}
491-
492488
private String opaqueId(int reqNo) {
493489
return getTestName() + "-" + reqNo;
494490
}
@@ -658,14 +654,22 @@ void sendResponse(RestResponse response) {
658654
int readBytes(int bytes) {
659655
var consumed = 0;
660656
if (recvLast == false) {
661-
while (consumed < bytes) {
662-
stream.next();
663-
var recvChunk = safePoll(recvChunks);
664-
consumed += recvChunk.chunk.length();
665-
recvChunk.chunk.close();
666-
if (recvChunk.isLast) {
667-
recvLast = true;
668-
break;
657+
stream.next();
658+
while (consumed < bytes && streamClosed == false) {
659+
try {
660+
var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS);
661+
if (recvChunk != null) {
662+
consumed += recvChunk.chunk.length();
663+
recvChunk.chunk.close();
664+
if (recvChunk.isLast) {
665+
recvLast = true;
666+
break;
667+
}
668+
stream.next();
669+
}
670+
} catch (InterruptedException e) {
671+
Thread.currentThread().interrupt();
672+
throw new AssertionError(e);
669673
}
670674
}
671675
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,10 @@
1111

1212
import io.netty.channel.ChannelHandlerContext;
1313
import io.netty.handler.codec.http.FullHttpRequest;
14-
import io.netty.handler.codec.http.FullHttpResponse;
15-
import io.netty.handler.codec.http.HttpContent;
1614
import io.netty.handler.codec.http.HttpObject;
1715
import io.netty.handler.codec.http.HttpObjectAggregator;
1816
import io.netty.handler.codec.http.HttpRequest;
19-
import io.netty.handler.codec.http.HttpResponseStatus;
20-
import io.netty.handler.codec.http.HttpUtil;
17+
import io.netty.handler.codec.http.HttpRequestDecoder;
2118

2219
import org.elasticsearch.http.HttpPreRequest;
2320
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
@@ -27,18 +24,19 @@
2724
/**
2825
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
2926
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
30-
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
27+
* Provides content size handling for non-aggregated requests too.
3128
*/
3229
public class Netty4HttpAggregator extends HttpObjectAggregator {
3330
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;
3431

3532
private final Predicate<HttpPreRequest> decider;
33+
private final Netty4HttpContentSizeHandler streamContentSizeHandler;
3634
private boolean aggregating = true;
37-
private boolean ignoreContentAfterContinueResponse = false;
3835

39-
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
36+
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider, HttpRequestDecoder decoder) {
4037
super(maxContentLength);
4138
this.decider = decider;
39+
this.streamContentSizeHandler = new Netty4HttpContentSizeHandler(decoder, maxContentLength);
4240
}
4341

4442
@Override
@@ -51,34 +49,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5149
if (aggregating || msg instanceof FullHttpRequest) {
5250
super.channelRead(ctx, msg);
5351
} else {
54-
handle(ctx, (HttpObject) msg);
55-
}
56-
}
57-
58-
private void handle(ChannelHandlerContext ctx, HttpObject msg) {
59-
if (msg instanceof HttpRequest request) {
60-
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
61-
if (continueResponse != null) {
62-
// there are 3 responses expected: 100, 413, 417
63-
// on 100 we pass request further and reply to client to continue
64-
// on 413/417 we ignore following content
65-
ctx.writeAndFlush(continueResponse);
66-
var resp = (FullHttpResponse) continueResponse;
67-
if (resp.status() != HttpResponseStatus.CONTINUE) {
68-
ignoreContentAfterContinueResponse = true;
69-
return;
70-
}
71-
HttpUtil.set100ContinueExpected(request, false);
72-
}
73-
ignoreContentAfterContinueResponse = false;
74-
ctx.fireChannelRead(msg);
75-
} else {
76-
var httpContent = (HttpContent) msg;
77-
if (ignoreContentAfterContinueResponse) {
78-
httpContent.release();
79-
} else {
80-
ctx.fireChannelRead(msg);
81-
}
52+
streamContentSizeHandler.channelRead(ctx, msg);
8253
}
8354
}
8455
}

0 commit comments

Comments
 (0)