Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpBodyTracer;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.plugins.ActionPlugin;
Expand Down Expand Up @@ -93,10 +92,15 @@
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {

private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), ByteSizeValue.of(50, ByteSizeUnit.MB));
builder.put(
HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(),
ByteSizeValue.of(MAX_CONTENT_LENGTH, ByteSizeUnit.BYTES)
);
return builder.build();
}

Expand Down Expand Up @@ -135,7 +139,7 @@ public void testReceiveAllChunks() throws Exception {
var opaqueId = opaqueId(reqNo);

// this dataset will be compared with one on server side
var dataSize = randomIntBetween(1024, maxContentLength());
var dataSize = randomIntBetween(1024, MAX_CONTENT_LENGTH);
var sendData = Unpooled.wrappedBuffer(randomByteArrayOfLength(dataSize));
sendData.retain();
ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, sendData));
Expand Down Expand Up @@ -243,7 +247,7 @@ public void testServerExceptionMidStream() throws Exception {
public void testClientBackpressure() throws Exception {
try (var ctx = setupClientCtx()) {
var opaqueId = opaqueId(0);
var payloadSize = maxContentLength();
var payloadSize = MAX_CONTENT_LENGTH;
var totalParts = 10;
var partSize = payloadSize / totalParts;
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
Expand Down Expand Up @@ -285,7 +289,7 @@ public void test100Continue() throws Exception {
try (var ctx = setupClientCtx()) {
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var acceptableContentLength = randomIntBetween(0, maxContentLength());
var acceptableContentLength = randomIntBetween(0, MAX_CONTENT_LENGTH);

// send request header and await 100-continue
var req = httpRequest(id, acceptableContentLength);
Expand Down Expand Up @@ -317,7 +321,7 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
try (var ctx = setupClientCtx()) {
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var oversized = maxContentLength() + 1;
var oversized = MAX_CONTENT_LENGTH + 1;

// send request header and await 413 too large
var req = httpRequest(id, oversized);
Expand All @@ -333,32 +337,28 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
}
}

// ensures that oversized chunked encoded request has no limits at http layer
// rest handler is responsible for oversized requests
public void testOversizedChunkedEncodingNoLimits() throws Exception {
// ensures that oversized chunked encoded request has maxContentLength limit and returns 413
public void testOversizedChunkedEncoding() throws Exception {
try (var ctx = setupClientCtx()) {
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = maxContentLength() + 1;
var content = randomByteArrayOfLength(contentSize);
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
var chunkedIs = new ChunkedStream(is);
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
var req = httpRequest(id, 0);
HttpUtil.setTransferEncodingChunked(req, true);

ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(httpChunkedIs);
var handler = ctx.awaitRestChannelAccepted(id);
var consumed = handler.readAllBytes();
assertEquals(contentSize, consumed);
handler.sendResponse(new RestResponse(RestStatus.OK, ""));

var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
assertEquals(HttpResponseStatus.OK, resp.status());
resp.release();
}
var id = opaqueId(0);
var contentSize = MAX_CONTENT_LENGTH + 1;
var content = randomByteArrayOfLength(contentSize);
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
var chunkedIs = new ChunkedStream(is);
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
var req = httpRequest(id, 0);
HttpUtil.setTransferEncodingChunked(req, true);

ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(httpChunkedIs);
var handler = ctx.awaitRestChannelAccepted(id);
var consumed = handler.readAllBytes();
assertTrue(consumed <= MAX_CONTENT_LENGTH);

var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
resp.release();
}
}

Expand All @@ -369,7 +369,7 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
try (var ctx = setupClientCtx()) {
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = randomIntBetween(0, maxContentLength());
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
var req = httpRequest(id, contentSize);
var content = randomContent(contentSize, true);

Expand Down Expand Up @@ -405,7 +405,7 @@ public void testHttpClientStats() throws Exception {

for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = randomIntBetween(0, maxContentLength());
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
totalBytesSent += contentSize;
ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
Expand Down Expand Up @@ -485,10 +485,6 @@ private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exceptio
}
}

private int maxContentLength() {
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
}

private String opaqueId(int reqNo) {
return getTestName() + "-" + reqNo;
}
Expand Down Expand Up @@ -658,14 +654,22 @@ void sendResponse(RestResponse response) {
int readBytes(int bytes) {
var consumed = 0;
if (recvLast == false) {
while (consumed < bytes) {
stream.next();
var recvChunk = safePoll(recvChunks);
consumed += recvChunk.chunk.length();
recvChunk.chunk.close();
if (recvChunk.isLast) {
recvLast = true;
break;
stream.next();
while (consumed < bytes && streamClosed == false) {
try {
var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS);
if (recvChunk != null) {
consumed += recvChunk.chunk.length();
recvChunk.chunk.close();
if (recvChunk.isLast) {
recvLast = true;
break;
}
stream.next();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpRequestDecoder;

import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
Expand All @@ -27,18 +24,19 @@
/**
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
* Provides content size handling for non-aggregated requests too.
*/
public class Netty4HttpAggregator extends HttpObjectAggregator {
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;

private final Predicate<HttpPreRequest> decider;
private final Netty4HttpContentSizeHandler streamContentSizeHandler;
private boolean aggregating = true;
private boolean ignoreContentAfterContinueResponse = false;

public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider, HttpRequestDecoder decoder) {
super(maxContentLength);
this.decider = decider;
this.streamContentSizeHandler = new Netty4HttpContentSizeHandler(decoder, maxContentLength);
}

@Override
Expand All @@ -51,34 +49,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
} else {
handle(ctx, (HttpObject) msg);
}
}

private void handle(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpRequest request) {
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
if (continueResponse != null) {
// there are 3 responses expected: 100, 413, 417
// on 100 we pass request further and reply to client to continue
// on 413/417 we ignore following content
ctx.writeAndFlush(continueResponse);
var resp = (FullHttpResponse) continueResponse;
if (resp.status() != HttpResponseStatus.CONTINUE) {
ignoreContentAfterContinueResponse = true;
return;
}
HttpUtil.set100ContinueExpected(request, false);
}
ignoreContentAfterContinueResponse = false;
ctx.fireChannelRead(msg);
} else {
var httpContent = (HttpContent) msg;
if (ignoreContentAfterContinueResponse) {
httpContent.release();
} else {
ctx.fireChannelRead(msg);
}
streamContentSizeHandler.channelRead(ctx, msg);
}
}
}
Loading