Skip to content

Commit c7d3e4d

Browse files
committed
wip
1 parent eb0020f commit c7d3e4d

File tree

28 files changed

+959
-210
lines changed

28 files changed

+959
-210
lines changed

docs/changelog/117787.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 117787
2+
summary: "[WIP] Add rest content aggregator"
3+
area: Network
4+
type: enhancement
5+
issues: []

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -333,12 +333,10 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
333333
}
334334
}
335335

336-
// ensures that oversized chunked encoded request has no limits at http layer
337-
// rest handler is responsible for oversized requests
338-
public void testOversizedChunkedEncodingNoLimits() throws Exception {
336+
// ensures that oversized chunked encoded request has limits at http layer
337+
public void testOversizedChunkedEncodingLimits() throws Exception {
339338
try (var ctx = setupClientCtx()) {
340-
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
341-
var id = opaqueId(reqNo);
339+
var id = opaqueId(0);
342340
var contentSize = maxContentLength() + 1;
343341
var content = randomByteArrayOfLength(contentSize);
344342
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
@@ -351,14 +349,11 @@ public void testOversizedChunkedEncodingNoLimits() throws Exception {
351349
ctx.clientChannel.writeAndFlush(req);
352350
ctx.clientChannel.writeAndFlush(httpChunkedIs);
353351
var handler = ctx.awaitRestChannelAccepted(id);
354-
var consumed = handler.readAllBytes();
355-
assertEquals(contentSize, consumed);
356-
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
352+
handler.readAllBytes();
357353

358354
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
359-
assertEquals(HttpResponseStatus.OK, resp.status());
355+
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
360356
resp.release();
361-
}
362357
}
363358
}
364359

@@ -655,24 +650,27 @@ void sendResponse(RestResponse response) {
655650
channel.sendResponse(response);
656651
}
657652

658-
int readBytes(int bytes) {
653+
int readBytes(int bytes) throws InterruptedException {
659654
var consumed = 0;
660655
if (recvLast == false) {
661-
while (consumed < bytes) {
662-
stream.next();
663-
var recvChunk = safePoll(recvChunks);
664-
consumed += recvChunk.chunk.length();
665-
recvChunk.chunk.close();
666-
if (recvChunk.isLast) {
667-
recvLast = true;
668-
break;
656+
stream.next();
657+
while (consumed < bytes && streamClosed == false) {
658+
var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS);
659+
if (recvChunk != null) {
660+
consumed += recvChunk.chunk.length();
661+
recvChunk.chunk.close();
662+
if (recvChunk.isLast) {
663+
recvLast = true;
664+
break;
665+
}
666+
stream.next();
669667
}
670668
}
671669
}
672670
return consumed;
673671
}
674672

675-
int readAllBytes() {
673+
int readAllBytes() throws InterruptedException {
676674
return readBytes(Integer.MAX_VALUE);
677675
}
678676

@@ -704,6 +702,11 @@ public String getName() {
704702
return ROUTE;
705703
}
706704

705+
@Override
706+
public boolean supportContentStream() {
707+
return true;
708+
}
709+
707710
@Override
708711
public List<Route> routes() {
709712
return List.of(new Route(RestRequest.Method.POST, ROUTE));

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java renamed to modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.http.netty4;
1111

1212
import io.netty.channel.ChannelHandlerContext;
13-
import io.netty.handler.codec.http.FullHttpRequest;
1413
import io.netty.handler.codec.http.FullHttpResponse;
1514
import io.netty.handler.codec.http.HttpContent;
1615
import io.netty.handler.codec.http.HttpObject;
@@ -19,43 +18,35 @@
1918
import io.netty.handler.codec.http.HttpResponseStatus;
2019
import io.netty.handler.codec.http.HttpUtil;
2120

22-
import org.elasticsearch.http.HttpPreRequest;
23-
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
24-
25-
import java.util.function.Predicate;
26-
2721
/**
28-
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
29-
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
22+
<<<<<<< Updated upstream
23+
* A wrapper around {@link HttpObjectAggregator}.Handles "Expect: 100-continue" and oversized content.
24+
=======
25+
* A wrapper around {@link HttpObjectAggregator}. Handles "Expect: 100-continue" and oversized content.
26+
>>>>>>> Stashed changes
3027
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
28+
* TODO: move to {@link org.elasticsearch.rest.RestController}
3129
*/
32-
public class Netty4HttpAggregator extends HttpObjectAggregator {
33-
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;
34-
35-
private final Predicate<HttpPreRequest> decider;
36-
private boolean aggregating = true;
30+
public class Netty4HttpContentSizeHandler extends HttpObjectAggregator {
31+
<<<<<<< Updated upstream
3732
private boolean ignoreContentAfterContinueResponse = false;
33+
=======
34+
private boolean ignoreFollowingContent = false;
35+
private int contentLength = 0;
36+
private HttpRequest currentRequest;
37+
>>>>>>> Stashed changes
3838

39-
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
39+
public Netty4HttpContentSizeHandler(int maxContentLength) {
4040
super(maxContentLength);
41-
this.decider = decider;
4241
}
4342

4443
@Override
4544
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
4645
assert msg instanceof HttpObject;
47-
if (msg instanceof HttpRequest request) {
48-
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
49-
aggregating = (decider.test(preReq) && IGNORE_TEST.test(preReq)) || request.decoderResult().isFailure();
50-
}
51-
if (aggregating || msg instanceof FullHttpRequest) {
52-
super.channelRead(ctx, msg);
53-
} else {
54-
handle(ctx, (HttpObject) msg);
55-
}
46+
handle(ctx, (HttpObject) msg);
5647
}
5748

58-
private void handle(ChannelHandlerContext ctx, HttpObject msg) {
49+
private void handle(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
5950
if (msg instanceof HttpRequest request) {
6051
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
6152
if (continueResponse != null) {
@@ -65,18 +56,34 @@ private void handle(ChannelHandlerContext ctx, HttpObject msg) {
6556
ctx.writeAndFlush(continueResponse);
6657
var resp = (FullHttpResponse) continueResponse;
6758
if (resp.status() != HttpResponseStatus.CONTINUE) {
68-
ignoreContentAfterContinueResponse = true;
59+
ignoreFollowingContent = true;
6960
return;
7061
}
7162
HttpUtil.set100ContinueExpected(request, false);
63+
} else {
64+
if (HttpUtil.getContentLength(request, 0) > maxContentLength()) {
65+
handleOversizedMessage(ctx, request);
66+
ignoreFollowingContent = true;
67+
return;
68+
}
7269
}
73-
ignoreContentAfterContinueResponse = false;
70+
ignoreFollowingContent = false;
71+
contentLength = 0;
72+
currentRequest = request;
7473
ctx.fireChannelRead(msg);
7574
} else {
7675
var httpContent = (HttpContent) msg;
77-
if (ignoreContentAfterContinueResponse) {
76+
if (ignoreFollowingContent) {
7877
httpContent.release();
7978
} else {
79+
contentLength += httpContent.content().readableBytes();
80+
if (contentLength > maxContentLength()) {
81+
ignoreFollowingContent = true;
82+
httpContent.release();
83+
HttpUtil.setKeepAlive(currentRequest, false);
84+
handleOversizedMessage(ctx, currentRequest);
85+
return;
86+
}
8087
ctx.fireChannelRead(msg);
8188
}
8289
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ State getState() {
4949
return state;
5050
}
5151

52+
boolean canAutoRead() {
53+
return state != WAITING_TO_START && state != QUEUEING_DATA;
54+
}
55+
5256
@SuppressWarnings("fallthrough")
5357
@Override
5458
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public Netty4HttpPipeliningHandler(
116116
}
117117

118118
@Override
119-
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
119+
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
120120
activityTracker.startActivity();
121121
try {
122122
if (msg instanceof HttpRequest request) {
@@ -130,7 +130,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
130130
} else {
131131
nonError = (Exception) cause;
132132
}
133-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError);
133+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, nonError);
134134
} else {
135135
assert currentRequestStream == null : "current stream must be null for new request";
136136
if (request instanceof FullHttpRequest fullHttpRequest) {
@@ -139,7 +139,8 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
139139
} else {
140140
var contentStream = new Netty4HttpRequestBodyStream(
141141
ctx.channel(),
142-
serverTransport.getThreadPool().getThreadContext()
142+
serverTransport.getThreadPool().getThreadContext(),
143+
activityTracker
143144
);
144145
currentRequestStream = contentStream;
145146
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,21 @@
1111

1212
import io.netty.buffer.Unpooled;
1313
import io.netty.handler.codec.http.DefaultFullHttpRequest;
14+
import io.netty.handler.codec.http.DefaultHttpRequest;
1415
import io.netty.handler.codec.http.EmptyHttpHeaders;
1516
import io.netty.handler.codec.http.FullHttpRequest;
1617
import io.netty.handler.codec.http.HttpHeaderNames;
1718
import io.netty.handler.codec.http.HttpHeaders;
1819
import io.netty.handler.codec.http.HttpMethod;
20+
import io.netty.handler.codec.http.HttpRequest;
21+
import io.netty.handler.codec.http.HttpUtil;
1922
import io.netty.handler.codec.http.QueryStringDecoder;
2023
import io.netty.handler.codec.http.cookie.Cookie;
2124
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
2225
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
2326

2427
import org.elasticsearch.common.bytes.BytesReference;
2528
import org.elasticsearch.http.HttpBody;
26-
import org.elasticsearch.http.HttpRequest;
2729
import org.elasticsearch.http.HttpResponse;
2830
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
2931
import org.elasticsearch.rest.RestRequest;
@@ -39,18 +41,19 @@
3941
import java.util.concurrent.atomic.AtomicBoolean;
4042
import java.util.stream.Collectors;
4143

42-
public class Netty4HttpRequest implements HttpRequest {
44+
public class Netty4HttpRequest implements org.elasticsearch.http.HttpRequest {
4345

44-
private final FullHttpRequest request;
46+
private final HttpRequest request;
4547
private final HttpBody content;
4648
private final Map<String, List<String>> headers;
4749
private final AtomicBoolean released;
4850
private final Exception inboundException;
4951
private final boolean pooled;
5052
private final int sequence;
5153
private final QueryStringDecoder queryStringDecoder;
54+
private final int contentLength;
5255

53-
Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
56+
Netty4HttpRequest(int sequence, HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
5457
this(
5558
sequence,
5659
new DefaultFullHttpRequest(
@@ -72,17 +75,18 @@ public class Netty4HttpRequest implements HttpRequest {
7275
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()));
7376
}
7477

75-
Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) {
76-
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException);
78+
Netty4HttpRequest(int sequence, HttpRequest request, Exception inboundException) {
79+
this(sequence, request, new AtomicBoolean(false), true, HttpBody.empty(), inboundException);
80+
7781
}
7882

79-
private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
83+
private Netty4HttpRequest(int sequence, HttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
8084
this(sequence, request, released, pooled, content, null);
8185
}
8286

8387
private Netty4HttpRequest(
8488
int sequence,
85-
FullHttpRequest request,
89+
HttpRequest request,
8690
AtomicBoolean released,
8791
boolean pooled,
8892
HttpBody content,
@@ -96,6 +100,15 @@ private Netty4HttpRequest(
96100
this.released = released;
97101
this.inboundException = inboundException;
98102
this.queryStringDecoder = new QueryStringDecoder(request.uri());
103+
this.contentLength = getContentLength(request);
104+
}
105+
106+
static int getContentLength(io.netty.handler.codec.http.HttpRequest request) {
107+
if (HttpUtil.isTransferEncodingChunked(request)) {
108+
return -1;
109+
} else {
110+
return HttpUtil.getContentLength(request, 0);
111+
}
99112
}
100113

101114
@Override
@@ -113,6 +126,11 @@ public String rawPath() {
113126
return queryStringDecoder.rawPath();
114127
}
115128

129+
@Override
130+
public int contentLength() {
131+
return contentLength;
132+
}
133+
116134
@Override
117135
public HttpBody body() {
118136
assert released.get() == false;
@@ -122,7 +140,6 @@ public HttpBody body() {
122140
@Override
123141
public void release() {
124142
if (pooled && released.compareAndSet(false, true)) {
125-
request.release();
126143
content.close();
127144
}
128145
}
@@ -147,27 +164,23 @@ public List<String> strictCookies() {
147164
@Override
148165
public HttpVersion protocolVersion() {
149166
if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) {
150-
return HttpRequest.HttpVersion.HTTP_1_0;
167+
return org.elasticsearch.http.HttpRequest.HttpVersion.HTTP_1_0;
151168
} else if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) {
152-
return HttpRequest.HttpVersion.HTTP_1_1;
169+
return org.elasticsearch.http.HttpRequest.HttpVersion.HTTP_1_1;
153170
} else {
154171
throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion());
155172
}
156173
}
157174

158175
@Override
159-
public HttpRequest removeHeader(String header) {
176+
public org.elasticsearch.http.HttpRequest removeHeader(String header) {
160177
HttpHeaders copiedHeadersWithout = request.headers().copy();
161178
copiedHeadersWithout.remove(header);
162-
HttpHeaders copiedTrailingHeadersWithout = request.trailingHeaders().copy();
163-
copiedTrailingHeadersWithout.remove(header);
164-
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(
179+
HttpRequest requestWithoutHeader = new DefaultHttpRequest(
165180
request.protocolVersion(),
166181
request.method(),
167182
request.uri(),
168-
request.content(),
169-
copiedHeadersWithout,
170-
copiedTrailingHeadersWithout
183+
copiedHeadersWithout
171184
);
172185
return new Netty4HttpRequest(sequence, requestWithoutHeader, released, pooled, content);
173186
}

0 commit comments

Comments
 (0)