Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/129302.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 129302
summary: Move HTTP content aggregation from Netty into `RestController`
area: Network
type: enhancement
issues:
- 120746
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,11 @@ public String getName() {
return ROUTE;
}

@Override
public boolean supportsContentStream() {
return true;
}

@Override
public List<Route> routes() {
return List.of(new Route(RestRequest.Method.POST, ROUTE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (pendingRead == false) {
long now = timer.absoluteTimeInMillis();
if (now >= lastRead + interval) {
// if you encounter this warning during test make sure you consume content of RestRequest if it's a stream
// or use AggregatingDispatcher that will consume stream fully and produce RestRequest with full content.
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -131,30 +130,25 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
} else {
nonError = (Exception) cause;
}
netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError);
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, nonError);
} else {
assert currentRequestStream == null : "current stream must be null for new request";
if (request instanceof FullHttpRequest fullHttpRequest) {
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
currentRequestStream = null;
} else {
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
currentRequestStream = contentStream;
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
shouldRead = false;
}
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
currentRequestStream = contentStream;
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
shouldRead = false;
}
handlePipelinedRequest(ctx, netty4HttpRequest);
} else {
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
assert currentRequestStream != null : "current stream must exists before handling http content";
shouldRead = false;
currentRequestStream.handleNettyContent((HttpContent) msg);
if (msg instanceof LastHttpContent) {
currentRequestStream = null;
}
}
} finally {
if (msg instanceof LastHttpContent) {
currentRequestStream = null;
Comment on lines +149 to +150
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readability nit: this has to happen before the ctx::read above; of course it does today because we're already on the event loop so the dispatched ctx::read happens after this method returns but still it's confusing to write them in this order.

}
if (shouldRead) {
ctx.channel().eventLoop().execute(ctx::read);
}
Expand All @@ -167,7 +161,7 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
boolean success = false;
assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
assert Transports.assertTransportThread();
assert ctx.channel().eventLoop().inEventLoop();
try {
serverTransport.incomingRequest(pipelinedRequest, channel);
success = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@

package org.elasticsearch.http.netty4;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
Expand All @@ -28,7 +26,6 @@
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.AbstractMap;
import java.util.Collection;
Expand All @@ -41,71 +38,57 @@

public class Netty4HttpRequest implements HttpRequest {

private final FullHttpRequest request;
private final HttpBody content;
private final int sequence;
private final io.netty.handler.codec.http.HttpRequest nettyRequest;
private boolean hasContent;
private HttpBody content;
private final Map<String, List<String>> headers;
private final AtomicBoolean released;
private final Exception inboundException;
private final boolean pooled;
private final int sequence;
private final QueryStringDecoder queryStringDecoder;

Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
this(
sequence,
new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.EMPTY_BUFFER,
request.headers(),
EmptyHttpHeaders.INSTANCE
),
new AtomicBoolean(false),
true,
contentStream,
null
);
}

Netty4HttpRequest(int sequence, FullHttpRequest request) {
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()));
public Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest nettyRequest, Exception exception) {
this(sequence, nettyRequest, HttpBody.empty(), new AtomicBoolean(false), exception);
}

Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) {
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException);
}

private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
this(sequence, request, released, pooled, content, null);
public Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest nettyRequest, HttpBody content) {
this(sequence, nettyRequest, content, new AtomicBoolean(false), null);
}

private Netty4HttpRequest(
int sequence,
FullHttpRequest request,
AtomicBoolean released,
boolean pooled,
io.netty.handler.codec.http.HttpRequest nettyRequest,
HttpBody content,
AtomicBoolean released,
Exception inboundException
) {
this.sequence = sequence;
this.request = request;
this.headers = getHttpHeadersAsMap(request.headers());
this.nettyRequest = nettyRequest;
this.hasContent = hasContentHeader(nettyRequest);
this.content = content;
this.pooled = pooled;
this.headers = getHttpHeadersAsMap(nettyRequest.headers());
this.released = released;
this.inboundException = inboundException;
this.queryStringDecoder = new QueryStringDecoder(request.uri());
this.queryStringDecoder = new QueryStringDecoder(nettyRequest.uri());
}

private static boolean hasContentHeader(io.netty.handler.codec.http.HttpRequest nettyRequest) {
return HttpUtil.isTransferEncodingChunked(nettyRequest) || HttpUtil.getContentLength(nettyRequest, 0L) > 0;
}

@Override
public boolean hasContent() {
return hasContent;
}

@Override
public RestRequest.Method method() {
return translateRequestMethod(request.method());
return translateRequestMethod(nettyRequest.method());
}

@Override
public String uri() {
return request.uri();
return nettyRequest.uri();
}

@Override
Expand All @@ -119,10 +102,17 @@ public HttpBody body() {
return content;
}

@Override
public void setBody(HttpBody body) {
assert this.content.isStream() : "only stream content can be replaced";
assert body.isFull() : "only full content can replace stream";
this.content = body;
this.hasContent = body.isEmpty() == false;
}

@Override
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
if (released.compareAndSet(false, true)) {
content.close();
}
}
Expand All @@ -134,7 +124,7 @@ public final Map<String, List<String>> getHeaders() {

@Override
public List<String> strictCookies() {
String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
String cookieString = nettyRequest.headers().get(HttpHeaderNames.COOKIE);
if (cookieString != null) {
Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
if (cookies.isEmpty() == false) {
Expand All @@ -146,40 +136,36 @@ public List<String> strictCookies() {

@Override
public HttpVersion protocolVersion() {
if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) {
return HttpRequest.HttpVersion.HTTP_1_0;
} else if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) {
return HttpRequest.HttpVersion.HTTP_1_1;
if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) {
return HttpVersion.HTTP_1_0;
} else if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) {
return HttpVersion.HTTP_1_1;
} else {
throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion());
throw new IllegalArgumentException("Unexpected http protocol version: " + nettyRequest.protocolVersion());
}
}

@Override
public HttpRequest removeHeader(String header) {
HttpHeaders copiedHeadersWithout = request.headers().copy();
HttpHeaders copiedHeadersWithout = nettyRequest.headers().copy();
copiedHeadersWithout.remove(header);
HttpHeaders copiedTrailingHeadersWithout = request.trailingHeaders().copy();
copiedTrailingHeadersWithout.remove(header);
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
request.content(),
copiedHeadersWithout,
copiedTrailingHeadersWithout
var requestWithoutHeader = new DefaultHttpRequest(
nettyRequest.protocolVersion(),
nettyRequest.method(),
nettyRequest.uri(),
copiedHeadersWithout
);
return new Netty4HttpRequest(sequence, requestWithoutHeader, released, pooled, content);
return new Netty4HttpRequest(sequence, requestWithoutHeader, content, released, null);
}

@Override
public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference contentRef) {
return new Netty4FullHttpResponse(sequence, request.protocolVersion(), status, contentRef);
return new Netty4FullHttpResponse(sequence, nettyRequest.protocolVersion(), status, contentRef);
}

@Override
public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) {
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, firstBodyPart);
return new Netty4ChunkedHttpResponse(sequence, nettyRequest.protocolVersion(), status, firstBodyPart);
}

@Override
Expand All @@ -188,7 +174,7 @@ public Exception getInboundException() {
}

public io.netty.handler.codec.http.HttpRequest getNettyRequest() {
return request;
return nettyRequest;
}

public static RestRequest.Method translateRequestMethod(HttpMethod httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ public void next() {

public void handleNettyContent(HttpContent httpContent) {
assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
assert readLastChunk == false;
if (closing) {
httpContent.release();
read();
} else {
assert readLastChunk == false;
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
var isLast = httpContent instanceof LastHttpContent;
var buf = Netty4Utils.toReleasableBytesReference(httpContent.content());
Expand Down Expand Up @@ -105,17 +105,19 @@ public void close() {

private void doClose() {
assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
closing = true;
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
for (var tracer : tracingHandlers) {
Releasables.closeExpectNoException(tracer);
if (closing == false) {
closing = true;
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
for (var tracer : tracingHandlers) {
Releasables.closeExpectNoException(tracer);
}
if (handler != null) {
handler.close();
}
}
if (handler != null) {
handler.close();
if (readLastChunk == false) {
read();
}
}
if (readLastChunk == false) {
read();
}
}
}
Loading
Loading