@@ -94,6 +94,64 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
9494
9595 private static final int MAX_CONTENT_LENGTH = ByteSizeUnit .MB .toIntBytes (50 );
9696
97+ private static long transportStatsRequestBytesSize (Ctx ctx ) {
98+ var httpTransport = internalCluster ().getInstance (HttpServerTransport .class , ctx .nodeName );
99+ var stats = httpTransport .stats ().clientStats ();
100+ var bytes = 0L ;
101+ for (var s : stats ) {
102+ bytes += s .requestSizeBytes ();
103+ }
104+ return bytes ;
105+ }
106+
107+ static int MBytes (int m ) {
108+ return m * 1024 * 1024 ;
109+ }
110+
111+ static <T > T safePoll (BlockingDeque <T > queue ) {
112+ try {
113+ var t = queue .poll (SAFE_AWAIT_TIMEOUT .seconds (), TimeUnit .SECONDS );
114+ assertNotNull ("queue is empty" , t );
115+ return t ;
116+ } catch (InterruptedException e ) {
117+ Thread .currentThread ().interrupt ();
118+ throw new AssertionError (e );
119+ }
120+ }
121+
122+ private static FullHttpRequest fullHttpRequest (String opaqueId , ByteBuf content ) {
123+ var req = new DefaultFullHttpRequest (HTTP_1_1 , POST , ControlServerRequestPlugin .ROUTE , Unpooled .wrappedBuffer (content ));
124+ req .headers ().add (CONTENT_LENGTH , content .readableBytes ());
125+ req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
126+ req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
127+ return req ;
128+ }
129+
130+ private static HttpRequest httpRequest (String opaqueId , int contentLength ) {
131+ return httpRequest (ControlServerRequestPlugin .ROUTE , opaqueId , contentLength );
132+ }
133+
134+ private static HttpRequest httpRequest (String uri , String opaqueId , int contentLength ) {
135+ var req = new DefaultHttpRequest (HTTP_1_1 , POST , uri );
136+ req .headers ().add (CONTENT_LENGTH , contentLength );
137+ req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
138+ req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
139+ return req ;
140+ }
141+
142+ private static HttpContent randomContent (int size , boolean isLast ) {
143+ var buf = Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
144+ if (isLast ) {
145+ return new DefaultLastHttpContent (buf );
146+ } else {
147+ return new DefaultHttpContent (buf );
148+ }
149+ }
150+
151+ private static ByteBuf randomByteBuf (int size ) {
152+ return Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
153+ }
154+
97155 @ Override
98156 protected Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
99157 Settings .Builder builder = Settings .builder ().put (super .nodeSettings (nodeOrdinal , otherSettings ));
@@ -178,8 +236,6 @@ public void testClientConnectionCloseMidStream() throws Exception {
178236
179237 // await stream handler is ready and request full content
180238 var handler = ctx .awaitRestChannelAccepted (opaqueId );
181- assertBusy (() -> assertNotEquals (0 , handler .stream .bufSize ()));
182-
183239 assertFalse (handler .streamClosed );
184240
185241 // terminate client connection
@@ -190,10 +246,7 @@ public void testClientConnectionCloseMidStream() throws Exception {
190246 handler .stream .next ();
191247
192248 // wait for resources to be released
193- assertBusy (() -> {
194- assertEquals (0 , handler .stream .bufSize ());
195- assertTrue (handler .streamClosed );
196- });
249+ assertBusy (() -> assertTrue (handler .streamClosed ));
197250 }
198251 }
199252
@@ -208,15 +261,11 @@ public void testServerCloseConnectionMidStream() throws Exception {
208261
209262 // await stream handler is ready and request full content
210263 var handler = ctx .awaitRestChannelAccepted (opaqueId );
211- assertBusy (() -> assertNotEquals (0 , handler .stream .bufSize ()));
212264 assertFalse (handler .streamClosed );
213265
214266 // terminate connection on server and wait resources are released
215267 handler .channel .request ().getHttpChannel ().close ();
216- assertBusy (() -> {
217- assertEquals (0 , handler .stream .bufSize ());
218- assertTrue (handler .streamClosed );
219- });
268+ assertBusy (() -> assertTrue (handler .streamClosed ));
220269 }
221270 }
222271
@@ -230,16 +279,12 @@ public void testServerExceptionMidStream() throws Exception {
230279
231280 // await stream handler is ready and request full content
232281 var handler = ctx .awaitRestChannelAccepted (opaqueId );
233- assertBusy (() -> assertNotEquals (0 , handler .stream .bufSize ()));
234282 assertFalse (handler .streamClosed );
235283
236284 handler .shouldThrowInsideHandleChunk = true ;
237285 handler .stream .next ();
238286
239- assertBusy (() -> {
240- assertEquals (0 , handler .stream .bufSize ());
241- assertTrue (handler .streamClosed );
242- });
287+ assertBusy (() -> assertTrue (handler .streamClosed ));
243288 }
244289 }
245290
@@ -280,7 +325,7 @@ public void testClientBackpressure() throws Exception {
280325 });
281326 handler .readBytes (partSize );
282327 }
283- assertTrue (handler .stream . hasLast () );
328+ assertTrue (handler .recvLast );
284329 }
285330 }
286331
@@ -385,16 +430,6 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
385430 }
386431 }
387432
388- private static long transportStatsRequestBytesSize (Ctx ctx ) {
389- var httpTransport = internalCluster ().getInstance (HttpServerTransport .class , ctx .nodeName );
390- var stats = httpTransport .stats ().clientStats ();
391- var bytes = 0L ;
392- for (var s : stats ) {
393- bytes += s .requestSizeBytes ();
394- }
395- return bytes ;
396- }
397-
398433 /**
399434 * ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
400435 */
@@ -489,63 +524,15 @@ private String opaqueId(int reqNo) {
489524 return getTestName () + "-" + reqNo ;
490525 }
491526
492- static int MBytes (int m ) {
493- return m * 1024 * 1024 ;
494- }
495-
496- static <T > T safePoll (BlockingDeque <T > queue ) {
497- try {
498- var t = queue .poll (SAFE_AWAIT_TIMEOUT .seconds (), TimeUnit .SECONDS );
499- assertNotNull ("queue is empty" , t );
500- return t ;
501- } catch (InterruptedException e ) {
502- Thread .currentThread ().interrupt ();
503- throw new AssertionError (e );
504- }
505- }
506-
507- static FullHttpRequest fullHttpRequest (String opaqueId , ByteBuf content ) {
508- var req = new DefaultFullHttpRequest (HTTP_1_1 , POST , ControlServerRequestPlugin .ROUTE , Unpooled .wrappedBuffer (content ));
509- req .headers ().add (CONTENT_LENGTH , content .readableBytes ());
510- req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
511- req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
512- return req ;
513- }
514-
515- static HttpRequest httpRequest (String opaqueId , int contentLength ) {
516- return httpRequest (ControlServerRequestPlugin .ROUTE , opaqueId , contentLength );
517- }
518-
519- static HttpRequest httpRequest (String uri , String opaqueId , int contentLength ) {
520- var req = new DefaultHttpRequest (HTTP_1_1 , POST , uri );
521- req .headers ().add (CONTENT_LENGTH , contentLength );
522- req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
523- req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
524- return req ;
525- }
526-
527- static HttpContent randomContent (int size , boolean isLast ) {
528- var buf = Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
529- if (isLast ) {
530- return new DefaultLastHttpContent (buf );
531- } else {
532- return new DefaultHttpContent (buf );
533- }
534- }
535-
536- static ByteBuf randomByteBuf (int size ) {
537- return Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
538- }
539-
540- Ctx setupClientCtx () throws Exception {
527+ private Ctx setupClientCtx () throws Exception {
541528 var nodeName = internalCluster ().getRandomNodeName ();
542529 var clientRespQueue = new LinkedBlockingDeque <>(16 );
543530 var bootstrap = bootstrapClient (nodeName , clientRespQueue );
544531 var channel = bootstrap .connect ().sync ().channel ();
545532 return new Ctx (getTestName (), nodeName , bootstrap , channel , clientRespQueue );
546533 }
547534
548- Bootstrap bootstrapClient (String node , BlockingQueue <Object > queue ) {
535+ private Bootstrap bootstrapClient (String node , BlockingQueue <Object > queue ) {
549536 var httpServer = internalCluster ().getInstance (HttpServerTransport .class , node );
550537 var remoteAddr = randomFrom (httpServer .boundAddress ().boundAddresses ());
551538 return new Bootstrap ().group (new NioEventLoopGroup (1 ))
@@ -583,9 +570,13 @@ protected boolean addMockHttpTransport() {
583570 return false ; // enable http
584571 }
585572
586- record Ctx (String testName , String nodeName , Bootstrap clientBootstrap , Channel clientChannel , BlockingDeque <Object > clientRespQueue )
587- implements
588- AutoCloseable {
573+ private record Ctx (
574+ String testName ,
575+ String nodeName ,
576+ Bootstrap clientBootstrap ,
577+ Channel clientChannel ,
578+ BlockingDeque <Object > clientRespQueue
579+ ) implements AutoCloseable {
589580
590581 @ Override
591582 public void close () throws Exception {
@@ -610,7 +601,7 @@ ServerRequestHandler awaitRestChannelAccepted(String opaqueId) throws Exception
610601 }
611602 }
612603
613- static class ServerRequestHandler implements BaseRestHandler .RequestBodyChunkConsumer {
604+ private static class ServerRequestHandler implements BaseRestHandler .RequestBodyChunkConsumer {
614605 final SubscribableListener <Void > channelAccepted = new SubscribableListener <>();
615606 final String opaqueId ;
616607 final BlockingDeque <Chunk > recvChunks = new LinkedBlockingDeque <>();
0 commit comments