Skip to content

Commit 570efa2

Browse files
authored
Revert "Add http stream content size handler (#120246)" (#120934)
This reverts commit 43e3e24.
1 parent 241b998 commit 570efa2

File tree

5 files changed

+81
-466
lines changed

5 files changed

+81
-466
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 45 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.common.util.CollectionUtils;
5555
import org.elasticsearch.features.NodeFeature;
5656
import org.elasticsearch.http.HttpBodyTracer;
57+
import org.elasticsearch.http.HttpHandlingSettings;
5758
import org.elasticsearch.http.HttpServerTransport;
5859
import org.elasticsearch.http.HttpTransportSettings;
5960
import org.elasticsearch.plugins.ActionPlugin;
@@ -92,15 +93,10 @@
9293
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
9394
public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
9495

95-
private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);
96-
9796
@Override
9897
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
9998
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
100-
builder.put(
101-
HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(),
102-
ByteSizeValue.of(MAX_CONTENT_LENGTH, ByteSizeUnit.BYTES)
103-
);
99+
builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), ByteSizeValue.of(50, ByteSizeUnit.MB));
104100
return builder.build();
105101
}
106102

@@ -139,7 +135,7 @@ public void testReceiveAllChunks() throws Exception {
139135
var opaqueId = opaqueId(reqNo);
140136

141137
// this dataset will be compared with one on server side
142-
var dataSize = randomIntBetween(1024, MAX_CONTENT_LENGTH);
138+
var dataSize = randomIntBetween(1024, maxContentLength());
143139
var sendData = Unpooled.wrappedBuffer(randomByteArrayOfLength(dataSize));
144140
sendData.retain();
145141
ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, sendData));
@@ -247,7 +243,7 @@ public void testServerExceptionMidStream() throws Exception {
247243
public void testClientBackpressure() throws Exception {
248244
try (var ctx = setupClientCtx()) {
249245
var opaqueId = opaqueId(0);
250-
var payloadSize = MAX_CONTENT_LENGTH;
246+
var payloadSize = maxContentLength();
251247
var totalParts = 10;
252248
var partSize = payloadSize / totalParts;
253249
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
@@ -289,7 +285,7 @@ public void test100Continue() throws Exception {
289285
try (var ctx = setupClientCtx()) {
290286
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
291287
var id = opaqueId(reqNo);
292-
var acceptableContentLength = randomIntBetween(0, MAX_CONTENT_LENGTH);
288+
var acceptableContentLength = randomIntBetween(0, maxContentLength());
293289

294290
// send request header and await 100-continue
295291
var req = httpRequest(id, acceptableContentLength);
@@ -321,7 +317,7 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
321317
try (var ctx = setupClientCtx()) {
322318
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
323319
var id = opaqueId(reqNo);
324-
var oversized = MAX_CONTENT_LENGTH + 1;
320+
var oversized = maxContentLength() + 1;
325321

326322
// send request header and await 413 too large
327323
var req = httpRequest(id, oversized);
@@ -337,28 +333,32 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
337333
}
338334
}
339335

340-
// ensures that oversized chunked encoded request has maxContentLength limit and returns 413
341-
public void testOversizedChunkedEncoding() throws Exception {
336+
// ensures that oversized chunked encoded request has no limits at http layer
337+
// rest handler is responsible for oversized requests
338+
public void testOversizedChunkedEncodingNoLimits() throws Exception {
342339
try (var ctx = setupClientCtx()) {
343-
var id = opaqueId(0);
344-
var contentSize = MAX_CONTENT_LENGTH + 1;
345-
var content = randomByteArrayOfLength(contentSize);
346-
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
347-
var chunkedIs = new ChunkedStream(is);
348-
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
349-
var req = httpRequest(id, 0);
350-
HttpUtil.setTransferEncodingChunked(req, true);
351-
352-
ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
353-
ctx.clientChannel.writeAndFlush(req);
354-
ctx.clientChannel.writeAndFlush(httpChunkedIs);
355-
var handler = ctx.awaitRestChannelAccepted(id);
356-
var consumed = handler.readAllBytes();
357-
assertTrue(consumed <= MAX_CONTENT_LENGTH);
358-
359-
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
360-
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
361-
resp.release();
340+
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
341+
var id = opaqueId(reqNo);
342+
var contentSize = maxContentLength() + 1;
343+
var content = randomByteArrayOfLength(contentSize);
344+
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
345+
var chunkedIs = new ChunkedStream(is);
346+
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
347+
var req = httpRequest(id, 0);
348+
HttpUtil.setTransferEncodingChunked(req, true);
349+
350+
ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
351+
ctx.clientChannel.writeAndFlush(req);
352+
ctx.clientChannel.writeAndFlush(httpChunkedIs);
353+
var handler = ctx.awaitRestChannelAccepted(id);
354+
var consumed = handler.readAllBytes();
355+
assertEquals(contentSize, consumed);
356+
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
357+
358+
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
359+
assertEquals(HttpResponseStatus.OK, resp.status());
360+
resp.release();
361+
}
362362
}
363363
}
364364

@@ -369,7 +369,7 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
369369
try (var ctx = setupClientCtx()) {
370370
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
371371
var id = opaqueId(reqNo);
372-
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
372+
var contentSize = randomIntBetween(0, maxContentLength());
373373
var req = httpRequest(id, contentSize);
374374
var content = randomContent(contentSize, true);
375375

@@ -405,7 +405,7 @@ public void testHttpClientStats() throws Exception {
405405

406406
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
407407
var id = opaqueId(reqNo);
408-
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
408+
var contentSize = randomIntBetween(0, maxContentLength());
409409
totalBytesSent += contentSize;
410410
ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
411411
ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
@@ -485,6 +485,10 @@ private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exceptio
485485
}
486486
}
487487

488+
private int maxContentLength() {
489+
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
490+
}
491+
488492
private String opaqueId(int reqNo) {
489493
return getTestName() + "-" + reqNo;
490494
}
@@ -654,22 +658,14 @@ void sendResponse(RestResponse response) {
654658
int readBytes(int bytes) {
655659
var consumed = 0;
656660
if (recvLast == false) {
657-
stream.next();
658-
while (consumed < bytes && streamClosed == false) {
659-
try {
660-
var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS);
661-
if (recvChunk != null) {
662-
consumed += recvChunk.chunk.length();
663-
recvChunk.chunk.close();
664-
if (recvChunk.isLast) {
665-
recvLast = true;
666-
break;
667-
}
668-
stream.next();
669-
}
670-
} catch (InterruptedException e) {
671-
Thread.currentThread().interrupt();
672-
throw new AssertionError(e);
661+
while (consumed < bytes) {
662+
stream.next();
663+
var recvChunk = safePoll(recvChunks);
664+
consumed += recvChunk.chunk.length();
665+
recvChunk.chunk.close();
666+
if (recvChunk.isLast) {
667+
recvLast = true;
668+
break;
673669
}
674670
}
675671
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111

1212
import io.netty.channel.ChannelHandlerContext;
1313
import io.netty.handler.codec.http.FullHttpRequest;
14+
import io.netty.handler.codec.http.FullHttpResponse;
15+
import io.netty.handler.codec.http.HttpContent;
1416
import io.netty.handler.codec.http.HttpObject;
1517
import io.netty.handler.codec.http.HttpObjectAggregator;
1618
import io.netty.handler.codec.http.HttpRequest;
17-
import io.netty.handler.codec.http.HttpRequestDecoder;
19+
import io.netty.handler.codec.http.HttpResponseStatus;
20+
import io.netty.handler.codec.http.HttpUtil;
1821

1922
import org.elasticsearch.http.HttpPreRequest;
2023
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
@@ -24,19 +27,18 @@
2427
/**
2528
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
2629
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
27-
* Provides content size handling for non-aggregated requests too.
30+
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
2831
*/
2932
public class Netty4HttpAggregator extends HttpObjectAggregator {
3033
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;
3134

3235
private final Predicate<HttpPreRequest> decider;
33-
private final Netty4HttpContentSizeHandler streamContentSizeHandler;
3436
private boolean aggregating = true;
37+
private boolean ignoreContentAfterContinueResponse = false;
3538

36-
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider, HttpRequestDecoder decoder) {
39+
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
3740
super(maxContentLength);
3841
this.decider = decider;
39-
this.streamContentSizeHandler = new Netty4HttpContentSizeHandler(decoder, maxContentLength);
4042
}
4143

4244
@Override
@@ -49,7 +51,34 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
4951
if (aggregating || msg instanceof FullHttpRequest) {
5052
super.channelRead(ctx, msg);
5153
} else {
52-
streamContentSizeHandler.channelRead(ctx, msg);
54+
handle(ctx, (HttpObject) msg);
55+
}
56+
}
57+
58+
private void handle(ChannelHandlerContext ctx, HttpObject msg) {
59+
if (msg instanceof HttpRequest request) {
60+
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
61+
if (continueResponse != null) {
62+
// there are 3 responses expected: 100, 413, 417
63+
// on 100 we pass request further and reply to client to continue
64+
// on 413/417 we ignore following content
65+
ctx.writeAndFlush(continueResponse);
66+
var resp = (FullHttpResponse) continueResponse;
67+
if (resp.status() != HttpResponseStatus.CONTINUE) {
68+
ignoreContentAfterContinueResponse = true;
69+
return;
70+
}
71+
HttpUtil.set100ContinueExpected(request, false);
72+
}
73+
ignoreContentAfterContinueResponse = false;
74+
ctx.fireChannelRead(msg);
75+
} else {
76+
var httpContent = (HttpContent) msg;
77+
if (ignoreContentAfterContinueResponse) {
78+
httpContent.release();
79+
} else {
80+
ctx.fireChannelRead(msg);
81+
}
5382
}
5483
}
5584
}

0 commit comments

Comments
 (0)