Skip to content

Commit 14ccac1

Browse files
authored
Merge branch 'main' into feature/session-tokens
2 parents 4a32400 + 937f80c commit 14ccac1

File tree

5 files changed

+174
-2
lines changed

5 files changed

+174
-2
lines changed

docs/changelog/133775.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133775
2+
summary: Remove Transfer-Encoding from HTTP request with no content
3+
area: Network
4+
type: bug
5+
issues: []

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.elasticsearch.transport.netty4.Netty4Utils;
8686
import org.elasticsearch.xcontent.json.JsonXContent;
8787

88+
import java.io.InputStream;
8889
import java.nio.channels.ClosedChannelException;
8990
import java.nio.charset.StandardCharsets;
9091
import java.util.Collection;
@@ -392,6 +393,23 @@ public void testOversizedChunkedEncoding() throws Exception {
392393
}
393394
}
394395

396+
public void testEmptyChunkedEncoding() throws Exception {
397+
try (var clientContext = newClientContext()) {
398+
var opaqueId = clientContext.newOpaqueId();
399+
final var emptyStream = new HttpChunkedInput(new ChunkedStream(InputStream.nullInputStream()));
400+
final var request = httpRequest(opaqueId, 0);
401+
HttpUtil.setTransferEncodingChunked(request, true);
402+
clientContext.channel().pipeline().addLast(new ChunkedWriteHandler());
403+
clientContext.channel().writeAndFlush(request);
404+
clientContext.channel().writeAndFlush(emptyStream);
405+
406+
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
407+
var restRequest = handler.restRequest;
408+
assertFalse(restRequest.hasContent());
409+
assertNull(restRequest.header("Transfer-Encoding"));
410+
}
411+
}
412+
395413
// ensures that we don't leak buffers in stream on 400-bad-request
396414
// some bad requests are dispatched from rest-controller before reaching rest handler
397415
// test relies on netty's buffer leak detection
@@ -733,15 +751,17 @@ Channel channel() {
733751
static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
734752
final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
735753
final String opaqueId;
754+
final RestRequest restRequest;
736755
private final AtomicReference<ActionListener<Chunk>> nextChunkListenerRef = new AtomicReference<>();
737756
final Netty4HttpRequestBodyStream stream;
738757
RestChannel channel;
739758
boolean receivedLastChunk = false;
740759
final CountDownLatch closedLatch = new CountDownLatch(1);
741760
volatile boolean shouldThrowInsideHandleChunk = false;
742761

743-
ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
762+
ServerRequestHandler(String opaqueId, RestRequest restRequest, Netty4HttpRequestBodyStream stream) {
744763
this.opaqueId = opaqueId;
764+
this.restRequest = restRequest;
745765
this.stream = stream;
746766
}
747767

@@ -934,7 +954,7 @@ public List<Route> routes() {
934954
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
935955
var stream = (Netty4HttpRequestBodyStream) request.contentStream();
936956
var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0);
937-
var handler = new ServerRequestHandler(opaqueId, stream);
957+
var handler = new ServerRequestHandler(opaqueId, request, stream);
938958
handlersByOpaqueId.getHandlerFor(opaqueId).onResponse(handler);
939959
return handler;
940960
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.channel.ChannelHandlerContext;
13+
import io.netty.channel.ChannelInboundHandlerAdapter;
14+
import io.netty.handler.codec.http.HttpContent;
15+
import io.netty.handler.codec.http.HttpRequest;
16+
import io.netty.handler.codec.http.HttpUtil;
17+
import io.netty.handler.codec.http.LastHttpContent;
18+
19+
public class Netty4EmptyChunkHandler extends ChannelInboundHandlerAdapter {
20+
21+
private HttpRequest currentRequest;
22+
23+
@Override
24+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
25+
switch (msg) {
26+
case HttpRequest request -> {
27+
if (request.decoderResult().isSuccess() && HttpUtil.isTransferEncodingChunked(request)) {
28+
currentRequest = request;
29+
ctx.read();
30+
} else {
31+
currentRequest = null;
32+
ctx.fireChannelRead(request);
33+
}
34+
}
35+
case HttpContent content -> {
36+
if (currentRequest != null) {
37+
if (content instanceof LastHttpContent && content.content().readableBytes() == 0) {
38+
HttpUtil.setTransferEncodingChunked(currentRequest, false);
39+
}
40+
ctx.fireChannelRead(currentRequest);
41+
ctx.fireChannelRead(content);
42+
currentRequest = null;
43+
} else {
44+
ctx.fireChannelRead(content);
45+
}
46+
}
47+
default -> ctx.fireChannelRead(msg);
48+
}
49+
}
50+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,7 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t
414414
if (ResourceLeakDetector.isEnabled()) {
415415
ch.pipeline().addLast(new Netty4LeakDetectionHandler());
416416
}
417+
ch.pipeline().addLast(new Netty4EmptyChunkHandler());
417418
// See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above
418419
// can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is
419420
// resolved we must add another flow controller here:
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.buffer.Unpooled;
13+
import io.netty.channel.embedded.EmbeddedChannel;
14+
import io.netty.handler.codec.DecoderResult;
15+
import io.netty.handler.codec.http.DefaultHttpRequest;
16+
import io.netty.handler.codec.http.DefaultLastHttpContent;
17+
import io.netty.handler.codec.http.HttpMessage;
18+
import io.netty.handler.codec.http.HttpMethod;
19+
import io.netty.handler.codec.http.HttpRequest;
20+
import io.netty.handler.codec.http.HttpUtil;
21+
import io.netty.handler.codec.http.HttpVersion;
22+
23+
import org.elasticsearch.test.ESTestCase;
24+
25+
public class Netty4EmptyChunkHandlerTests extends ESTestCase {
26+
27+
private EmbeddedChannel channel;
28+
29+
@Override
30+
public void setUp() throws Exception {
31+
super.setUp();
32+
channel = new EmbeddedChannel(new Netty4EmptyChunkHandler());
33+
channel.config().setAutoRead(false);
34+
}
35+
36+
public void testNonChunkedPassthrough() {
37+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
38+
var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
39+
channel.writeInbound(req, content);
40+
assertEquals(req, channel.readInbound());
41+
assertEquals(content, channel.readInbound());
42+
}
43+
44+
public void testDecodingFailurePassthrough() {
45+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
46+
HttpUtil.setTransferEncodingChunked(req, true);
47+
req.setDecoderResult(DecoderResult.failure(new Exception()));
48+
channel.writeInbound(req);
49+
var recvReq = (HttpRequest) channel.readInbound();
50+
assertTrue(recvReq.decoderResult().isFailure());
51+
assertTrue(HttpUtil.isTransferEncodingChunked(recvReq));
52+
}
53+
54+
public void testHoldChunkedRequest() {
55+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
56+
HttpUtil.setTransferEncodingChunked(req, true);
57+
var readSniffer = new ReadSniffer();
58+
channel.pipeline().addFirst(readSniffer);
59+
channel.writeInbound(req);
60+
assertNull("should hold on HTTP request until first chunk arrives", channel.readInbound());
61+
assertEquals("must read first chunk when holding request", 1, readSniffer.readCount);
62+
}
63+
64+
public void testRemoveEncodingFromEmpty() {
65+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
66+
HttpUtil.setTransferEncodingChunked(req, true);
67+
var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
68+
channel.writeInbound(req, content);
69+
var recvReq = channel.readInbound();
70+
assertEquals(req, recvReq);
71+
assertEquals(content, channel.readInbound());
72+
assertFalse("should remove Transfer-Encoding from empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
73+
}
74+
75+
public void testKeepEncodingForNonEmpty() {
76+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
77+
HttpUtil.setTransferEncodingChunked(req, true);
78+
var content = new DefaultLastHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(between(1, 1024))));
79+
channel.writeInbound(req, content);
80+
var recvReq = channel.readInbound();
81+
assertEquals(req, recvReq);
82+
assertEquals(content, channel.readInbound());
83+
assertTrue("should keep Transfer-Encoding for non-empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
84+
}
85+
86+
public void testRandomizedChannelReuse() {
87+
for (int i = 0; i < 1000; i++) {
88+
switch (between(0, 3)) {
89+
case 0 -> testNonChunkedPassthrough();
90+
case 1 -> testKeepEncodingForNonEmpty();
91+
case 2 -> testDecodingFailurePassthrough();
92+
default -> testRemoveEncodingFromEmpty();
93+
}
94+
}
95+
}
96+
}

0 commit comments

Comments
 (0)