Skip to content

Commit 0ab3677

Browse files
ldematteTim-Brooks
authored andcommitted
Revert "Add http stream content size handler (elastic#120246)" (elastic#120934)
This reverts commit 43e3e24.
1 parent e2a664f commit 0ab3677

File tree

5 files changed

+81
-463
lines changed

5 files changed

+81
-463
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 45 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.common.util.CollectionUtils;
5555
import org.elasticsearch.features.NodeFeature;
5656
import org.elasticsearch.http.HttpBodyTracer;
57+
import org.elasticsearch.http.HttpHandlingSettings;
5758
import org.elasticsearch.http.HttpServerTransport;
5859
import org.elasticsearch.http.HttpTransportSettings;
5960
import org.elasticsearch.plugins.ActionPlugin;
@@ -92,15 +93,13 @@
9293
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
9394
public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
9495

95-
private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);
96-
9796
@Override
9897
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
9998
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
10099

101100
builder.put(
102101
HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(),
103-
new ByteSizeValue(MAX_CONTENT_LENGTH, ByteSizeUnit.BYTES)
102+
new ByteSizeValue(ByteSizeUnit.MB.toIntBytes(50), ByteSizeUnit.BYTES)
104103
);
105104
return builder.build();
106105
}
@@ -140,7 +139,7 @@ public void testReceiveAllChunks() throws Exception {
140139
var opaqueId = opaqueId(reqNo);
141140

142141
// this dataset will be compared with one on server side
143-
var dataSize = randomIntBetween(1024, MAX_CONTENT_LENGTH);
142+
var dataSize = randomIntBetween(1024, maxContentLength());
144143
var sendData = Unpooled.wrappedBuffer(randomByteArrayOfLength(dataSize));
145144
sendData.retain();
146145
ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, sendData));
@@ -248,7 +247,7 @@ public void testServerExceptionMidStream() throws Exception {
248247
public void testClientBackpressure() throws Exception {
249248
try (var ctx = setupClientCtx()) {
250249
var opaqueId = opaqueId(0);
251-
var payloadSize = MAX_CONTENT_LENGTH;
250+
var payloadSize = maxContentLength();
252251
var totalParts = 10;
253252
var partSize = payloadSize / totalParts;
254253
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
@@ -290,7 +289,7 @@ public void test100Continue() throws Exception {
290289
try (var ctx = setupClientCtx()) {
291290
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
292291
var id = opaqueId(reqNo);
293-
var acceptableContentLength = randomIntBetween(0, MAX_CONTENT_LENGTH);
292+
var acceptableContentLength = randomIntBetween(0, maxContentLength());
294293

295294
// send request header and await 100-continue
296295
var req = httpRequest(id, acceptableContentLength);
@@ -322,7 +321,7 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
322321
try (var ctx = setupClientCtx()) {
323322
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
324323
var id = opaqueId(reqNo);
325-
var oversized = MAX_CONTENT_LENGTH + 1;
324+
var oversized = maxContentLength() + 1;
326325

327326
// send request header and await 413 too large
328327
var req = httpRequest(id, oversized);
@@ -338,28 +337,32 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
338337
}
339338
}
340339

341-
// ensures that oversized chunked encoded request has maxContentLength limit and returns 413
342-
public void testOversizedChunkedEncoding() throws Exception {
340+
// ensures that oversized chunked encoded request has no limits at http layer
341+
// rest handler is responsible for oversized requests
342+
public void testOversizedChunkedEncodingNoLimits() throws Exception {
343343
try (var ctx = setupClientCtx()) {
344-
var id = opaqueId(0);
345-
var contentSize = MAX_CONTENT_LENGTH + 1;
346-
var content = randomByteArrayOfLength(contentSize);
347-
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
348-
var chunkedIs = new ChunkedStream(is);
349-
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
350-
var req = httpRequest(id, 0);
351-
HttpUtil.setTransferEncodingChunked(req, true);
352-
353-
ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
354-
ctx.clientChannel.writeAndFlush(req);
355-
ctx.clientChannel.writeAndFlush(httpChunkedIs);
356-
var handler = ctx.awaitRestChannelAccepted(id);
357-
var consumed = handler.readAllBytes();
358-
assertTrue(consumed <= MAX_CONTENT_LENGTH);
359-
360-
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
361-
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
362-
resp.release();
344+
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
345+
var id = opaqueId(reqNo);
346+
var contentSize = maxContentLength() + 1;
347+
var content = randomByteArrayOfLength(contentSize);
348+
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
349+
var chunkedIs = new ChunkedStream(is);
350+
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
351+
var req = httpRequest(id, 0);
352+
HttpUtil.setTransferEncodingChunked(req, true);
353+
354+
ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
355+
ctx.clientChannel.writeAndFlush(req);
356+
ctx.clientChannel.writeAndFlush(httpChunkedIs);
357+
var handler = ctx.awaitRestChannelAccepted(id);
358+
var consumed = handler.readAllBytes();
359+
assertEquals(contentSize, consumed);
360+
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
361+
362+
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
363+
assertEquals(HttpResponseStatus.OK, resp.status());
364+
resp.release();
365+
}
363366
}
364367
}
365368

@@ -370,7 +373,7 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
370373
try (var ctx = setupClientCtx()) {
371374
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
372375
var id = opaqueId(reqNo);
373-
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
376+
var contentSize = randomIntBetween(0, maxContentLength());
374377
var req = httpRequest(id, contentSize);
375378
var content = randomContent(contentSize, true);
376379

@@ -406,7 +409,7 @@ public void testHttpClientStats() throws Exception {
406409

407410
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
408411
var id = opaqueId(reqNo);
409-
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
412+
var contentSize = randomIntBetween(0, maxContentLength());
410413
totalBytesSent += contentSize;
411414
ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
412415
ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
@@ -486,6 +489,10 @@ private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exceptio
486489
}
487490
}
488491

492+
private int maxContentLength() {
493+
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
494+
}
495+
489496
private String opaqueId(int reqNo) {
490497
return getTestName() + "-" + reqNo;
491498
}
@@ -655,22 +662,14 @@ void sendResponse(RestResponse response) {
655662
int readBytes(int bytes) {
656663
var consumed = 0;
657664
if (recvLast == false) {
658-
stream.next();
659-
while (consumed < bytes && streamClosed == false) {
660-
try {
661-
var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS);
662-
if (recvChunk != null) {
663-
consumed += recvChunk.chunk.length();
664-
recvChunk.chunk.close();
665-
if (recvChunk.isLast) {
666-
recvLast = true;
667-
break;
668-
}
669-
stream.next();
670-
}
671-
} catch (InterruptedException e) {
672-
Thread.currentThread().interrupt();
673-
throw new AssertionError(e);
665+
while (consumed < bytes) {
666+
stream.next();
667+
var recvChunk = safePoll(recvChunks);
668+
consumed += recvChunk.chunk.length();
669+
recvChunk.chunk.close();
670+
if (recvChunk.isLast) {
671+
recvLast = true;
672+
break;
674673
}
675674
}
676675
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111

1212
import io.netty.channel.ChannelHandlerContext;
1313
import io.netty.handler.codec.http.FullHttpRequest;
14+
import io.netty.handler.codec.http.FullHttpResponse;
15+
import io.netty.handler.codec.http.HttpContent;
1416
import io.netty.handler.codec.http.HttpObject;
1517
import io.netty.handler.codec.http.HttpObjectAggregator;
1618
import io.netty.handler.codec.http.HttpRequest;
17-
import io.netty.handler.codec.http.HttpRequestDecoder;
19+
import io.netty.handler.codec.http.HttpResponseStatus;
20+
import io.netty.handler.codec.http.HttpUtil;
1821

1922
import org.elasticsearch.http.HttpPreRequest;
2023
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
@@ -24,19 +27,18 @@
2427
/**
2528
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
2629
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
27-
* Provides content size handling for non-aggregated requests too.
30+
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
2831
*/
2932
public class Netty4HttpAggregator extends HttpObjectAggregator {
3033
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;
3134

3235
private final Predicate<HttpPreRequest> decider;
33-
private final Netty4HttpContentSizeHandler streamContentSizeHandler;
3436
private boolean aggregating = true;
37+
private boolean ignoreContentAfterContinueResponse = false;
3538

36-
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider, HttpRequestDecoder decoder) {
39+
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
3740
super(maxContentLength);
3841
this.decider = decider;
39-
this.streamContentSizeHandler = new Netty4HttpContentSizeHandler(decoder, maxContentLength);
4042
}
4143

4244
@Override
@@ -49,7 +51,34 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
4951
if (aggregating || msg instanceof FullHttpRequest) {
5052
super.channelRead(ctx, msg);
5153
} else {
52-
streamContentSizeHandler.channelRead(ctx, msg);
54+
handle(ctx, (HttpObject) msg);
55+
}
56+
}
57+
58+
private void handle(ChannelHandlerContext ctx, HttpObject msg) {
59+
if (msg instanceof HttpRequest request) {
60+
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
61+
if (continueResponse != null) {
62+
// there are 3 responses expected: 100, 413, 417
63+
// on 100 we pass request further and reply to client to continue
64+
// on 413/417 we ignore following content
65+
ctx.writeAndFlush(continueResponse);
66+
var resp = (FullHttpResponse) continueResponse;
67+
if (resp.status() != HttpResponseStatus.CONTINUE) {
68+
ignoreContentAfterContinueResponse = true;
69+
return;
70+
}
71+
HttpUtil.set100ContinueExpected(request, false);
72+
}
73+
ignoreContentAfterContinueResponse = false;
74+
ctx.fireChannelRead(msg);
75+
} else {
76+
var httpContent = (HttpContent) msg;
77+
if (ignoreContentAfterContinueResponse) {
78+
httpContent.release();
79+
} else {
80+
ctx.fireChannelRead(msg);
81+
}
5382
}
5483
}
5584
}

0 commit comments

Comments
 (0)