@@ -94,64 +94,6 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
9494
9595 private static final int MAX_CONTENT_LENGTH = ByteSizeUnit .MB .toIntBytes (50 );
9696
97- private static long transportStatsRequestBytesSize (Ctx ctx ) {
98- var httpTransport = internalCluster ().getInstance (HttpServerTransport .class , ctx .nodeName );
99- var stats = httpTransport .stats ().clientStats ();
100- var bytes = 0L ;
101- for (var s : stats ) {
102- bytes += s .requestSizeBytes ();
103- }
104- return bytes ;
105- }
106-
107- static int MBytes (int m ) {
108- return m * 1024 * 1024 ;
109- }
110-
111- static <T > T safePoll (BlockingDeque <T > queue ) {
112- try {
113- var t = queue .poll (SAFE_AWAIT_TIMEOUT .seconds (), TimeUnit .SECONDS );
114- assertNotNull ("queue is empty" , t );
115- return t ;
116- } catch (InterruptedException e ) {
117- Thread .currentThread ().interrupt ();
118- throw new AssertionError (e );
119- }
120- }
121-
122- private static FullHttpRequest fullHttpRequest (String opaqueId , ByteBuf content ) {
123- var req = new DefaultFullHttpRequest (HTTP_1_1 , POST , ControlServerRequestPlugin .ROUTE , Unpooled .wrappedBuffer (content ));
124- req .headers ().add (CONTENT_LENGTH , content .readableBytes ());
125- req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
126- req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
127- return req ;
128- }
129-
130- private static HttpRequest httpRequest (String opaqueId , int contentLength ) {
131- return httpRequest (ControlServerRequestPlugin .ROUTE , opaqueId , contentLength );
132- }
133-
134- private static HttpRequest httpRequest (String uri , String opaqueId , int contentLength ) {
135- var req = new DefaultHttpRequest (HTTP_1_1 , POST , uri );
136- req .headers ().add (CONTENT_LENGTH , contentLength );
137- req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
138- req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
139- return req ;
140- }
141-
142- private static HttpContent randomContent (int size , boolean isLast ) {
143- var buf = Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
144- if (isLast ) {
145- return new DefaultLastHttpContent (buf );
146- } else {
147- return new DefaultHttpContent (buf );
148- }
149- }
150-
151- private static ByteBuf randomByteBuf (int size ) {
152- return Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
153- }
154-
15597 @ Override
15698 protected Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
15799 Settings .Builder builder = Settings .builder ().put (super .nodeSettings (nodeOrdinal , otherSettings ));
@@ -236,6 +178,8 @@ public void testClientConnectionCloseMidStream() throws Exception {
236178
237179 // await stream handler is ready and request full content
238180 var handler = ctx .awaitRestChannelAccepted (opaqueId );
181+ assertBusy (() -> assertNotEquals (0 , handler .stream .bufSize ()));
182+
239183 assertFalse (handler .streamClosed );
240184
241185 // terminate client connection
@@ -246,7 +190,10 @@ public void testClientConnectionCloseMidStream() throws Exception {
246190 handler .stream .next ();
247191
248192 // wait for resources to be released
249- assertBusy (() -> assertTrue (handler .streamClosed ));
193+ assertBusy (() -> {
194+ assertEquals (0 , handler .stream .bufSize ());
195+ assertTrue (handler .streamClosed );
196+ });
250197 }
251198 }
252199
@@ -261,11 +208,15 @@ public void testServerCloseConnectionMidStream() throws Exception {
261208
262209 // await stream handler is ready and request full content
263210 var handler = ctx .awaitRestChannelAccepted (opaqueId );
211+ assertBusy (() -> assertNotEquals (0 , handler .stream .bufSize ()));
264212 assertFalse (handler .streamClosed );
265213
266214 // terminate connection on server and wait resources are released
267215 handler .channel .request ().getHttpChannel ().close ();
268- assertBusy (() -> assertTrue (handler .streamClosed ));
216+ assertBusy (() -> {
217+ assertEquals (0 , handler .stream .bufSize ());
218+ assertTrue (handler .streamClosed );
219+ });
269220 }
270221 }
271222
@@ -279,12 +230,16 @@ public void testServerExceptionMidStream() throws Exception {
279230
280231 // await stream handler is ready and request full content
281232 var handler = ctx .awaitRestChannelAccepted (opaqueId );
233+ assertBusy (() -> assertNotEquals (0 , handler .stream .bufSize ()));
282234 assertFalse (handler .streamClosed );
283235
284236 handler .shouldThrowInsideHandleChunk = true ;
285237 handler .stream .next ();
286238
287- assertBusy (() -> assertTrue (handler .streamClosed ));
239+ assertBusy (() -> {
240+ assertEquals (0 , handler .stream .bufSize ());
241+ assertTrue (handler .streamClosed );
242+ });
288243 }
289244 }
290245
@@ -325,7 +280,7 @@ public void testClientBackpressure() throws Exception {
325280 });
326281 handler .readBytes (partSize );
327282 }
328- assertTrue (handler .recvLast );
283+ assertTrue (handler .stream . hasLast () );
329284 }
330285 }
331286
@@ -430,6 +385,16 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
430385 }
431386 }
432387
388+ private static long transportStatsRequestBytesSize (Ctx ctx ) {
389+ var httpTransport = internalCluster ().getInstance (HttpServerTransport .class , ctx .nodeName );
390+ var stats = httpTransport .stats ().clientStats ();
391+ var bytes = 0L ;
392+ for (var s : stats ) {
393+ bytes += s .requestSizeBytes ();
394+ }
395+ return bytes ;
396+ }
397+
433398 /**
434399 * ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
435400 */
@@ -524,15 +489,63 @@ private String opaqueId(int reqNo) {
524489 return getTestName () + "-" + reqNo ;
525490 }
526491
527- private Ctx setupClientCtx () throws Exception {
492+ static int MBytes (int m ) {
493+ return m * 1024 * 1024 ;
494+ }
495+
496+ static <T > T safePoll (BlockingDeque <T > queue ) {
497+ try {
498+ var t = queue .poll (SAFE_AWAIT_TIMEOUT .seconds (), TimeUnit .SECONDS );
499+ assertNotNull ("queue is empty" , t );
500+ return t ;
501+ } catch (InterruptedException e ) {
502+ Thread .currentThread ().interrupt ();
503+ throw new AssertionError (e );
504+ }
505+ }
506+
507+ static FullHttpRequest fullHttpRequest (String opaqueId , ByteBuf content ) {
508+ var req = new DefaultFullHttpRequest (HTTP_1_1 , POST , ControlServerRequestPlugin .ROUTE , Unpooled .wrappedBuffer (content ));
509+ req .headers ().add (CONTENT_LENGTH , content .readableBytes ());
510+ req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
511+ req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
512+ return req ;
513+ }
514+
515+ static HttpRequest httpRequest (String opaqueId , int contentLength ) {
516+ return httpRequest (ControlServerRequestPlugin .ROUTE , opaqueId , contentLength );
517+ }
518+
519+ static HttpRequest httpRequest (String uri , String opaqueId , int contentLength ) {
520+ var req = new DefaultHttpRequest (HTTP_1_1 , POST , uri );
521+ req .headers ().add (CONTENT_LENGTH , contentLength );
522+ req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
523+ req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
524+ return req ;
525+ }
526+
527+ static HttpContent randomContent (int size , boolean isLast ) {
528+ var buf = Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
529+ if (isLast ) {
530+ return new DefaultLastHttpContent (buf );
531+ } else {
532+ return new DefaultHttpContent (buf );
533+ }
534+ }
535+
536+ static ByteBuf randomByteBuf (int size ) {
537+ return Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
538+ }
539+
540+ Ctx setupClientCtx () throws Exception {
528541 var nodeName = internalCluster ().getRandomNodeName ();
529542 var clientRespQueue = new LinkedBlockingDeque <>(16 );
530543 var bootstrap = bootstrapClient (nodeName , clientRespQueue );
531544 var channel = bootstrap .connect ().sync ().channel ();
532545 return new Ctx (getTestName (), nodeName , bootstrap , channel , clientRespQueue );
533546 }
534547
535- private Bootstrap bootstrapClient (String node , BlockingQueue <Object > queue ) {
548+ Bootstrap bootstrapClient (String node , BlockingQueue <Object > queue ) {
536549 var httpServer = internalCluster ().getInstance (HttpServerTransport .class , node );
537550 var remoteAddr = randomFrom (httpServer .boundAddress ().boundAddresses ());
538551 return new Bootstrap ().group (new NioEventLoopGroup (1 ))
@@ -570,13 +583,9 @@ protected boolean addMockHttpTransport() {
570583 return false ; // enable http
571584 }
572585
573- private record Ctx (
574- String testName ,
575- String nodeName ,
576- Bootstrap clientBootstrap ,
577- Channel clientChannel ,
578- BlockingDeque <Object > clientRespQueue
579- ) implements AutoCloseable {
586+ record Ctx (String testName , String nodeName , Bootstrap clientBootstrap , Channel clientChannel , BlockingDeque <Object > clientRespQueue )
587+ implements
588+ AutoCloseable {
580589
581590 @ Override
582591 public void close () throws Exception {
@@ -601,7 +610,7 @@ ServerRequestHandler awaitRestChannelAccepted(String opaqueId) throws Exception
601610 }
602611 }
603612
604- private static class ServerRequestHandler implements BaseRestHandler .RequestBodyChunkConsumer {
613+ static class ServerRequestHandler implements BaseRestHandler .RequestBodyChunkConsumer {
605614 final SubscribableListener <Void > channelAccepted = new SubscribableListener <>();
606615 final String opaqueId ;
607616 final BlockingDeque <Chunk > recvChunks = new LinkedBlockingDeque <>();
0 commit comments