Skip to content

Commit eeca493

Browse files
authored
Move HTTP content aggregation from Netty into RestController (#129302)
1 parent 083326e commit eeca493

35 files changed

+688
-287
lines changed

docs/changelog/129302.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 129302
2+
summary: Move HTTP content aggregation from Netty into `RestController`
3+
area: Network
4+
type: enhancement
5+
issues:
6+
- 120746

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,11 @@ public String getName() {
920920
return ROUTE;
921921
}
922922

923+
@Override
924+
public boolean supportsContentStream() {
925+
return true;
926+
}
927+
923928
@Override
924929
public List<Route> routes() {
925930
return List.of(new Route(RestRequest.Method.POST, ROUTE));

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
4747
if (pendingRead == false) {
4848
long now = timer.absoluteTimeInMillis();
4949
if (now >= lastRead + interval) {
50+
// if you encounter this warning during test make sure you consume content of RestRequest if it's a stream
51+
// or use AggregatingDispatcher that will consume stream fully and produce RestRequest with full content.
5052
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
5153
}
5254
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.handler.codec.http.DefaultHttpContent;
2020
import io.netty.handler.codec.http.DefaultHttpResponse;
2121
import io.netty.handler.codec.http.DefaultLastHttpContent;
22-
import io.netty.handler.codec.http.FullHttpRequest;
2322
import io.netty.handler.codec.http.HttpContent;
2423
import io.netty.handler.codec.http.HttpObject;
2524
import io.netty.handler.codec.http.HttpRequest;
@@ -131,30 +130,25 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
131130
} else {
132131
nonError = (Exception) cause;
133132
}
134-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError);
133+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, nonError);
135134
} else {
136135
assert currentRequestStream == null : "current stream must be null for new request";
137-
if (request instanceof FullHttpRequest fullHttpRequest) {
138-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
139-
currentRequestStream = null;
140-
} else {
141-
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
142-
currentRequestStream = contentStream;
143-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
144-
shouldRead = false;
145-
}
136+
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
137+
currentRequestStream = contentStream;
138+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
139+
shouldRead = false;
146140
}
147141
handlePipelinedRequest(ctx, netty4HttpRequest);
148142
} else {
149143
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
150144
assert currentRequestStream != null : "current stream must exists before handling http content";
151145
shouldRead = false;
152146
currentRequestStream.handleNettyContent((HttpContent) msg);
153-
if (msg instanceof LastHttpContent) {
154-
currentRequestStream = null;
155-
}
156147
}
157148
} finally {
149+
if (msg instanceof LastHttpContent) {
150+
currentRequestStream = null;
151+
}
158152
if (shouldRead) {
159153
ctx.channel().eventLoop().execute(ctx::read);
160154
}
@@ -167,7 +161,7 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque
167161
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
168162
boolean success = false;
169163
assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
170-
assert Transports.assertTransportThread();
164+
assert ctx.channel().eventLoop().inEventLoop();
171165
try {
172166
serverTransport.incomingRequest(pipelinedRequest, channel);
173167
success = true;

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

Lines changed: 52 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
package org.elasticsearch.http.netty4;
1111

12-
import io.netty.buffer.Unpooled;
13-
import io.netty.handler.codec.http.DefaultFullHttpRequest;
14-
import io.netty.handler.codec.http.EmptyHttpHeaders;
15-
import io.netty.handler.codec.http.FullHttpRequest;
12+
import io.netty.handler.codec.http.DefaultHttpRequest;
1613
import io.netty.handler.codec.http.HttpHeaderNames;
1714
import io.netty.handler.codec.http.HttpHeaders;
1815
import io.netty.handler.codec.http.HttpMethod;
16+
import io.netty.handler.codec.http.HttpUtil;
1917
import io.netty.handler.codec.http.QueryStringDecoder;
2018
import io.netty.handler.codec.http.cookie.Cookie;
2119
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
@@ -28,7 +26,6 @@
2826
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
2927
import org.elasticsearch.rest.RestRequest;
3028
import org.elasticsearch.rest.RestStatus;
31-
import org.elasticsearch.transport.netty4.Netty4Utils;
3229

3330
import java.util.AbstractMap;
3431
import java.util.Collection;
@@ -41,71 +38,57 @@
4138

4239
public class Netty4HttpRequest implements HttpRequest {
4340

44-
private final FullHttpRequest request;
45-
private final HttpBody content;
41+
private final int sequence;
42+
private final io.netty.handler.codec.http.HttpRequest nettyRequest;
43+
private boolean hasContent;
44+
private HttpBody content;
4645
private final Map<String, List<String>> headers;
4746
private final AtomicBoolean released;
4847
private final Exception inboundException;
49-
private final boolean pooled;
50-
private final int sequence;
5148
private final QueryStringDecoder queryStringDecoder;
5249

53-
Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
54-
this(
55-
sequence,
56-
new DefaultFullHttpRequest(
57-
request.protocolVersion(),
58-
request.method(),
59-
request.uri(),
60-
Unpooled.EMPTY_BUFFER,
61-
request.headers(),
62-
EmptyHttpHeaders.INSTANCE
63-
),
64-
new AtomicBoolean(false),
65-
true,
66-
contentStream,
67-
null
68-
);
69-
}
70-
71-
Netty4HttpRequest(int sequence, FullHttpRequest request) {
72-
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()));
50+
public Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest nettyRequest, Exception exception) {
51+
this(sequence, nettyRequest, HttpBody.empty(), new AtomicBoolean(false), exception);
7352
}
7453

75-
Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) {
76-
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException);
77-
}
78-
79-
private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
80-
this(sequence, request, released, pooled, content, null);
54+
public Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest nettyRequest, HttpBody content) {
55+
this(sequence, nettyRequest, content, new AtomicBoolean(false), null);
8156
}
8257

8358
private Netty4HttpRequest(
8459
int sequence,
85-
FullHttpRequest request,
86-
AtomicBoolean released,
87-
boolean pooled,
60+
io.netty.handler.codec.http.HttpRequest nettyRequest,
8861
HttpBody content,
62+
AtomicBoolean released,
8963
Exception inboundException
9064
) {
9165
this.sequence = sequence;
92-
this.request = request;
93-
this.headers = getHttpHeadersAsMap(request.headers());
66+
this.nettyRequest = nettyRequest;
67+
this.hasContent = hasContentHeader(nettyRequest);
9468
this.content = content;
95-
this.pooled = pooled;
69+
this.headers = getHttpHeadersAsMap(nettyRequest.headers());
9670
this.released = released;
9771
this.inboundException = inboundException;
98-
this.queryStringDecoder = new QueryStringDecoder(request.uri());
72+
this.queryStringDecoder = new QueryStringDecoder(nettyRequest.uri());
73+
}
74+
75+
private static boolean hasContentHeader(io.netty.handler.codec.http.HttpRequest nettyRequest) {
76+
return HttpUtil.isTransferEncodingChunked(nettyRequest) || HttpUtil.getContentLength(nettyRequest, 0L) > 0;
77+
}
78+
79+
@Override
80+
public boolean hasContent() {
81+
return hasContent;
9982
}
10083

10184
@Override
10285
public RestRequest.Method method() {
103-
return translateRequestMethod(request.method());
86+
return translateRequestMethod(nettyRequest.method());
10487
}
10588

10689
@Override
10790
public String uri() {
108-
return request.uri();
91+
return nettyRequest.uri();
10992
}
11093

11194
@Override
@@ -119,10 +102,17 @@ public HttpBody body() {
119102
return content;
120103
}
121104

105+
@Override
106+
public void setBody(HttpBody body) {
107+
assert this.content.isStream() : "only stream content can be replaced";
108+
assert body.isFull() : "only full content can replace stream";
109+
this.content = body;
110+
this.hasContent = body.isEmpty() == false;
111+
}
112+
122113
@Override
123114
public void release() {
124-
if (pooled && released.compareAndSet(false, true)) {
125-
request.release();
115+
if (released.compareAndSet(false, true)) {
126116
content.close();
127117
}
128118
}
@@ -134,7 +124,7 @@ public final Map<String, List<String>> getHeaders() {
134124

135125
@Override
136126
public List<String> strictCookies() {
137-
String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
127+
String cookieString = nettyRequest.headers().get(HttpHeaderNames.COOKIE);
138128
if (cookieString != null) {
139129
Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
140130
if (cookies.isEmpty() == false) {
@@ -146,40 +136,36 @@ public List<String> strictCookies() {
146136

147137
@Override
148138
public HttpVersion protocolVersion() {
149-
if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) {
150-
return HttpRequest.HttpVersion.HTTP_1_0;
151-
} else if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) {
152-
return HttpRequest.HttpVersion.HTTP_1_1;
139+
if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) {
140+
return HttpVersion.HTTP_1_0;
141+
} else if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) {
142+
return HttpVersion.HTTP_1_1;
153143
} else {
154-
throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion());
144+
throw new IllegalArgumentException("Unexpected http protocol version: " + nettyRequest.protocolVersion());
155145
}
156146
}
157147

158148
@Override
159149
public HttpRequest removeHeader(String header) {
160-
HttpHeaders copiedHeadersWithout = request.headers().copy();
150+
HttpHeaders copiedHeadersWithout = nettyRequest.headers().copy();
161151
copiedHeadersWithout.remove(header);
162-
HttpHeaders copiedTrailingHeadersWithout = request.trailingHeaders().copy();
163-
copiedTrailingHeadersWithout.remove(header);
164-
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(
165-
request.protocolVersion(),
166-
request.method(),
167-
request.uri(),
168-
request.content(),
169-
copiedHeadersWithout,
170-
copiedTrailingHeadersWithout
152+
var requestWithoutHeader = new DefaultHttpRequest(
153+
nettyRequest.protocolVersion(),
154+
nettyRequest.method(),
155+
nettyRequest.uri(),
156+
copiedHeadersWithout
171157
);
172-
return new Netty4HttpRequest(sequence, requestWithoutHeader, released, pooled, content);
158+
return new Netty4HttpRequest(sequence, requestWithoutHeader, content, released, null);
173159
}
174160

175161
@Override
176162
public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference contentRef) {
177-
return new Netty4FullHttpResponse(sequence, request.protocolVersion(), status, contentRef);
163+
return new Netty4FullHttpResponse(sequence, nettyRequest.protocolVersion(), status, contentRef);
178164
}
179165

180166
@Override
181167
public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) {
182-
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, firstBodyPart);
168+
return new Netty4ChunkedHttpResponse(sequence, nettyRequest.protocolVersion(), status, firstBodyPart);
183169
}
184170

185171
@Override
@@ -188,7 +174,7 @@ public Exception getInboundException() {
188174
}
189175

190176
public io.netty.handler.codec.http.HttpRequest getNettyRequest() {
191-
return request;
177+
return nettyRequest;
192178
}
193179

194180
public static RestRequest.Method translateRequestMethod(HttpMethod httpMethod) {

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ public void next() {
7373

7474
public void handleNettyContent(HttpContent httpContent) {
7575
assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
76+
assert readLastChunk == false;
7677
if (closing) {
7778
httpContent.release();
7879
read();
7980
} else {
80-
assert readLastChunk == false;
8181
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
8282
var isLast = httpContent instanceof LastHttpContent;
8383
var buf = Netty4Utils.toReleasableBytesReference(httpContent.content());
@@ -105,17 +105,19 @@ public void close() {
105105

106106
private void doClose() {
107107
assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
108-
closing = true;
109-
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
110-
for (var tracer : tracingHandlers) {
111-
Releasables.closeExpectNoException(tracer);
108+
if (closing == false) {
109+
closing = true;
110+
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
111+
for (var tracer : tracingHandlers) {
112+
Releasables.closeExpectNoException(tracer);
113+
}
114+
if (handler != null) {
115+
handler.close();
116+
}
112117
}
113-
if (handler != null) {
114-
handler.close();
118+
if (readLastChunk == false) {
119+
read();
115120
}
116121
}
117-
if (readLastChunk == false) {
118-
read();
119-
}
120122
}
121123
}

0 commit comments

Comments
 (0)