Skip to content

Commit 650738e

Browse files
committed
wip
1 parent 3a292e9 commit 650738e

31 files changed

+892
-251
lines changed

docs/changelog/117787.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 117787
2+
summary: "[WIP] Add rest content aggregator"
3+
area: Network
4+
type: enhancement
5+
issues: []

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 40 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,10 @@ public void testEmptyContent() throws Exception {
117117
assertTrue(recvChunk.isLast);
118118
assertEquals(0, recvChunk.chunk.length());
119119
recvChunk.chunk.close();
120-
assertFalse(handler.streamClosed);
120+
assertBusy(() -> assertTrue(handler.streamClosed));
121121

122122
// send response to process following request
123123
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
124-
assertBusy(() -> assertTrue(handler.streamClosed));
125124
}
126125
assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size()));
127126
}
@@ -154,10 +153,9 @@ public void testReceiveAllChunks() throws Exception {
154153
}
155154
}
156155

157-
assertFalse(handler.streamClosed);
156+
assertBusy(() -> assertTrue(handler.streamClosed));
158157
assertEquals("sent and received payloads are not the same", sendData, recvData);
159158
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
160-
assertBusy(() -> assertTrue(handler.streamClosed));
161159
}
162160
assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size()));
163161
}
@@ -333,32 +331,27 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
333331
}
334332
}
335333

336-
// ensures that oversized chunked encoded request has no limits at http layer
337-
// rest handler is responsible for oversized requests
338-
public void testOversizedChunkedEncodingNoLimits() throws Exception {
334+
// ensures that oversized chunked encoded request has limits at http layer
335+
public void testOversizedChunkedEncodingLimits() throws Exception {
339336
try (var ctx = setupClientCtx()) {
340-
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
341-
var id = opaqueId(reqNo);
342-
var contentSize = maxContentLength() + 1;
343-
var content = randomByteArrayOfLength(contentSize);
344-
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
345-
var chunkedIs = new ChunkedStream(is);
346-
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
347-
var req = httpRequest(id, 0);
348-
HttpUtil.setTransferEncodingChunked(req, true);
349-
350-
ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
351-
ctx.clientChannel.writeAndFlush(req);
352-
ctx.clientChannel.writeAndFlush(httpChunkedIs);
353-
var handler = ctx.awaitRestChannelAccepted(id);
354-
var consumed = handler.readAllBytes();
355-
assertEquals(contentSize, consumed);
356-
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
357-
358-
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
359-
assertEquals(HttpResponseStatus.OK, resp.status());
360-
resp.release();
361-
}
337+
var id = opaqueId(0);
338+
var contentSize = maxContentLength() + 1;
339+
var content = randomByteArrayOfLength(contentSize);
340+
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
341+
var chunkedIs = new ChunkedStream(is);
342+
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
343+
var req = httpRequest(id, 0);
344+
HttpUtil.setTransferEncodingChunked(req, true);
345+
346+
ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
347+
ctx.clientChannel.writeAndFlush(req);
348+
ctx.clientChannel.writeAndFlush(httpChunkedIs);
349+
var handler = ctx.awaitRestChannelAccepted(id);
350+
handler.readAllBytes();
351+
352+
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
353+
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
354+
resp.release();
362355
}
363356
}
364357

@@ -655,24 +648,27 @@ void sendResponse(RestResponse response) {
655648
channel.sendResponse(response);
656649
}
657650

658-
int readBytes(int bytes) {
651+
int readBytes(int bytes) throws InterruptedException {
659652
var consumed = 0;
660653
if (recvLast == false) {
661-
while (consumed < bytes) {
662-
stream.next();
663-
var recvChunk = safePoll(recvChunks);
664-
consumed += recvChunk.chunk.length();
665-
recvChunk.chunk.close();
666-
if (recvChunk.isLast) {
667-
recvLast = true;
668-
break;
654+
stream.next();
655+
while (consumed < bytes && streamClosed == false) {
656+
var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS);
657+
if (recvChunk != null) {
658+
consumed += recvChunk.chunk.length();
659+
recvChunk.chunk.close();
660+
if (recvChunk.isLast) {
661+
recvLast = true;
662+
break;
663+
}
664+
stream.next();
669665
}
670666
}
671667
}
672668
return consumed;
673669
}
674670

675-
int readAllBytes() {
671+
int readAllBytes() throws InterruptedException {
676672
return readBytes(Integer.MAX_VALUE);
677673
}
678674

@@ -704,6 +700,11 @@ public String getName() {
704700
return ROUTE;
705701
}
706702

703+
@Override
704+
public boolean supportContentStream() {
705+
return true;
706+
}
707+
707708
@Override
708709
public List<Route> routes() {
709710
return List.of(new Route(RestRequest.Method.POST, ROUTE));
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.channel.Channel;
13+
import io.netty.channel.ChannelConfig;
14+
import io.netty.util.AttributeKey;
15+
16+
import java.util.BitSet;
17+
18+
class AutoReadSync {
19+
20+
private static final AttributeKey<AutoReadSync> AUTO_READ_SYNC_KEY = AttributeKey.valueOf("AutoReadSync");
21+
private final Channel channel;
22+
private final ChannelConfig config;
23+
private final BitSet ids;
24+
private final BitSet toggles;
25+
26+
AutoReadSync(Channel channel) {
27+
this.channel = channel;
28+
this.config = channel.config();
29+
this.ids = new BitSet();
30+
this.toggles = new BitSet();
31+
}
32+
33+
static Handle from(Channel channel) {
34+
assert channel.eventLoop().inEventLoop();
35+
var autoRead = channel.attr(AUTO_READ_SYNC_KEY).get();
36+
if (autoRead == null) {
37+
autoRead = new AutoReadSync(channel);
38+
channel.attr(AUTO_READ_SYNC_KEY).set(autoRead);
39+
}
40+
return autoRead.getHandle();
41+
}
42+
43+
Handle getHandle() {
44+
var nextId = ids.nextClearBit(0);
45+
ids.set(nextId, true);
46+
return new Handle(nextId);
47+
}
48+
49+
class Handle {
50+
private final int id;
51+
private boolean released;
52+
53+
Handle(int id) {
54+
this.id = id;
55+
}
56+
57+
private void assertState() {
58+
assert channel.eventLoop().inEventLoop();
59+
assert released == false;
60+
}
61+
62+
boolean isEnabled() {
63+
assertState();
64+
return toggles.get(id) == false;
65+
}
66+
67+
void enable() {
68+
assertState();
69+
toggles.set(id, false);
70+
config.setAutoRead(toggles.isEmpty());
71+
}
72+
73+
void disable() {
74+
assertState();
75+
toggles.set(id, true);
76+
config.setAutoRead(false);
77+
}
78+
79+
void release() {
80+
assertState();
81+
enable();
82+
ids.set(id, false);
83+
released = true;
84+
}
85+
}
86+
87+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java renamed to modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.http.netty4;
1111

1212
import io.netty.channel.ChannelHandlerContext;
13-
import io.netty.handler.codec.http.FullHttpRequest;
1413
import io.netty.handler.codec.http.FullHttpResponse;
1514
import io.netty.handler.codec.http.HttpContent;
1615
import io.netty.handler.codec.http.HttpObject;
@@ -19,43 +18,27 @@
1918
import io.netty.handler.codec.http.HttpResponseStatus;
2019
import io.netty.handler.codec.http.HttpUtil;
2120

22-
import org.elasticsearch.http.HttpPreRequest;
23-
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
24-
25-
import java.util.function.Predicate;
26-
2721
/**
28-
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
29-
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
22+
* A wrapper around {@link HttpObjectAggregator}. Handles "Expect: 100-continue" and oversized content.
3023
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
24+
* TODO: move to {@link org.elasticsearch.rest.RestController}
3125
*/
32-
public class Netty4HttpAggregator extends HttpObjectAggregator {
33-
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;
34-
35-
private final Predicate<HttpPreRequest> decider;
36-
private boolean aggregating = true;
37-
private boolean ignoreContentAfterContinueResponse = false;
26+
public class Netty4HttpContentSizeHandler extends HttpObjectAggregator {
27+
private boolean ignoreFollowingContent = false;
28+
private int contentLength = 0;
29+
private HttpRequest currentRequest;
3830

39-
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
31+
public Netty4HttpContentSizeHandler(int maxContentLength) {
4032
super(maxContentLength);
41-
this.decider = decider;
4233
}
4334

4435
@Override
4536
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
4637
assert msg instanceof HttpObject;
47-
if (msg instanceof HttpRequest request) {
48-
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
49-
aggregating = (decider.test(preReq) && IGNORE_TEST.test(preReq)) || request.decoderResult().isFailure();
50-
}
51-
if (aggregating || msg instanceof FullHttpRequest) {
52-
super.channelRead(ctx, msg);
53-
} else {
54-
handle(ctx, (HttpObject) msg);
55-
}
38+
handle(ctx, (HttpObject) msg);
5639
}
5740

58-
private void handle(ChannelHandlerContext ctx, HttpObject msg) {
41+
private void handle(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
5942
if (msg instanceof HttpRequest request) {
6043
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
6144
if (continueResponse != null) {
@@ -65,18 +48,34 @@ private void handle(ChannelHandlerContext ctx, HttpObject msg) {
6548
ctx.writeAndFlush(continueResponse);
6649
var resp = (FullHttpResponse) continueResponse;
6750
if (resp.status() != HttpResponseStatus.CONTINUE) {
68-
ignoreContentAfterContinueResponse = true;
51+
ignoreFollowingContent = true;
6952
return;
7053
}
7154
HttpUtil.set100ContinueExpected(request, false);
55+
} else {
56+
if (HttpUtil.getContentLength(request, 0) > maxContentLength()) {
57+
handleOversizedMessage(ctx, request);
58+
ignoreFollowingContent = true;
59+
return;
60+
}
7261
}
73-
ignoreContentAfterContinueResponse = false;
62+
ignoreFollowingContent = false;
63+
contentLength = 0;
64+
currentRequest = request;
7465
ctx.fireChannelRead(msg);
7566
} else {
7667
var httpContent = (HttpContent) msg;
77-
if (ignoreContentAfterContinueResponse) {
68+
if (ignoreFollowingContent) {
7869
httpContent.release();
7970
} else {
71+
contentLength += httpContent.content().readableBytes();
72+
if (contentLength > maxContentLength()) {
73+
ignoreFollowingContent = true;
74+
httpContent.release();
75+
HttpUtil.setKeepAlive(currentRequest, false);
76+
handleOversizedMessage(ctx, currentRequest);
77+
return;
78+
}
8079
ctx.fireChannelRead(msg);
8180
}
8281
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
3737

3838
private final HttpValidator validator;
3939
private final ThreadContext threadContext;
40+
private AutoReadSync.Handle autoRead;
4041
private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
4142
private State state = WAITING_TO_START;
4243

@@ -45,6 +46,12 @@ public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadCo
4546
this.threadContext = threadContext;
4647
}
4748

49+
@Override
50+
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
51+
autoRead = AutoReadSync.from(ctx.channel());
52+
super.channelRegistered(ctx);
53+
}
54+
4855
State getState() {
4956
return state;
5057
}
@@ -61,7 +68,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
6168
pending.add(ReferenceCountUtil.retain(httpObject));
6269
requestStart(ctx);
6370
assert state == QUEUEING_DATA;
64-
assert ctx.channel().config().isAutoRead() == false;
71+
assert autoRead.isEnabled() == false;
6572
break;
6673
case QUEUEING_DATA:
6774
pending.add(ReferenceCountUtil.retain(httpObject));
@@ -83,7 +90,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
8390
case DROPPING_DATA_PERMANENTLY:
8491
assert pending.isEmpty();
8592
ReferenceCountUtil.release(httpObject); // consume without enqueuing
86-
ctx.channel().config().setAutoRead(false);
93+
autoRead.disable();
8794
break;
8895
}
8996
}
@@ -106,7 +113,7 @@ private void requestStart(ChannelHandlerContext ctx) {
106113
}
107114

108115
state = QUEUEING_DATA;
109-
ctx.channel().config().setAutoRead(false);
116+
autoRead.disable();
110117

111118
if (httpRequest == null) {
112119
// this looks like a malformed request and will forward without validation
@@ -149,10 +156,10 @@ public void onFailure(Exception e) {
149156
private void forwardFullRequest(ChannelHandlerContext ctx) {
150157
Transports.assertDefaultThreadContext(threadContext);
151158
assert ctx.channel().eventLoop().inEventLoop();
152-
assert ctx.channel().config().isAutoRead() == false;
159+
assert autoRead.isEnabled() == false;
153160
assert state == QUEUEING_DATA;
154161

155-
ctx.channel().config().setAutoRead(true);
162+
autoRead.enable();
156163
boolean fullRequestForwarded = forwardData(ctx, pending);
157164

158165
assert fullRequestForwarded || pending.isEmpty();
@@ -169,7 +176,7 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
169176
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
170177
Transports.assertDefaultThreadContext(threadContext);
171178
assert ctx.channel().eventLoop().inEventLoop();
172-
assert ctx.channel().config().isAutoRead() == false;
179+
assert autoRead.isEnabled() == false;
173180
assert state == QUEUEING_DATA;
174181

175182
HttpObject messageToForward = pending.getFirst();
@@ -180,7 +187,7 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex
180187
}
181188
messageToForward.setDecoderResult(DecoderResult.failure(e));
182189

183-
ctx.channel().config().setAutoRead(true);
190+
autoRead.enable();
184191
ctx.fireChannelRead(messageToForward);
185192

186193
assert fullRequestDropped || pending.isEmpty();

0 commit comments

Comments
 (0)