1919import java .io .ByteArrayOutputStream ;
2020import java .io .IOException ;
2121import java .io .InputStream ;
22+ import java .io .UncheckedIOException ;
2223import java .lang .reflect .Method ;
2324import java .net .ConnectException ;
2425import java .net .InetAddress ;
7677import org .springframework .messaging .MessagingException ;
7778import org .springframework .messaging .support .ErrorMessage ;
7879import org .springframework .scheduling .concurrent .ThreadPoolTaskExecutor ;
80+ import org .springframework .util .ReflectionUtils ;
7981import org .springframework .util .StopWatch ;
8082
8183import static org .assertj .core .api .Assertions .assertThat ;
84+ import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
85+ import static org .assertj .core .api .Assertions .assertThatIllegalArgumentException ;
8286import static org .assertj .core .api .Assertions .fail ;
8387import static org .awaitility .Awaitility .await ;
8488import static org .awaitility .Awaitility .with ;
@@ -125,10 +129,10 @@ public void testWriteTimeout(TestInfo testInfo) throws Exception {
125129 s .close ();
126130 }
127131 catch (Exception e ) {
128- e . printStackTrace ( );
132+ ReflectionUtils . rethrowRuntimeException ( e );
129133 }
130134 });
131- assertThat (latch .await (10000 , TimeUnit .MILLISECONDS )).isTrue ();
135+ assertThat (latch .await (30 , TimeUnit .SECONDS )).isTrue ();
132136 TcpNioClientConnectionFactory factory =
133137 new TcpNioClientConnectionFactory ("localhost" , serverSocket .get ().getLocalPort ());
134138 factory .setLookupHost (true );
@@ -174,10 +178,10 @@ public void testReadTimeout(TestInfo testInfo) throws Exception {
174178 done .await (10 , TimeUnit .SECONDS );
175179 }
176180 catch (Exception e ) {
177- e . printStackTrace ( );
181+ ReflectionUtils . rethrowRuntimeException ( e );
178182 }
179183 });
180- assertThat (latch .await (10000 , TimeUnit .MILLISECONDS )).isTrue ();
184+ assertThat (latch .await (30 , TimeUnit .SECONDS )).isTrue ();
181185 TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory ("localhost" ,
182186 serverSocket .get ().getLocalPort ());
183187 factory .setApplicationEventPublisher (nullPublisher );
@@ -215,10 +219,10 @@ public void testMemoryLeak(TestInfo testInfo) throws Exception {
215219 readFully (socket .getInputStream (), b );
216220 }
217221 catch (Exception e ) {
218- e . printStackTrace ( );
222+ ReflectionUtils . rethrowRuntimeException ( e );
219223 }
220224 });
221- assertThat (latch .await (10000 , TimeUnit .MILLISECONDS )).isTrue ();
225+ assertThat (latch .await (30 , TimeUnit .SECONDS )).isTrue ();
222226 TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory ("localhost" ,
223227 serverSocket .get ().getLocalPort ());
224228 factory .setApplicationEventPublisher (nullPublisher );
@@ -231,10 +235,9 @@ public void testMemoryLeak(TestInfo testInfo) throws Exception {
231235 connection .close ();
232236 assertThat (!connection .isOpen ()).isTrue ();
233237 TestUtils .getPropertyValue (factory , "selector" , Selector .class ).wakeup ();
234- await ().atMost (Duration .ofSeconds (10 )).until (() -> connections . size () == 0 );
238+ await ().atMost (Duration .ofSeconds (10 )).until (connections :: isEmpty );
235239 }
236240 catch (Exception e ) {
237- e .printStackTrace ();
238241 fail ("Unexpected exception " + e );
239242 }
240243 factory .stop ();
@@ -330,16 +333,12 @@ public void testInsufficientThreads() throws Exception {
330333 }
331334 return null ;
332335 });
333- try {
334- Object o = future .get (10 , TimeUnit .SECONDS );
335- fail ("Expected exception, got " + o );
336- }
337- catch (ExecutionException e ) {
338- assertThat (e .getCause ().getMessage ()).isEqualTo ("Timed out waiting for buffer space" );
339- }
340- finally {
341- exec .shutdownNow ();
342- }
336+
337+ assertThatExceptionOfType (ExecutionException .class )
338+ .isThrownBy (() -> future .get (10 , TimeUnit .SECONDS ))
339+ .withStackTraceContaining ("Timed out waiting for buffer space" );
340+
341+ exec .shutdownNow ();
343342 }
344343
345344 @ Test
@@ -374,7 +373,6 @@ public void testSufficientThreads() throws Exception {
374373 }
375374 }
376375 catch (Exception e ) {
377- e .printStackTrace ();
378376 throw (Exception ) e .getCause ();
379377 }
380378 return null ;
@@ -445,18 +443,13 @@ public void testByteArrayReadWithBadArgs() throws Exception {
445443 .getPropertyValue ("channelInputStream" );
446444 stream .write (ByteBuffer .wrap ("foo" .getBytes ()));
447445 byte [] out = new byte [5 ];
448- try {
449- stream .read (out , 1 , 5 );
450- fail ("Expected IndexOutOfBoundsException" );
451- }
452- catch (IndexOutOfBoundsException e ) {
453- }
454- try {
455- stream .read (null , 1 , 5 );
456- fail ("Expected IllegalArgumentException" );
457- }
458- catch (IllegalArgumentException e ) {
459- }
446+
447+ assertThatExceptionOfType (IndexOutOfBoundsException .class )
448+ .isThrownBy (() -> stream .read (out , 1 , 5 ));
449+
450+ assertThatIllegalArgumentException ()
451+ .isThrownBy (() -> stream .read (null , 1 , 5 ));
452+
460453 assertThat (stream .read (out , 0 , 0 )).isEqualTo (0 );
461454 assertThat (stream .read (out )).isEqualTo (3 );
462455 }
@@ -476,7 +469,7 @@ public void testByteArrayBlocksForZeroRead() throws Exception {
476469 stream .read (out );
477470 }
478471 catch (IOException e ) {
479- e . printStackTrace ( );
472+ throw new UncheckedIOException ( e );
480473 }
481474 latch .countDown ();
482475 });
@@ -514,16 +507,12 @@ public Integer answer(InvocationOnMock invocation) {
514507 SocketChannel outChannel = mock (SocketChannel .class );
515508 when (outChannel .socket ()).thenReturn (outSocket );
516509 TcpNioConnection outboundConnection = new TcpNioConnection (outChannel , true , false , nullPublisher , null );
517- doAnswer (new Answer <Object >() {
518-
519- @ Override
520- public Object answer (InvocationOnMock invocation ) throws Throwable {
521- ByteBuffer buff = invocation .getArgument (0 );
522- byte [] bytes = new byte [buff .limit ()];
523- buff .get (bytes );
524- written .write (bytes );
525- return null ;
526- }
510+ doAnswer (invocation -> {
511+ ByteBuffer buff = invocation .getArgument (0 );
512+ byte [] bytes = new byte [buff .limit ()];
513+ buff .get (bytes );
514+ written .write (bytes );
515+ return null ;
527516 }).when (outChannel ).write (any (ByteBuffer .class ));
528517
529518 MapMessageConverter outConverter = new MapMessageConverter ();
@@ -539,14 +528,10 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
539528
540529 final AtomicReference <Message <?>> inboundMessage = new AtomicReference <Message <?>>();
541530 final CountDownLatch latch = new CountDownLatch (1 );
542- TcpListener listener = new TcpListener () {
543-
544- @ Override
545- public boolean onMessage (Message <?> message ) {
546- inboundMessage .set (message );
547- latch .countDown ();
548- return false ;
549- }
531+ TcpListener listener = message1 -> {
532+ inboundMessage .set (message1 );
533+ latch .countDown ();
534+ return false ;
550535 };
551536 inboundConnection .registerListener (listener );
552537 inboundConnection .readPacket ();
@@ -565,19 +550,14 @@ public void testAssemblerUsesSecondaryExecutor() throws Exception {
565550
566551 factory .setSoTimeout (1000 );
567552 factory .setTaskExecutor (compositeExec );
568- final AtomicReference <String > threadName = new AtomicReference <String >();
553+ final AtomicReference <String > threadName = new AtomicReference <>();
569554 final CountDownLatch latch = new CountDownLatch (1 );
570- factory .registerListener (new TcpListener () {
571-
572- @ Override
573- public boolean onMessage (Message <?> message ) {
574- if (!(message instanceof ErrorMessage )) {
575- threadName .set (Thread .currentThread ().getName ());
576- latch .countDown ();
577- }
578- return false ;
555+ factory .registerListener (message -> {
556+ if (!(message instanceof ErrorMessage )) {
557+ threadName .set (Thread .currentThread ().getName ());
558+ latch .countDown ();
579559 }
580-
560+ return false ;
581561 });
582562 factory .start ();
583563 TestingUtilities .waitListening (factory , null );
@@ -598,7 +578,7 @@ public boolean onMessage(Message<?> message) {
598578 socket .getOutputStream ().write ("foo\r \n " .getBytes ());
599579 socket .close ();
600580
601- assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
581+ assertThat (latch .await (30 , TimeUnit .SECONDS )).isTrue ();
602582 assertThat (threadName .get ()).contains ("assembler" );
603583
604584 factory .stop ();
@@ -720,17 +700,12 @@ public void publishEvent(Object event) {
720700 });
721701 final CountDownLatch assemblerLatch = new CountDownLatch (1 );
722702 final AtomicReference <Thread > assembler = new AtomicReference <Thread >();
723- factory .registerListener (new TcpListener () {
724-
725- @ Override
726- public boolean onMessage (Message <?> message ) {
727- if (!(message instanceof ErrorMessage )) {
728- assembler .set (Thread .currentThread ());
729- assemblerLatch .countDown ();
730- }
731- return false ;
703+ factory .registerListener (message -> {
704+ if (!(message instanceof ErrorMessage )) {
705+ assembler .set (Thread .currentThread ());
706+ assemblerLatch .countDown ();
732707 }
733-
708+ return false ;
734709 });
735710 ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor ();
736711 te .setCorePoolSize (3 ); // selector, reader, assembler
@@ -756,38 +731,26 @@ public boolean onMessage(Message<?> message) {
756731
757732 final CountDownLatch readerLatch = new CountDownLatch (4 ); // 3 dataAvailable, 1 continuing
758733 final CountDownLatch readerFinishedLatch = new CountDownLatch (1 );
759- doAnswer (new Answer <Void >() {
760-
761- @ Override
762- public Void answer (InvocationOnMock invocation ) throws Throwable {
763- invocation .callRealMethod ();
764- // delay the reader thread resetting writingToPipe
765- readerLatch .await (10 , TimeUnit .SECONDS );
766- Thread .sleep (100 );
767- readerFinishedLatch .countDown ();
768- return null ;
769- }
734+ doAnswer (invocation -> {
735+ invocation .callRealMethod ();
736+ // delay the reader thread resetting writingToPipe
737+ readerLatch .await (10 , TimeUnit .SECONDS );
738+ Thread .sleep (100 );
739+ readerFinishedLatch .countDown ();
740+ return null ;
770741 }).when (cis ).write (any (ByteBuffer .class ));
771742
772743 doReturn (true ).when (logger ).isTraceEnabled ();
773- doAnswer (new Answer <Void >() {
774-
775- @ Override
776- public Void answer (InvocationOnMock invocation ) throws Throwable {
777- invocation .callRealMethod ();
778- readerLatch .countDown ();
779- return null ;
780- }
744+ doAnswer (invocation -> {
745+ invocation .callRealMethod ();
746+ readerLatch .countDown ();
747+ return null ;
781748 }).when (logger ).trace (contains ("checking data avail" ));
782749
783- doAnswer (new Answer <Void >() {
784-
785- @ Override
786- public Void answer (InvocationOnMock invocation ) throws Throwable {
787- invocation .callRealMethod ();
788- readerLatch .countDown ();
789- return null ;
790- }
750+ doAnswer (invocation -> {
751+ invocation .callRealMethod ();
752+ readerLatch .countDown ();
753+ return null ;
791754 }).when (logger ).trace (contains ("Nio assembler continuing" ));
792755
793756 socket .getOutputStream ().write ("foo\r \n " .getBytes ());
0 commit comments