Commits
29 commits
d3591ef
Add http request content stream support (#111438)
mhl-b Aug 13, 2024
70d0232
Merge remote-tracking branch 'upstream/main' into partial-rest-requests
mhl-b Aug 19, 2024
7b3f8ac
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Aug 20, 2024
a5565f2
Allow incremental bulk request execution (#111865)
Tim-Brooks Aug 20, 2024
db5c1b0
handle 100-continue and oversized streaming request (#112179)
mhl-b Aug 26, 2024
5757e00
release stream chunk queue on bad request (#112227)
mhl-b Aug 27, 2024
fc4d650
Split bulks based on memory usage (#112267)
Tim-Brooks Aug 28, 2024
467ad0a
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Aug 28, 2024
9ba403e
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Aug 29, 2024
649ee4e
Incremental bulk integration with rest layer (#112154)
Tim-Brooks Aug 30, 2024
8311942
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Sep 3, 2024
e1460ce
Ensure incremental bulk setting is set atomically (#112479)
Tim-Brooks Sep 5, 2024
6e8644e
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Sep 6, 2024
af9bfe7
Reduce autoread changes in header validator (#112608)
Tim-Brooks Sep 6, 2024
2c234bf
fix leaking listener (#112629)
mhl-b Sep 9, 2024
2caf43c
replace stream queue with composite buffer (#112685)
mhl-b Sep 10, 2024
3b7c4c7
Ensure http content copied for safe buffers (#112767)
Tim-Brooks Sep 11, 2024
cfa2e88
Ensure partial bulks released if channel closes (#112724)
Tim-Brooks Sep 12, 2024
b8f6e03
merge main
mhl-b Sep 12, 2024
930bbb4
Fix pipeline usage in incremental bulk test (#112842)
Tim-Brooks Sep 13, 2024
76ba879
Add tracing http chunk handlers (#112770)
mhl-b Sep 14, 2024
56ec113
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Sep 16, 2024
ef42038
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Sep 16, 2024
46f9d09
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Sep 16, 2024
f18eb37
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Sep 16, 2024
8746fb9
Fix shard error propagation bug from merge
Tim-Brooks Sep 16, 2024
ff32d4d
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Sep 16, 2024
abc9a7b
Properly handle empty incremental bulk requests (#112974)
Tim-Brooks Sep 16, 2024
dd277f7
Merge remote-tracking branch 'origin/main' into partial-rest-requests
Tim-Brooks Sep 17, 2024
@@ -27,10 +27,9 @@ public void testIndexMissingBody() throws IOException {
}

public void testBulkMissingBody() throws IOException {
ResponseException responseException = expectThrows(
ResponseException.class,
() -> client().performRequest(new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"))
);
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
request.setJsonEntity("");
ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertResponseException(responseException, "request body is required");
}

@@ -14,6 +14,7 @@
import io.netty.util.ReferenceCounted;

import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -52,6 +53,8 @@ protected boolean addMockHttpTransport() {
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies
.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false)
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
.build();
}

Large diffs are not rendered by default.

@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;

import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;

import java.util.function.Predicate;

/**
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
* a predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
*/
public class Netty4HttpAggregator extends HttpObjectAggregator {
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;

private final Predicate<HttpPreRequest> decider;
private boolean aggregating = true;
private boolean ignoreContentAfterContinueResponse = false;

public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
super(maxContentLength);
this.decider = decider;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert msg instanceof HttpObject;
if (msg instanceof HttpRequest request) {
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq);
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
} else {
handle(ctx, (HttpObject) msg);
}
}

private void handle(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpRequest request) {
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
if (continueResponse != null) {
// three responses are possible: 100, 413, or 417
// on 100 we forward the request and tell the client to continue
// on 413/417 we ignore the following content
ctx.writeAndFlush(continueResponse);
var resp = (FullHttpResponse) continueResponse;
if (resp.status() != HttpResponseStatus.CONTINUE) {
ignoreContentAfterContinueResponse = true;
return;
}
HttpUtil.set100ContinueExpected(request, false);
}
ignoreContentAfterContinueResponse = false;
ctx.fireChannelRead(msg);
} else {
var httpContent = (HttpContent) msg;
if (ignoreContentAfterContinueResponse) {
httpContent.release();
} else {
ctx.fireChannelRead(msg);
}
}
}
}
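The class above decides per request whether to aggregate. A hedged sketch of how such a handler might be wired into a Netty pipeline follows; the "/_bulk" predicate, the handler names, and the 100 MB limit are illustrative assumptions, not the PR's actual server transport wiring.

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

import org.elasticsearch.http.HttpPreRequest;

import java.util.function.Predicate;

public final class AggregatorWiringSketch {
    private AggregatorWiringSketch() {}

    // Requests the decider rejects (here: anything under /_bulk, purely illustrative) skip
    // aggregation and reach later handlers as a raw HttpRequest followed by HttpContent chunks;
    // everything else is still buffered into a single FullHttpRequest.
    public static void wire(ChannelPipeline pipeline) {
        Predicate<HttpPreRequest> decider = req -> req.uri().startsWith("/_bulk") == false;
        pipeline.addLast("decoder", new HttpRequestDecoder());
        pipeline.addLast("encoder", new HttpResponseEncoder());
        pipeline.addLast("aggregator", new Netty4HttpAggregator(100 * 1024 * 1024, decider));
    }
}

Either way the aggregator stays responsible for Expect: 100-continue and oversized bodies: when aggregating it inherits that behavior from HttpObjectAggregator, and when not aggregating the handle() method above replays the 100/413/417 responses itself.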
@@ -61,6 +61,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
pending.add(ReferenceCountUtil.retain(httpObject));
requestStart(ctx);
assert state == QUEUEING_DATA;
assert ctx.channel().config().isAutoRead() == false;
break;
case QUEUEING_DATA:
pending.add(ReferenceCountUtil.retain(httpObject));
@@ -77,14 +78,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
// fall-through
ReferenceCountUtil.release(httpObject);
break;
case DROPPING_DATA_PERMANENTLY:
assert pending.isEmpty();
ReferenceCountUtil.release(httpObject); // consume without enqueuing
ctx.channel().config().setAutoRead(false);
break;
}

setAutoReadForState(ctx, state);
}

private void requestStart(ChannelHandlerContext ctx) {
@@ -105,6 +106,7 @@ private void requestStart(ChannelHandlerContext ctx) {
}

state = QUEUEING_DATA;
ctx.channel().config().setAutoRead(false);

if (httpRequest == null) {
// this looks like a malformed request and will forward without validation
@@ -150,6 +152,7 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;

ctx.channel().config().setAutoRead(true);
boolean fullRequestForwarded = forwardData(ctx, pending);

assert fullRequestForwarded || pending.isEmpty();
@@ -161,7 +164,6 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
@@ -177,6 +179,8 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
}
messageToForward.setDecoderResult(DecoderResult.failure(e));

ctx.channel().config().setAutoRead(true);
ctx.fireChannelRead(messageToForward);

assert fullRequestDropped || pending.isEmpty();
@@ -188,7 +192,6 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

@Override
@@ -244,10 +247,6 @@ private static void maybeResizePendingDown(int largeSize, ArrayDeque<HttpObject>
}
}

private static void setAutoReadForState(ChannelHandlerContext ctx, State state) {
ctx.channel().config().setAutoRead((state == QUEUEING_DATA || state == DROPPING_DATA_PERMANENTLY) == false);
}
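This change removes the state-derived setAutoReadForState helper in favor of the explicit setAutoRead calls seen in the earlier hunks. As a reminder of the underlying Netty flow-control idiom, here is a minimal sketch that assumes nothing beyond the plain channel API:

import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;

public final class AutoReadSketch {
    private AutoReadSketch() {}

    // Pause socket reads while a request's data is being queued for validation,
    // so the peer cannot keep pushing bytes into buffers we are not yet draining.
    public static void pauseReads(ChannelHandlerContext ctx) {
        ChannelConfig config = ctx.channel().config();
        config.setAutoRead(false); // Netty stops issuing further reads on this channel
    }

    // Resume reads once the buffered HttpObjects have been forwarded downstream.
    public static void resumeReads(ChannelHandlerContext ctx) {
        ctx.channel().config().setAutoRead(true);
    }
}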

enum State {
WAITING_TO_START,
QUEUEING_DATA,
@@ -20,8 +20,11 @@
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
@@ -71,6 +74,12 @@ private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Chu
@Nullable
private ChunkedWrite currentChunkedWrite;

/**
* HTTP request content stream for the current request; null if there is no current request or the request is fully aggregated
*/
@Nullable
private Netty4HttpRequestBodyStream currentRequestStream;

/*
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
* channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the
@@ -110,23 +119,38 @@ public Netty4HttpPipeliningHandler(
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
activityTracker.startActivity();
try {
assert msg instanceof FullHttpRequest : "Should have fully aggregated message already but saw [" + msg + "]";
final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
final Netty4HttpRequest netty4HttpRequest;
if (fullHttpRequest.decoderResult().isFailure()) {
final Throwable cause = fullHttpRequest.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
if (msg instanceof HttpRequest request) {
final Netty4HttpRequest netty4HttpRequest;
if (request.decoderResult().isFailure()) {
final Throwable cause = request.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
} else {
nonError = (Exception) cause;
}
netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError);
} else {
nonError = (Exception) cause;
assert currentRequestStream == null : "current stream must be null for new request";
if (request instanceof FullHttpRequest fullHttpRequest) {
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
currentRequestStream = null;
} else {
var contentStream = new Netty4HttpRequestBodyStream(ctx.channel());
currentRequestStream = contentStream;
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
}
}
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, nonError);
handlePipelinedRequest(ctx, netty4HttpRequest);
} else {
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
assert currentRequestStream != null : "current stream must exist before handling http content";
currentRequestStream.handleNettyContent((HttpContent) msg);
if (msg instanceof LastHttpContent) {
currentRequestStream = null;
}
}
handlePipelinedRequest(ctx, netty4HttpRequest);
} finally {
activityTracker.stopActivity();
}
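The reworked channelRead above keeps one body stream per in-flight request: an aggregated FullHttpRequest needs no stream, a partial HttpRequest opens a Netty4HttpRequestBodyStream, and every later HttpContent chunk is handed to that stream until LastHttpContent ends it. Below is a minimal stand-alone sketch of the same dispatch pattern, using only plain Netty types; the Consumer is a hypothetical stand-in for the PR's body stream, not its real API.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;

import java.util.function.Consumer;

public class PerRequestStreamSketch extends ChannelInboundHandlerAdapter {

    private Consumer<HttpContent> currentStream; // stand-in for Netty4HttpRequestBodyStream

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest request) {
            if (request instanceof FullHttpRequest) {
                currentStream = null; // aggregated request, the body is already attached
            } else {
                currentStream = HttpContent::release; // placeholder for handing chunks to the body stream
            }
            ctx.fireChannelRead(msg); // dispatch the request head immediately
        } else if (msg instanceof HttpContent chunk) {
            currentStream.accept(chunk); // later chunks belong to the current request's stream
            if (msg instanceof LastHttpContent) {
                currentStream = null; // body finished, the next HttpRequest starts a new stream
            }
        }
    }
}

The ordering of the branches matters: a FullHttpRequest is itself a LastHttpContent in Netty, so it has to be matched as a request first, exactly as the handler above does.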
@@ -12,6 +12,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
@@ -21,6 +22,7 @@
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
@@ -40,22 +42,40 @@
public class Netty4HttpRequest implements HttpRequest {

private final FullHttpRequest request;
private final BytesReference content;
private final HttpBody content;
private final Map<String, List<String>> headers;
private final AtomicBoolean released;
private final Exception inboundException;
private final boolean pooled;
private final int sequence;

Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
this(
sequence,
new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.EMPTY_BUFFER,
request.headers(),
EmptyHttpHeaders.INSTANCE
),
new AtomicBoolean(false),
true,
contentStream,
null
);
}

Netty4HttpRequest(int sequence, FullHttpRequest request) {
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()));
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()));
}

Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) {
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()), inboundException);
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException);
}

private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, BytesReference content) {
private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
this(sequence, request, released, pooled, content, null);
}

@@ -64,7 +84,7 @@ private Netty4HttpRequest(
FullHttpRequest request,
AtomicBoolean released,
boolean pooled,
BytesReference content,
HttpBody content,
Exception inboundException
) {
this.sequence = sequence;
@@ -87,7 +107,7 @@ public String uri() {
}

@Override
public BytesReference content() {
public HttpBody body() {
assert released.get() == false;
return content;
}
@@ -96,6 +116,7 @@ public BytesReference content() {
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
content.close();
}
}

@@ -107,6 +128,12 @@ public HttpRequest releaseAndCopy() {
}
try {
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
HttpBody newContent;
if (content.isStream()) {
newContent = content;
} else {
newContent = Netty4Utils.fullHttpBodyFrom(copiedContent);
}
return new Netty4HttpRequest(
sequence,
new DefaultFullHttpRequest(
@@ -119,7 +146,7 @@
),
new AtomicBoolean(false),
false,
Netty4Utils.toBytesReference(copiedContent)
newContent
);
} finally {
release();