@@ -54,6 +54,7 @@
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.http.HttpBodyTracer;
+import org.elasticsearch.http.HttpHandlingSettings;
 import org.elasticsearch.http.HttpServerTransport;
 import org.elasticsearch.http.HttpTransportSettings;
 import org.elasticsearch.plugins.ActionPlugin;
@@ -92,15 +93,10 @@
 @ESIntegTestCase.ClusterScope(numDataNodes = 1)
 public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
 
-    private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);
-
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
         Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
-        builder.put(
-            HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(),
-            ByteSizeValue.of(MAX_CONTENT_LENGTH, ByteSizeUnit.BYTES)
-        );
+        builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), ByteSizeValue.of(50, ByteSizeUnit.MB));
         return builder.build();
     }
 
@@ -139,7 +135,7 @@ public void testReceiveAllChunks() throws Exception {
                 var opaqueId = opaqueId(reqNo);
 
                 // this dataset will be compared with one on server side
-                var dataSize = randomIntBetween(1024, MAX_CONTENT_LENGTH);
+                var dataSize = randomIntBetween(1024, maxContentLength());
                 var sendData = Unpooled.wrappedBuffer(randomByteArrayOfLength(dataSize));
                 sendData.retain();
                 ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, sendData));
@@ -247,7 +243,7 @@ public void testServerExceptionMidStream() throws Exception {
     public void testClientBackpressure() throws Exception {
         try (var ctx = setupClientCtx()) {
             var opaqueId = opaqueId(0);
-            var payloadSize = MAX_CONTENT_LENGTH;
+            var payloadSize = maxContentLength();
             var totalParts = 10;
             var partSize = payloadSize / totalParts;
             ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
@@ -289,7 +285,7 @@ public void test100Continue() throws Exception {
         try (var ctx = setupClientCtx()) {
             for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
                 var id = opaqueId(reqNo);
-                var acceptableContentLength = randomIntBetween(0, MAX_CONTENT_LENGTH);
+                var acceptableContentLength = randomIntBetween(0, maxContentLength());
 
                 // send request header and await 100-continue
                 var req = httpRequest(id, acceptableContentLength);
@@ -321,7 +317,7 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
         try (var ctx = setupClientCtx()) {
            for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
                var id = opaqueId(reqNo);
-               var oversized = MAX_CONTENT_LENGTH + 1;
+               var oversized = maxContentLength() + 1;
 
                // send request header and await 413 too large
                var req = httpRequest(id, oversized);
@@ -337,28 +333,32 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
         }
     }
 
-    // ensures that oversized chunked encoded request has maxContentLength limit and returns 413
-    public void testOversizedChunkedEncoding() throws Exception {
+    // ensures that oversized chunked encoded request has no limits at http layer
+    // rest handler is responsible for oversized requests
+    public void testOversizedChunkedEncodingNoLimits() throws Exception {
         try (var ctx = setupClientCtx()) {
-            var id = opaqueId(0);
-            var contentSize = MAX_CONTENT_LENGTH + 1;
-            var content = randomByteArrayOfLength(contentSize);
-            var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
-            var chunkedIs = new ChunkedStream(is);
-            var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
-            var req = httpRequest(id, 0);
-            HttpUtil.setTransferEncodingChunked(req, true);
-
-            ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
-            ctx.clientChannel.writeAndFlush(req);
-            ctx.clientChannel.writeAndFlush(httpChunkedIs);
-            var handler = ctx.awaitRestChannelAccepted(id);
-            var consumed = handler.readAllBytes();
-            assertTrue(consumed <= MAX_CONTENT_LENGTH);
-
-            var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
-            assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
-            resp.release();
+            for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
+                var id = opaqueId(reqNo);
+                var contentSize = maxContentLength() + 1;
+                var content = randomByteArrayOfLength(contentSize);
+                var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
+                var chunkedIs = new ChunkedStream(is);
+                var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
+                var req = httpRequest(id, 0);
+                HttpUtil.setTransferEncodingChunked(req, true);
+
+                ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
+                ctx.clientChannel.writeAndFlush(req);
+                ctx.clientChannel.writeAndFlush(httpChunkedIs);
+                var handler = ctx.awaitRestChannelAccepted(id);
+                var consumed = handler.readAllBytes();
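+                // the http layer no longer enforces max_content_length for chunked requests,
+                // so the handler is expected to receive the entire oversized body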
+                assertEquals(contentSize, consumed);
+                handler.sendResponse(new RestResponse(RestStatus.OK, ""));
+
+                var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
+                assertEquals(HttpResponseStatus.OK, resp.status());
+                resp.release();
+            }
         }
     }
 
@@ -369,7 +369,7 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
         try (var ctx = setupClientCtx()) {
             for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
                 var id = opaqueId(reqNo);
-                var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
+                var contentSize = randomIntBetween(0, maxContentLength());
                 var req = httpRequest(id, contentSize);
                 var content = randomContent(contentSize, true);
 
@@ -405,7 +405,7 @@ public void testHttpClientStats() throws Exception {
 
             for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
                 var id = opaqueId(reqNo);
-                var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
+                var contentSize = randomIntBetween(0, maxContentLength());
                 totalBytesSent += contentSize;
                 ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
                 ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
@@ -485,6 +485,10 @@ private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exceptio
         }
     }
 
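+    // resolves the configured http.max_content_length (in bytes) from the node settings,
+    // replacing the removed MAX_CONTENT_LENGTH constant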
+    private int maxContentLength() {
+        return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
+    }
+
     private String opaqueId(int reqNo) {
         return getTestName() + "-" + reqNo;
     }
@@ -654,22 +658,14 @@ void sendResponse(RestResponse response) {
         int readBytes(int bytes) {
             var consumed = 0;
             if (recvLast == false) {
-                stream.next();
-                while (consumed < bytes && streamClosed == false) {
-                    try {
-                        var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS);
-                        if (recvChunk != null) {
-                            consumed += recvChunk.chunk.length();
-                            recvChunk.chunk.close();
-                            if (recvChunk.isLast) {
-                                recvLast = true;
-                                break;
-                            }
-                            stream.next();
-                        }
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new AssertionError(e);
+                while (consumed < bytes) {
+                    stream.next();
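+                    // safePoll is assumed to block until a chunk arrives and fail the test on timeout,
+                    // so the previous manual poll/interrupt handling is no longer needed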
+                    var recvChunk = safePoll(recvChunks);
+                    consumed += recvChunk.chunk.length();
+                    recvChunk.chunk.close();
+                    if (recvChunk.isLast) {
+                        recvLast = true;
+                        break;
                     }
                 }
             }