From 3190185c64f52804672338cb0d5f01059c22ba2c Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 28 Aug 2025 16:56:02 -0700 Subject: [PATCH 1/7] remove Transfer-Encoding from HTTP request with no content --- .../Netty4IncrementalRequestHandlingIT.java | 53 +++++++++++- .../http/netty4/Netty4EmptyChunkHandler.java | 50 +++++++++++ .../netty4/Netty4HttpServerTransport.java | 1 + .../netty4/Netty4EmptyChunkHandlerTests.java | 82 +++++++++++++++++++ 4 files changed, 184 insertions(+), 2 deletions(-) create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java create mode 100644 modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 1f783a0c30d4c..234a55721bb02 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -11,6 +11,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -34,6 +35,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.stream.ChunkedInput; import io.netty.handler.stream.ChunkedStream; import io.netty.handler.stream.ChunkedWriteHandler; @@ -392,6 +394,51 @@ public void testOversizedChunkedEncoding() throws Exception { } } + public void testEmptyChunkedEncoding() throws Exception { + try (var clientContext = newClientContext()) { + var opaqueId = clientContext.newOpaqueId(); + final var emptyStream = new HttpChunkedInput(new ChunkedInput<>() { + @Override + public boolean isEndOfInput() throws Exception { + return true; + } + + @Override + public void close() throws Exception {} + + @Override + public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return null; + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { + return null; + } + + @Override + public long length() { + return 0; + } + + @Override + public long progress() { + return 0; + } + }, LastHttpContent.EMPTY_LAST_CONTENT); + final var request = httpRequest(opaqueId, 0); + HttpUtil.setTransferEncodingChunked(request, true); + clientContext.channel().pipeline().addLast(new ChunkedWriteHandler()); + clientContext.channel().writeAndFlush(request); + clientContext.channel().writeAndFlush(emptyStream); + + var handler = clientContext.awaitRestChannelAccepted(opaqueId); + var restRequest = handler.restRequest; + assertFalse(restRequest.hasContent()); + assertNull(restRequest.header("Transfer-Encoding")); + } + } + // ensures that we don't leak buffers in stream on 400-bad-request // some bad requests are dispatched from rest-controller before reaching rest handler // test relies on netty's buffer leak detection @@ -733,6 +780,7 @@ Channel channel() { static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer { final SubscribableListener channelAccepted = new SubscribableListener<>(); final String opaqueId; + final RestRequest restRequest; private final AtomicReference> nextChunkListenerRef = new AtomicReference<>(); final Netty4HttpRequestBodyStream stream; RestChannel channel; @@ -740,8 +788,9 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon final CountDownLatch closedLatch = new CountDownLatch(1); volatile boolean shouldThrowInsideHandleChunk = false; - ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) { + ServerRequestHandler(String opaqueId, RestRequest restRequest, Netty4HttpRequestBodyStream stream) { this.opaqueId = opaqueId; + this.restRequest = restRequest; this.stream = stream; } @@ -934,7 +983,7 @@ public List routes() { protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { var stream = (Netty4HttpRequestBodyStream) request.contentStream(); var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0); - var handler = new ServerRequestHandler(opaqueId, stream); + var handler = new ServerRequestHandler(opaqueId, request, stream); handlersByOpaqueId.getHandlerFor(opaqueId).onResponse(handler); return handler; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java new file mode 100644 index 0000000000000..762946576ec99 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http.netty4; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.LastHttpContent; + +public class Netty4EmptyChunkHandler extends ChannelInboundHandlerAdapter { + + private HttpRequest currentRequest; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + switch (msg) { + case HttpRequest request -> { + if (HttpUtil.isTransferEncodingChunked(request)) { + currentRequest = request; + ctx.read(); + } else { + currentRequest = null; + ctx.fireChannelRead(request); + } + } + case HttpContent content -> { + if (currentRequest != null) { + if (content instanceof LastHttpContent && content.content().readableBytes() == 0) { + HttpUtil.setTransferEncodingChunked(currentRequest, false); + } + ctx.fireChannelRead(currentRequest); + ctx.fireChannelRead(content); + currentRequest = null; + } else { + ctx.fireChannelRead(content); + } + } + default -> ctx.fireChannelRead(msg); + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 5ea454534563b..def517d21f91e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -414,6 +414,7 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t if (ResourceLeakDetector.isEnabled()) { ch.pipeline().addLast(new Netty4LeakDetectionHandler()); } + ch.pipeline().addLast(new Netty4EmptyChunkHandler()); // See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above // can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is // resolved we must add another flow controller here: diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java new file mode 100644 index 0000000000000..1a4cadb10c806 --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http.netty4; + +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; + +import org.elasticsearch.test.ESTestCase; + +public class Netty4EmptyChunkHandlerTests extends ESTestCase { + + private EmbeddedChannel channel; + + @Override + public void setUp() throws Exception { + super.setUp(); + channel = new EmbeddedChannel(new Netty4EmptyChunkHandler()); + } + + public void testNonChunkedPassthrough() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER); + channel.writeInbound(req, content); + assertEquals(req, channel.readInbound()); + assertEquals(content, channel.readInbound()); + } + + public void testHoldChunkedRequest() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + HttpUtil.setTransferEncodingChunked(req, true); + var readSniffer = new ReadSniffer(); + channel.pipeline().addFirst(readSniffer); + channel.writeInbound(req); + assertNull("should hold on HTTP request until first chunk arrives", channel.readInbound()); + assertEquals("must read first chunk when holding request", 1, readSniffer.readCount); + } + + public void testRemoveEncodingFromEmpty() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + HttpUtil.setTransferEncodingChunked(req, true); + var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER); + channel.writeInbound(req, content); + var recvReq = channel.readInbound(); + assertEquals(req, recvReq); + assertEquals(content, channel.readInbound()); + assertFalse("should remove Transfer-Encoding from empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq)); + } + + public void testKeepEncodingForNonEmpty() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + HttpUtil.setTransferEncodingChunked(req, true); + var content = new DefaultLastHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(between(1, 1024)))); + channel.writeInbound(req, content); + var recvReq = channel.readInbound(); + assertEquals(req, recvReq); + assertEquals(content, channel.readInbound()); + assertTrue("should keep Transfer-Encoding for non-empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq)); + } + + public void testRandomizedChannelReuse() { + for (int i = 0; i < 1000; i++) { + switch (between(0, 2)) { + case 0 -> testNonChunkedPassthrough(); + case 1 -> testKeepEncodingForNonEmpty(); + default -> testRemoveEncodingFromEmpty(); + } + } + } +} From 7ab736111e35f23df9cb91ea28dd6dd6f43ff586 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 28 Aug 2025 17:04:00 -0700 Subject: [PATCH 2/7] Update docs/changelog/133775.yaml --- docs/changelog/133775.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/133775.yaml diff --git a/docs/changelog/133775.yaml b/docs/changelog/133775.yaml new file mode 100644 index 0000000000000..1bdecce846d9f --- /dev/null +++ b/docs/changelog/133775.yaml @@ -0,0 +1,5 @@ +pr: 133775 +summary: Remove Transfer-Encoding from HTTP request with no content +area: Network +type: bug +issues: [] From 533637d2a8884b45d59f44d48cc5bb1ce44a583e Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 28 Aug 2025 18:02:43 -0700 Subject: [PATCH 3/7] disable autoread in test --- .../elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java index 1a4cadb10c806..d876aa58f57af 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java @@ -28,6 +28,7 @@ public class Netty4EmptyChunkHandlerTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); channel = new EmbeddedChannel(new Netty4EmptyChunkHandler()); + channel.config().setAutoRead(false); } public void testNonChunkedPassthrough() { From 7d24410a4cd2bd4afb59e11254f92f8570b6fec7 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 28 Aug 2025 20:36:33 -0700 Subject: [PATCH 4/7] passthrough errors --- .../org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java index 762946576ec99..045767cf41c63 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java @@ -24,7 +24,7 @@ public class Netty4EmptyChunkHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { switch (msg) { case HttpRequest request -> { - if (HttpUtil.isTransferEncodingChunked(request)) { + if (request.decoderResult().isSuccess() && HttpUtil.isTransferEncodingChunked(request)) { currentRequest = request; ctx.read(); } else { From 96e1c4d06b58e08d4f019135639e5ad33d8b37d3 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 2 Sep 2025 17:45:13 -0700 Subject: [PATCH 5/7] cleanup and decoding failure test --- .../Netty4IncrementalRequestHandlingIT.java | 31 ++----------------- .../netty4/Netty4EmptyChunkHandlerTests.java | 12 +++++++ 2 files changed, 14 insertions(+), 29 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 234a55721bb02..ec06f50f0326d 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -87,6 +87,7 @@ import org.elasticsearch.transport.netty4.Netty4Utils; import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.InputStream; import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -397,35 +398,7 @@ public void testOversizedChunkedEncoding() throws Exception { public void testEmptyChunkedEncoding() throws Exception { try (var clientContext = newClientContext()) { var opaqueId = clientContext.newOpaqueId(); - final var emptyStream = new HttpChunkedInput(new ChunkedInput<>() { - @Override - public boolean isEndOfInput() throws Exception { - return true; - } - - @Override - public void close() throws Exception {} - - @Override - public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { - return null; - } - - @Override - public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { - return null; - } - - @Override - public long length() { - return 0; - } - - @Override - public long progress() { - return 0; - } - }, LastHttpContent.EMPTY_LAST_CONTENT); + final var emptyStream = new HttpChunkedInput(new ChunkedStream(InputStream.nullInputStream())); final var request = httpRequest(opaqueId, 0); HttpUtil.setTransferEncodingChunked(request, true); clientContext.channel().pipeline().addLast(new ChunkedWriteHandler()); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java index d876aa58f57af..8cc3e60732820 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java @@ -11,10 +11,12 @@ import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; @@ -39,6 +41,16 @@ public void testNonChunkedPassthrough() { assertEquals(content, channel.readInbound()); } + public void testDecodingFailurePassthrough() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + HttpUtil.setTransferEncodingChunked(req, true); + req.setDecoderResult(DecoderResult.failure(new Exception())); + channel.writeInbound(req); + var recvReq = (HttpRequest) channel.readInbound(); + assertTrue(recvReq.decoderResult().isFailure()); + assertTrue(HttpUtil.isTransferEncodingChunked(recvReq)); + } + public void testHoldChunkedRequest() { var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); HttpUtil.setTransferEncodingChunked(req, true); From 8172a4228182e92ed0c860cd2c878809cb18bd92 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 2 Sep 2025 17:47:53 -0700 Subject: [PATCH 6/7] more tests --- .../http/netty4/Netty4EmptyChunkHandlerTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java index 8cc3e60732820..f6691ae204a2e 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java @@ -85,9 +85,10 @@ public void testKeepEncodingForNonEmpty() { public void testRandomizedChannelReuse() { for (int i = 0; i < 1000; i++) { - switch (between(0, 2)) { + switch (between(0, 3)) { case 0 -> testNonChunkedPassthrough(); case 1 -> testKeepEncodingForNonEmpty(); + case 2 -> testDecodingFailurePassthrough(); default -> testRemoveEncodingFromEmpty(); } } From c02f1d2f1c288c8bef3a490fabff449df44656c5 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 3 Sep 2025 00:53:32 +0000 Subject: [PATCH 7/7] [CI] Auto commit changes from spotless --- .../http/netty4/Netty4IncrementalRequestHandlingIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index ec06f50f0326d..1a8f33f57a5db 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -11,7 +11,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -35,7 +34,6 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.stream.ChunkedInput; import io.netty.handler.stream.ChunkedStream; import io.netty.handler.stream.ChunkedWriteHandler;