diff --git a/docs/changelog/133775.yaml b/docs/changelog/133775.yaml new file mode 100644 index 0000000000000..1bdecce846d9f --- /dev/null +++ b/docs/changelog/133775.yaml @@ -0,0 +1,5 @@ +pr: 133775 +summary: Remove Transfer-Encoding from HTTP request with no content +area: Network +type: bug +issues: [] diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 1f783a0c30d4c..1a8f33f57a5db 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -85,6 +85,7 @@ import org.elasticsearch.transport.netty4.Netty4Utils; import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.InputStream; import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -392,6 +393,23 @@ public void testOversizedChunkedEncoding() throws Exception { } } + public void testEmptyChunkedEncoding() throws Exception { + try (var clientContext = newClientContext()) { + var opaqueId = clientContext.newOpaqueId(); + final var emptyStream = new HttpChunkedInput(new ChunkedStream(InputStream.nullInputStream())); + final var request = httpRequest(opaqueId, 0); + HttpUtil.setTransferEncodingChunked(request, true); + clientContext.channel().pipeline().addLast(new ChunkedWriteHandler()); + clientContext.channel().writeAndFlush(request); + clientContext.channel().writeAndFlush(emptyStream); + + var handler = clientContext.awaitRestChannelAccepted(opaqueId); + var restRequest = handler.restRequest; + assertFalse(restRequest.hasContent()); + assertNull(restRequest.header("Transfer-Encoding")); + } + } + // ensures that we don't leak buffers in stream on 400-bad-request // some bad requests are dispatched from rest-controller before reaching rest handler // test relies on netty's buffer leak detection @@ -733,6 +751,7 @@ Channel channel() { static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer { final SubscribableListener channelAccepted = new SubscribableListener<>(); final String opaqueId; + final RestRequest restRequest; private final AtomicReference> nextChunkListenerRef = new AtomicReference<>(); final Netty4HttpRequestBodyStream stream; RestChannel channel; @@ -740,8 +759,9 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon final CountDownLatch closedLatch = new CountDownLatch(1); volatile boolean shouldThrowInsideHandleChunk = false; - ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) { + ServerRequestHandler(String opaqueId, RestRequest restRequest, Netty4HttpRequestBodyStream stream) { this.opaqueId = opaqueId; + this.restRequest = restRequest; this.stream = stream; } @@ -934,7 +954,7 @@ public List routes() { protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { var stream = (Netty4HttpRequestBodyStream) request.contentStream(); var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0); - var handler = new ServerRequestHandler(opaqueId, stream); + var handler = new ServerRequestHandler(opaqueId, request, stream); handlersByOpaqueId.getHandlerFor(opaqueId).onResponse(handler); return handler; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java new file mode 100644 index 0000000000000..045767cf41c63 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http.netty4; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.LastHttpContent; + +public class Netty4EmptyChunkHandler extends ChannelInboundHandlerAdapter { + + private HttpRequest currentRequest; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + switch (msg) { + case HttpRequest request -> { + if (request.decoderResult().isSuccess() && HttpUtil.isTransferEncodingChunked(request)) { + currentRequest = request; + ctx.read(); + } else { + currentRequest = null; + ctx.fireChannelRead(request); + } + } + case HttpContent content -> { + if (currentRequest != null) { + if (content instanceof LastHttpContent && content.content().readableBytes() == 0) { + HttpUtil.setTransferEncodingChunked(currentRequest, false); + } + ctx.fireChannelRead(currentRequest); + ctx.fireChannelRead(content); + currentRequest = null; + } else { + ctx.fireChannelRead(content); + } + } + default -> ctx.fireChannelRead(msg); + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 5ea454534563b..def517d21f91e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -414,6 +414,7 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t if (ResourceLeakDetector.isEnabled()) { ch.pipeline().addLast(new Netty4LeakDetectionHandler()); } + ch.pipeline().addLast(new Netty4EmptyChunkHandler()); // See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above // can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is // resolved we must add another flow controller here: diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java new file mode 100644 index 0000000000000..f6691ae204a2e --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http.netty4; + +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; + +import org.elasticsearch.test.ESTestCase; + +public class Netty4EmptyChunkHandlerTests extends ESTestCase { + + private EmbeddedChannel channel; + + @Override + public void setUp() throws Exception { + super.setUp(); + channel = new EmbeddedChannel(new Netty4EmptyChunkHandler()); + channel.config().setAutoRead(false); + } + + public void testNonChunkedPassthrough() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER); + channel.writeInbound(req, content); + assertEquals(req, channel.readInbound()); + assertEquals(content, channel.readInbound()); + } + + public void testDecodingFailurePassthrough() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + HttpUtil.setTransferEncodingChunked(req, true); + req.setDecoderResult(DecoderResult.failure(new Exception())); + channel.writeInbound(req); + var recvReq = (HttpRequest) channel.readInbound(); + assertTrue(recvReq.decoderResult().isFailure()); + assertTrue(HttpUtil.isTransferEncodingChunked(recvReq)); + } + + public void testHoldChunkedRequest() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + HttpUtil.setTransferEncodingChunked(req, true); + var readSniffer = new ReadSniffer(); + channel.pipeline().addFirst(readSniffer); + channel.writeInbound(req); + assertNull("should hold on HTTP request until first chunk arrives", channel.readInbound()); + assertEquals("must read first chunk when holding request", 1, readSniffer.readCount); + } + + public void testRemoveEncodingFromEmpty() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + HttpUtil.setTransferEncodingChunked(req, true); + var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER); + channel.writeInbound(req, content); + var recvReq = channel.readInbound(); + assertEquals(req, recvReq); + assertEquals(content, channel.readInbound()); + assertFalse("should remove Transfer-Encoding from empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq)); + } + + public void testKeepEncodingForNonEmpty() { + var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); + HttpUtil.setTransferEncodingChunked(req, true); + var content = new DefaultLastHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(between(1, 1024)))); + channel.writeInbound(req, content); + var recvReq = channel.readInbound(); + assertEquals(req, recvReq); + assertEquals(content, channel.readInbound()); + assertTrue("should keep Transfer-Encoding for non-empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq)); + } + + public void testRandomizedChannelReuse() { + for (int i = 0; i < 1000; i++) { + switch (between(0, 3)) { + case 0 -> testNonChunkedPassthrough(); + case 1 -> testKeepEncodingForNonEmpty(); + case 2 -> testDecodingFailurePassthrough(); + default -> testRemoveEncodingFromEmpty(); + } + } + } +}