Merged
docs/changelog/133775.yaml (5 changes: 5 additions & 0 deletions)
@@ -0,0 +1,5 @@
pr: 133775
summary: Remove Transfer-Encoding from HTTP request with no content
area: Network
type: bug
issues: []
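
The bug this changelog entry describes concerns requests that declare Transfer-Encoding: chunked but whose body is nothing more than the zero-length terminating chunk. A hypothetical, self-contained sketch of that request shape using Netty's HTTP types (illustration only, not part of the diff; the class name and URI are made up):

// Hypothetical illustration only: builds the kind of request this fix targets, a POST
// declared as chunked whose body consists solely of the empty terminating chunk.
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;

class EmptyChunkedRequestExample {
    static Object[] emptyChunkedRequest() {
        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
        HttpUtil.setTransferEncodingChunked(request, true);
        // on the wire the body is only the zero-length last chunk
        return new Object[] { request, LastHttpContent.EMPTY_LAST_CONTENT };
    }
}

With this change the server removes the Transfer-Encoding header in that case, so the request is dispatched as having no content (see Netty4EmptyChunkHandler below).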
@@ -11,6 +11,7 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@@ -34,6 +35,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedInput;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;

@@ -392,6 +394,51 @@ public void testOversizedChunkedEncoding() throws Exception {
        }
    }

    // a chunked request whose body is only the terminating empty chunk must arrive at the REST layer
    // with no content and no Transfer-Encoding header
    public void testEmptyChunkedEncoding() throws Exception {
        try (var clientContext = newClientContext()) {
            var opaqueId = clientContext.newOpaqueId();
            final var emptyStream = new HttpChunkedInput(new ChunkedInput<>() {
                @Override
                public boolean isEndOfInput() throws Exception {
                    return true;
                }

                @Override
                public void close() throws Exception {}

                @Override
                public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
                    return null;
                }

                @Override
                public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
                    return null;
                }

                @Override
                public long length() {
                    return 0;
                }

                @Override
                public long progress() {
                    return 0;
                }
            }, LastHttpContent.EMPTY_LAST_CONTENT);
            final var request = httpRequest(opaqueId, 0);
            HttpUtil.setTransferEncodingChunked(request, true);
            clientContext.channel().pipeline().addLast(new ChunkedWriteHandler());
            clientContext.channel().writeAndFlush(request);
            clientContext.channel().writeAndFlush(emptyStream);

            var handler = clientContext.awaitRestChannelAccepted(opaqueId);
            var restRequest = handler.restRequest;
            assertFalse(restRequest.hasContent());
            assertNull(restRequest.header("Transfer-Encoding"));
        }
    }

    // ensures that we don't leak buffers in stream on 400-bad-request
    // some bad requests are dispatched from rest-controller before reaching rest handler
    // test relies on netty's buffer leak detection
@@ -733,15 +780,17 @@ Channel channel() {
    static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
        final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
        final String opaqueId;
        final RestRequest restRequest;
        private final AtomicReference<ActionListener<Chunk>> nextChunkListenerRef = new AtomicReference<>();
        final Netty4HttpRequestBodyStream stream;
        RestChannel channel;
        boolean receivedLastChunk = false;
        final CountDownLatch closedLatch = new CountDownLatch(1);
        volatile boolean shouldThrowInsideHandleChunk = false;

        ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
        ServerRequestHandler(String opaqueId, RestRequest restRequest, Netty4HttpRequestBodyStream stream) {
            this.opaqueId = opaqueId;
            this.restRequest = restRequest;
            this.stream = stream;
        }

@@ -934,7 +983,7 @@ public List<Route> routes() {
        protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
            var stream = (Netty4HttpRequestBodyStream) request.contentStream();
            var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0);
            var handler = new ServerRequestHandler(opaqueId, stream);
            var handler = new ServerRequestHandler(opaqueId, request, stream);
            handlersByOpaqueId.getHandlerFor(opaqueId).onResponse(handler);
            return handler;
        }
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;

/**
 * Handles chunked requests that turn out to have no content: the request is held back until its
 * first content chunk arrives, and if that chunk is the empty terminating chunk the
 * {@code Transfer-Encoding: chunked} header is removed before the request is forwarded, so that
 * downstream handlers see a request with no content.
 */
public class Netty4EmptyChunkHandler extends ChannelInboundHandlerAdapter {

    private HttpRequest currentRequest;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        switch (msg) {
            case HttpRequest request -> {
                if (request.decoderResult().isSuccess() && HttpUtil.isTransferEncodingChunked(request)) {
                    // hold the request until its first content chunk arrives; ask for that chunk explicitly
                    // since the surrounding pipeline may rely on explicit reads
                    currentRequest = request;
                    ctx.read();
                } else {
                    currentRequest = null;
                    ctx.fireChannelRead(request);
                }
            }
            case HttpContent content -> {
                if (currentRequest != null) {
                    if (content instanceof LastHttpContent && content.content().readableBytes() == 0) {
                        // the body is only the empty terminating chunk: drop the chunked transfer encoding
                        HttpUtil.setTransferEncodingChunked(currentRequest, false);
                    }
                    ctx.fireChannelRead(currentRequest);
                    ctx.fireChannelRead(content);
                    currentRequest = null;
                } else {
                    ctx.fireChannelRead(content);
                }
            }
            default -> ctx.fireChannelRead(msg);
        }
    }
}
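
A minimal sketch, assuming only the Netty classes already used in this diff, of the handler's observable behaviour on an EmbeddedChannel (illustration only, not shipped code): a chunked request followed by nothing but the empty last chunk comes out the other side with its Transfer-Encoding header removed.

// Demonstration sketch: the handler strips Transfer-Encoding when the body is empty.
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;

import org.elasticsearch.http.netty4.Netty4EmptyChunkHandler;

class Netty4EmptyChunkHandlerDemo {
    public static void main(String[] args) {
        var channel = new EmbeddedChannel(new Netty4EmptyChunkHandler());
        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
        HttpUtil.setTransferEncodingChunked(request, true);
        channel.writeInbound(request, LastHttpContent.EMPTY_LAST_CONTENT);
        HttpRequest forwarded = channel.readInbound(); // the buffered request, released together with the chunk
        assert HttpUtil.isTransferEncodingChunked(forwarded) == false;
    }
}

Note that the handler calls ctx.read() while it is holding the request: the surrounding server pipeline drives reads explicitly (the FlowControlHandler mentioned below), so without that call the first content chunk might never be delivered. testHoldChunkedRequest further down verifies exactly this.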
@@ -414,6 +414,7 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t
            if (ResourceLeakDetector.isEnabled()) {
                ch.pipeline().addLast(new Netty4LeakDetectionHandler());
            }
            ch.pipeline().addLast(new Netty4EmptyChunkHandler());
            // See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above
            // can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is
            // resolved we must add another flow controller here:
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;

import org.elasticsearch.test.ESTestCase;

public class Netty4EmptyChunkHandlerTests extends ESTestCase {

    private EmbeddedChannel channel;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        channel = new EmbeddedChannel(new Netty4EmptyChunkHandler());
        channel.config().setAutoRead(false);
    }

    public void testNonChunkedPassthrough() {
        var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
        var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
        channel.writeInbound(req, content);
        assertEquals(req, channel.readInbound());
        assertEquals(content, channel.readInbound());
    }

    public void testHoldChunkedRequest() {
        var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
        HttpUtil.setTransferEncodingChunked(req, true);
        var readSniffer = new ReadSniffer();
        channel.pipeline().addFirst(readSniffer);
        channel.writeInbound(req);
        assertNull("should hold on HTTP request until first chunk arrives", channel.readInbound());
        assertEquals("must read first chunk when holding request", 1, readSniffer.readCount);
    }

    public void testRemoveEncodingFromEmpty() {
        var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
        HttpUtil.setTransferEncodingChunked(req, true);
        var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
        channel.writeInbound(req, content);
        var recvReq = channel.readInbound();
        assertEquals(req, recvReq);
        assertEquals(content, channel.readInbound());
        assertFalse("should remove Transfer-Encoding from empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
    }

    public void testKeepEncodingForNonEmpty() {
        var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
        HttpUtil.setTransferEncodingChunked(req, true);
        var content = new DefaultLastHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(between(1, 1024))));
        channel.writeInbound(req, content);
        var recvReq = channel.readInbound();
        assertEquals(req, recvReq);
        assertEquals(content, channel.readInbound());
        assertTrue("should keep Transfer-Encoding for non-empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
    }

    public void testRandomizedChannelReuse() {
        // run the individual cases in random order over the same channel to exercise the handler's per-request state reset
        for (int i = 0; i < 1000; i++) {
            switch (between(0, 2)) {
                case 0 -> testNonChunkedPassthrough();
                case 1 -> testKeepEncodingForNonEmpty();
                default -> testRemoveEncodingFromEmpty();
            }
        }
    }
}
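
ReadSniffer, used in testHoldChunkedRequest, is not part of this diff; it appears to be an existing test helper that counts read() calls reaching the head of the pipeline. A minimal sketch under that assumption (the name and shape here are guesses, not the real helper):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;

// Hypothetical stand-in for the ReadSniffer test helper: an outbound handler that counts
// read() calls propagating towards the head of the pipeline and passes them along.
class ReadSnifferSketch extends ChannelOutboundHandlerAdapter {
    int readCount;

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        readCount++;
        ctx.read();
    }
}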