| 
85 | 85 | import org.elasticsearch.transport.netty4.Netty4Utils;  | 
86 | 86 | import org.elasticsearch.xcontent.json.JsonXContent;  | 
87 | 87 | 
 
  | 
 | 88 | +import java.io.InputStream;  | 
88 | 89 | import java.nio.channels.ClosedChannelException;  | 
89 | 90 | import java.nio.charset.StandardCharsets;  | 
90 | 91 | import java.util.Collection;  | 
@@ -392,6 +393,23 @@ public void testOversizedChunkedEncoding() throws Exception {  | 
392 | 393 |         }  | 
393 | 394 |     }  | 
394 | 395 | 
 
  | 
 | 396 | +    public void testEmptyChunkedEncoding() throws Exception {  | 
 | 397 | +        try (var clientContext = newClientContext()) {  | 
 | 398 | +            var opaqueId = clientContext.newOpaqueId();  | 
 | 399 | +            final var emptyStream = new HttpChunkedInput(new ChunkedStream(InputStream.nullInputStream()));  | 
 | 400 | +            final var request = httpRequest(opaqueId, 0);  | 
 | 401 | +            HttpUtil.setTransferEncodingChunked(request, true);  | 
 | 402 | +            clientContext.channel().pipeline().addLast(new ChunkedWriteHandler());  | 
 | 403 | +            clientContext.channel().writeAndFlush(request);  | 
 | 404 | +            clientContext.channel().writeAndFlush(emptyStream);  | 
 | 405 | + | 
 | 406 | +            var handler = clientContext.awaitRestChannelAccepted(opaqueId);  | 
 | 407 | +            var restRequest = handler.restRequest;  | 
 | 408 | +            assertFalse(restRequest.hasContent());  | 
 | 409 | +            assertNull(restRequest.header("Transfer-Encoding"));  | 
 | 410 | +        }  | 
 | 411 | +    }  | 
 | 412 | + | 
395 | 413 |     // ensures that we don't leak buffers in stream on 400-bad-request  | 
396 | 414 |     // some bad requests are dispatched from rest-controller before reaching rest handler  | 
397 | 415 |     // test relies on netty's buffer leak detection  | 
@@ -733,15 +751,17 @@ Channel channel() {  | 
733 | 751 |     static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {  | 
734 | 752 |         final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();  | 
735 | 753 |         final String opaqueId;  | 
 | 754 | +        final RestRequest restRequest;  | 
736 | 755 |         private final AtomicReference<ActionListener<Chunk>> nextChunkListenerRef = new AtomicReference<>();  | 
737 | 756 |         final Netty4HttpRequestBodyStream stream;  | 
738 | 757 |         RestChannel channel;  | 
739 | 758 |         boolean receivedLastChunk = false;  | 
740 | 759 |         final CountDownLatch closedLatch = new CountDownLatch(1);  | 
741 | 760 |         volatile boolean shouldThrowInsideHandleChunk = false;  | 
742 | 761 | 
 
  | 
743 |  | -        ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {  | 
 | 762 | +        ServerRequestHandler(String opaqueId, RestRequest restRequest, Netty4HttpRequestBodyStream stream) {  | 
744 | 763 |             this.opaqueId = opaqueId;  | 
 | 764 | +            this.restRequest = restRequest;  | 
745 | 765 |             this.stream = stream;  | 
746 | 766 |         }  | 
747 | 767 | 
 
  | 
@@ -934,7 +954,7 @@ public List<Route> routes() {  | 
934 | 954 |                 protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {  | 
935 | 955 |                     var stream = (Netty4HttpRequestBodyStream) request.contentStream();  | 
936 | 956 |                     var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0);  | 
937 |  | -                    var handler = new ServerRequestHandler(opaqueId, stream);  | 
 | 957 | +                    var handler = new ServerRequestHandler(opaqueId, request, stream);  | 
938 | 958 |                     handlersByOpaqueId.getHandlerFor(opaqueId).onResponse(handler);  | 
939 | 959 |                     return handler;  | 
940 | 960 |                 }  | 
 | 
0 commit comments