Skip to content

Commit f58af24

Browse files
Merge branch 'main' into charlotte-query-api-key-examples
2 parents 7ad9217 + 1d913f3 commit f58af24

File tree

107 files changed

+3483
-1847
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

107 files changed

+3483
-1847
lines changed

docs/changelog/129302.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 129302
2+
summary: Move HTTP content aggregation from Netty into `RestController`
3+
area: Network
4+
type: enhancement
5+
issues:
6+
- 120746

modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1414

1515
import org.elasticsearch.test.cluster.ElasticsearchCluster;
16+
import org.elasticsearch.test.cluster.FeatureFlag;
1617
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
1718
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
1819
import org.junit.ClassRule;
@@ -28,10 +29,11 @@ public static Iterable<Object[]> parameters() throws Exception {
2829
}
2930

3031
@ClassRule
31-
public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").build();
32+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build();
3233

3334
@Override
3435
protected String getTestRestCluster() {
3536
return cluster.getHttpAddresses();
3637
}
38+
3739
}

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,11 @@ public String getName() {
920920
return ROUTE;
921921
}
922922

923+
@Override
924+
public boolean supportsContentStream() {
925+
return true;
926+
}
927+
923928
@Override
924929
public List<Route> routes() {
925930
return List.of(new Route(RestRequest.Method.POST, ROUTE));

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
4747
if (pendingRead == false) {
4848
long now = timer.absoluteTimeInMillis();
4949
if (now >= lastRead + interval) {
50+
// if you encounter this warning during test make sure you consume content of RestRequest if it's a stream
51+
// or use AggregatingDispatcher that will consume stream fully and produce RestRequest with full content.
5052
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
5153
}
5254
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.handler.codec.http.DefaultHttpContent;
2020
import io.netty.handler.codec.http.DefaultHttpResponse;
2121
import io.netty.handler.codec.http.DefaultLastHttpContent;
22-
import io.netty.handler.codec.http.FullHttpRequest;
2322
import io.netty.handler.codec.http.HttpContent;
2423
import io.netty.handler.codec.http.HttpObject;
2524
import io.netty.handler.codec.http.HttpRequest;
@@ -131,30 +130,25 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
131130
} else {
132131
nonError = (Exception) cause;
133132
}
134-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError);
133+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, nonError);
135134
} else {
136135
assert currentRequestStream == null : "current stream must be null for new request";
137-
if (request instanceof FullHttpRequest fullHttpRequest) {
138-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
139-
currentRequestStream = null;
140-
} else {
141-
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
142-
currentRequestStream = contentStream;
143-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
144-
shouldRead = false;
145-
}
136+
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
137+
currentRequestStream = contentStream;
138+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
139+
shouldRead = false;
146140
}
147141
handlePipelinedRequest(ctx, netty4HttpRequest);
148142
} else {
149143
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
150144
assert currentRequestStream != null : "current stream must exists before handling http content";
151145
shouldRead = false;
152146
currentRequestStream.handleNettyContent((HttpContent) msg);
153-
if (msg instanceof LastHttpContent) {
154-
currentRequestStream = null;
155-
}
156147
}
157148
} finally {
149+
if (msg instanceof LastHttpContent) {
150+
currentRequestStream = null;
151+
}
158152
if (shouldRead) {
159153
ctx.channel().eventLoop().execute(ctx::read);
160154
}
@@ -167,7 +161,7 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque
167161
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
168162
boolean success = false;
169163
assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
170-
assert Transports.assertTransportThread();
164+
assert ctx.channel().eventLoop().inEventLoop();
171165
try {
172166
serverTransport.incomingRequest(pipelinedRequest, channel);
173167
success = true;

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

Lines changed: 52 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
package org.elasticsearch.http.netty4;
1111

12-
import io.netty.buffer.Unpooled;
13-
import io.netty.handler.codec.http.DefaultFullHttpRequest;
14-
import io.netty.handler.codec.http.EmptyHttpHeaders;
15-
import io.netty.handler.codec.http.FullHttpRequest;
12+
import io.netty.handler.codec.http.DefaultHttpRequest;
1613
import io.netty.handler.codec.http.HttpHeaderNames;
1714
import io.netty.handler.codec.http.HttpHeaders;
1815
import io.netty.handler.codec.http.HttpMethod;
16+
import io.netty.handler.codec.http.HttpUtil;
1917
import io.netty.handler.codec.http.QueryStringDecoder;
2018
import io.netty.handler.codec.http.cookie.Cookie;
2119
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
@@ -28,7 +26,6 @@
2826
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
2927
import org.elasticsearch.rest.RestRequest;
3028
import org.elasticsearch.rest.RestStatus;
31-
import org.elasticsearch.transport.netty4.Netty4Utils;
3229

3330
import java.util.AbstractMap;
3431
import java.util.Collection;
@@ -41,71 +38,57 @@
4138

4239
public class Netty4HttpRequest implements HttpRequest {
4340

44-
private final FullHttpRequest request;
45-
private final HttpBody content;
41+
private final int sequence;
42+
private final io.netty.handler.codec.http.HttpRequest nettyRequest;
43+
private boolean hasContent;
44+
private HttpBody content;
4645
private final Map<String, List<String>> headers;
4746
private final AtomicBoolean released;
4847
private final Exception inboundException;
49-
private final boolean pooled;
50-
private final int sequence;
5148
private final QueryStringDecoder queryStringDecoder;
5249

53-
Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
54-
this(
55-
sequence,
56-
new DefaultFullHttpRequest(
57-
request.protocolVersion(),
58-
request.method(),
59-
request.uri(),
60-
Unpooled.EMPTY_BUFFER,
61-
request.headers(),
62-
EmptyHttpHeaders.INSTANCE
63-
),
64-
new AtomicBoolean(false),
65-
true,
66-
contentStream,
67-
null
68-
);
69-
}
70-
71-
Netty4HttpRequest(int sequence, FullHttpRequest request) {
72-
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()));
50+
public Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest nettyRequest, Exception exception) {
51+
this(sequence, nettyRequest, HttpBody.empty(), new AtomicBoolean(false), exception);
7352
}
7453

75-
Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) {
76-
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException);
77-
}
78-
79-
private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
80-
this(sequence, request, released, pooled, content, null);
54+
public Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest nettyRequest, HttpBody content) {
55+
this(sequence, nettyRequest, content, new AtomicBoolean(false), null);
8156
}
8257

8358
private Netty4HttpRequest(
8459
int sequence,
85-
FullHttpRequest request,
86-
AtomicBoolean released,
87-
boolean pooled,
60+
io.netty.handler.codec.http.HttpRequest nettyRequest,
8861
HttpBody content,
62+
AtomicBoolean released,
8963
Exception inboundException
9064
) {
9165
this.sequence = sequence;
92-
this.request = request;
93-
this.headers = getHttpHeadersAsMap(request.headers());
66+
this.nettyRequest = nettyRequest;
67+
this.hasContent = hasContentHeader(nettyRequest);
9468
this.content = content;
95-
this.pooled = pooled;
69+
this.headers = getHttpHeadersAsMap(nettyRequest.headers());
9670
this.released = released;
9771
this.inboundException = inboundException;
98-
this.queryStringDecoder = new QueryStringDecoder(request.uri());
72+
this.queryStringDecoder = new QueryStringDecoder(nettyRequest.uri());
73+
}
74+
75+
private static boolean hasContentHeader(io.netty.handler.codec.http.HttpRequest nettyRequest) {
76+
return HttpUtil.isTransferEncodingChunked(nettyRequest) || HttpUtil.getContentLength(nettyRequest, 0L) > 0;
77+
}
78+
79+
@Override
80+
public boolean hasContent() {
81+
return hasContent;
9982
}
10083

10184
@Override
10285
public RestRequest.Method method() {
103-
return translateRequestMethod(request.method());
86+
return translateRequestMethod(nettyRequest.method());
10487
}
10588

10689
@Override
10790
public String uri() {
108-
return request.uri();
91+
return nettyRequest.uri();
10992
}
11093

11194
@Override
@@ -119,10 +102,17 @@ public HttpBody body() {
119102
return content;
120103
}
121104

105+
@Override
106+
public void setBody(HttpBody body) {
107+
assert this.content.isStream() : "only stream content can be replaced";
108+
assert body.isFull() : "only full content can replace stream";
109+
this.content = body;
110+
this.hasContent = body.isEmpty() == false;
111+
}
112+
122113
@Override
123114
public void release() {
124-
if (pooled && released.compareAndSet(false, true)) {
125-
request.release();
115+
if (released.compareAndSet(false, true)) {
126116
content.close();
127117
}
128118
}
@@ -134,7 +124,7 @@ public final Map<String, List<String>> getHeaders() {
134124

135125
@Override
136126
public List<String> strictCookies() {
137-
String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
127+
String cookieString = nettyRequest.headers().get(HttpHeaderNames.COOKIE);
138128
if (cookieString != null) {
139129
Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
140130
if (cookies.isEmpty() == false) {
@@ -146,40 +136,36 @@ public List<String> strictCookies() {
146136

147137
@Override
148138
public HttpVersion protocolVersion() {
149-
if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) {
150-
return HttpRequest.HttpVersion.HTTP_1_0;
151-
} else if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) {
152-
return HttpRequest.HttpVersion.HTTP_1_1;
139+
if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) {
140+
return HttpVersion.HTTP_1_0;
141+
} else if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) {
142+
return HttpVersion.HTTP_1_1;
153143
} else {
154-
throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion());
144+
throw new IllegalArgumentException("Unexpected http protocol version: " + nettyRequest.protocolVersion());
155145
}
156146
}
157147

158148
@Override
159149
public HttpRequest removeHeader(String header) {
160-
HttpHeaders copiedHeadersWithout = request.headers().copy();
150+
HttpHeaders copiedHeadersWithout = nettyRequest.headers().copy();
161151
copiedHeadersWithout.remove(header);
162-
HttpHeaders copiedTrailingHeadersWithout = request.trailingHeaders().copy();
163-
copiedTrailingHeadersWithout.remove(header);
164-
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(
165-
request.protocolVersion(),
166-
request.method(),
167-
request.uri(),
168-
request.content(),
169-
copiedHeadersWithout,
170-
copiedTrailingHeadersWithout
152+
var requestWithoutHeader = new DefaultHttpRequest(
153+
nettyRequest.protocolVersion(),
154+
nettyRequest.method(),
155+
nettyRequest.uri(),
156+
copiedHeadersWithout
171157
);
172-
return new Netty4HttpRequest(sequence, requestWithoutHeader, released, pooled, content);
158+
return new Netty4HttpRequest(sequence, requestWithoutHeader, content, released, null);
173159
}
174160

175161
@Override
176162
public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference contentRef) {
177-
return new Netty4FullHttpResponse(sequence, request.protocolVersion(), status, contentRef);
163+
return new Netty4FullHttpResponse(sequence, nettyRequest.protocolVersion(), status, contentRef);
178164
}
179165

180166
@Override
181167
public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) {
182-
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, firstBodyPart);
168+
return new Netty4ChunkedHttpResponse(sequence, nettyRequest.protocolVersion(), status, firstBodyPart);
183169
}
184170

185171
@Override
@@ -188,7 +174,7 @@ public Exception getInboundException() {
188174
}
189175

190176
public io.netty.handler.codec.http.HttpRequest getNettyRequest() {
191-
return request;
177+
return nettyRequest;
192178
}
193179

194180
public static RestRequest.Method translateRequestMethod(HttpMethod httpMethod) {

0 commit comments

Comments
 (0)