Skip to content

Commit 43232ee

Browse files
committed
feedback
1 parent 5dfbd9f commit 43232ee

File tree

7 files changed

+19
-15
lines changed

7 files changed

+19
-15
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,12 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
146146
currentRequestStream.handleNettyContent((HttpContent) msg);
147147
}
148148
} finally {
149-
if (shouldRead) {
150-
ctx.channel().eventLoop().execute(ctx::read);
151-
}
152149
if (msg instanceof LastHttpContent) {
153150
currentRequestStream = null;
154151
}
152+
if (shouldRead) {
153+
ctx.channel().eventLoop().execute(ctx::read);
154+
}
155155
activityTracker.stopActivity();
156156
}
157157
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest n
5555
this(sequence, nettyRequest, content, new AtomicBoolean(false), null);
5656
}
5757

58-
public Netty4HttpRequest(
58+
private Netty4HttpRequest(
5959
int sequence,
6060
io.netty.handler.codec.http.HttpRequest nettyRequest,
6161
HttpBody content,
@@ -104,7 +104,9 @@ public HttpBody body() {
104104

105105
@Override
106106
public void setBody(HttpBody body) {
107-
this.content = body.asFull();
107+
assert this.content.isStream() : "only stream content can be replaced";
108+
assert body.isFull() : "only full content can replace stream";
109+
this.content = body;
108110
this.hasContent = body.isEmpty() == false;
109111
}
110112

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,8 +396,9 @@ protected boolean isContentAlwaysEmpty(HttpResponse msg) {
396396
}
397397
return super.isContentAlwaysEmpty(msg);
398398
}
399-
});
400-
ch.pipeline().addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));
399+
})
400+
.addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));
401+
401402
if (handlingSettings.compression()) {
402403
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.compressionLevel()) {
403404
@Override

server/src/main/java/org/elasticsearch/rest/RestContentAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ private static void replaceBody(RestRequest restRequest, ReleasableBytesReferenc
2525

2626
/**
2727
* Aggregates content of the RestRequest and notifies consumer with updated, in-place, RestRequest.
28-
* If content is already aggregated then does nothing.
28+
* If content is already aggregated then passes through same request.
2929
*/
3030
public static void aggregate(RestRequest restRequest, Consumer<RestRequest> resultConsumer) {
3131
final var httpRequest = restRequest.getHttpRequest();

server/src/main/java/org/elasticsearch/rest/RestController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ private void dispatchRequest(
444444
return;
445445
}
446446
}
447-
final int contentLength = request.contentLength();
447+
final int contentLength = request.isFullContent() ? request.contentLength() : 0;
448448
try {
449449
if (handler.canTripCircuitBreaker()) {
450450
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");

server/src/main/java/org/elasticsearch/rest/RestRequest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,7 @@ public boolean hasContent() {
295295
}
296296

297297
public int contentLength() {
298-
return switch (httpRequest.body()) {
299-
case HttpBody.Full content -> content.bytes().length();
300-
case HttpBody.Stream stream -> 0;
301-
};
298+
return httpRequest.body().asFull().bytes().length();
302299
}
303300

304301
public boolean isFullContent() {

server/src/test/java/org/elasticsearch/rest/RestContentAggregatorTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ RestRequest newRestRequest(int size) {
4141

4242
public void testFullBodyPassThrough() {
4343
var fullRequest = newRestRequest(between(1, 1024));
44-
aggregate(fullRequest, (aggregated) -> assertEquals(fullRequest.content(), aggregated.content()));
44+
var aggRef = new AtomicReference<RestRequest>();
45+
aggregate(fullRequest, aggRef::set);
46+
var aggRequest = aggRef.get();
47+
assertSame(fullRequest, aggRequest);
48+
assertSame(fullRequest.content(), aggRequest.content());
4549
}
4650

4751
public void testZeroLengthStream() {
@@ -61,7 +65,7 @@ public void testAggregateRandomSize() {
6165
var streamChunks = range(0, nChunks).mapToObj(i -> randomReleasableBytesReference(chunkSize)).toList();
6266
var request = newRestRequest(chunkSize * nChunks);
6367
request.getHttpRequest().setBody(stream);
64-
AtomicReference<RestRequest> aggregatedRef = new AtomicReference<>();
68+
var aggregatedRef = new AtomicReference<RestRequest>();
6569

6670
aggregate(request, aggregatedRef::set);
6771

0 commit comments

Comments
 (0)