11/*
2- * Copyright 2002-2016 the original author or authors.
2+ * Copyright 2002-2017 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
3434
3535import javax .net .SocketFactory ;
3636
37+ import org .junit .Rule ;
3738import org .junit .Test ;
3839
3940import org .springframework .context .ApplicationEventPublisher ;
4344import org .springframework .integration .ip .tcp .serializer .ByteArrayStxEtxSerializer ;
4445import org .springframework .integration .ip .util .SocketTestUtils ;
4546import org .springframework .integration .ip .util .TestingUtilities ;
47+ import org .springframework .integration .test .support .LongRunningIntegrationTest ;
4648import org .springframework .messaging .Message ;
4749import org .springframework .messaging .support .ErrorMessage ;
4850
4951/**
5052 * @author Gary Russell
5153 * @author Artem Bilan
54+ *
5255 * @since 2.0
5356 */
5457public class TcpNioConnectionReadTests {
5558
59+ @ Rule
60+ public LongRunningIntegrationTest longRunningIntegrationTest = new LongRunningIntegrationTest ();
61+
5662 private final CountDownLatch latch = new CountDownLatch (1 );
5763
5864 private AbstractServerConnectionFactory getConnectionFactory (
@@ -81,15 +87,10 @@ public void testReadLength() throws Exception {
8187 ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer ();
8288 final List <Message <?>> responses = new ArrayList <Message <?>>();
8389 final Semaphore semaphore = new Semaphore (0 );
84- AbstractServerConnectionFactory scf = getConnectionFactory (serializer , new TcpListener () {
85-
86- @ Override
87- public boolean onMessage (Message <?> message ) {
88- responses .add (message );
89- semaphore .release ();
90- return false ;
91- }
92-
90+ AbstractServerConnectionFactory scf = getConnectionFactory (serializer , message -> {
91+ responses .add (message );
92+ semaphore .release ();
93+ return false ;
9394 });
9495
9596 // Fire up the sender.
@@ -102,34 +103,28 @@ public boolean onMessage(Message<?> message) {
102103 assertEquals ("Data" , SocketTestUtils .TEST_STRING + SocketTestUtils .TEST_STRING ,
103104 new String ((byte []) responses .get (0 ).getPayload ()));
104105 assertEquals ("Data" , SocketTestUtils .TEST_STRING + SocketTestUtils .TEST_STRING ,
105- new String ((byte []) responses .get (1 ).getPayload ()));
106+ new String ((byte []) responses .get (1 ).getPayload ()));
106107 scf .stop ();
107108 done .countDown ();
108109 }
109110
110111
111-
112112 @ SuppressWarnings ("unchecked" )
113113 @ Test
114114 public void testFragmented () throws Exception {
115115 ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer ();
116116 final List <Message <?>> responses = new ArrayList <Message <?>>();
117117 final Semaphore semaphore = new Semaphore (0 );
118- AbstractServerConnectionFactory scf = getConnectionFactory (serializer , new TcpListener () {
119-
120- @ Override
121- public boolean onMessage (Message <?> message ) {
122- responses .add (message );
123- try {
124- Thread .sleep (1000 );
125- }
126- catch (InterruptedException e ) {
127- Thread .currentThread ().interrupt ();
128- }
129- semaphore .release ();
130- return false ;
118+ AbstractServerConnectionFactory scf = getConnectionFactory (serializer , message -> {
119+ responses .add (message );
120+ try {
121+ Thread .sleep (10 );
131122 }
132-
123+ catch (InterruptedException e ) {
124+ Thread .currentThread ().interrupt ();
125+ }
126+ semaphore .release ();
127+ return false ;
133128 });
134129
135130 int howMany = 2 ;
@@ -140,7 +135,7 @@ public boolean onMessage(Message<?> message) {
140135 assertEquals ("Expected" , howMany , responses .size ());
141136 for (int i = 0 ; i < howMany ; i ++) {
142137 assertEquals ("Data" , "xx" ,
143- new String (((Message <byte []>) responses .get (0 )).getPayload ()));
138+ new String (((Message <byte []>) responses .get (0 )).getPayload ()));
144139 }
145140 scf .stop ();
146141 done .countDown ();
@@ -152,15 +147,10 @@ public void testReadStxEtx() throws Exception {
152147 ByteArrayStxEtxSerializer serializer = new ByteArrayStxEtxSerializer ();
153148 final List <Message <?>> responses = new ArrayList <Message <?>>();
154149 final Semaphore semaphore = new Semaphore (0 );
155- AbstractServerConnectionFactory scf = getConnectionFactory (serializer , new TcpListener () {
156-
157- @ Override
158- public boolean onMessage (Message <?> message ) {
159- responses .add (message );
160- semaphore .release ();
161- return false ;
162- }
163-
150+ AbstractServerConnectionFactory scf = getConnectionFactory (serializer , message -> {
151+ responses .add (message );
152+ semaphore .release ();
153+ return false ;
164154 });
165155
166156 // Fire up the sender.
@@ -171,9 +161,9 @@ public boolean onMessage(Message<?> message) {
171161 assertTrue (semaphore .tryAcquire (1 , 10000 , TimeUnit .MILLISECONDS ));
172162 assertEquals ("Did not receive data" , 2 , responses .size ());
173163 assertEquals ("Data" , SocketTestUtils .TEST_STRING + SocketTestUtils .TEST_STRING ,
174- new String (((Message <byte []>) responses .get (0 )).getPayload ()));
164+ new String (((Message <byte []>) responses .get (0 )).getPayload ()));
175165 assertEquals ("Data" , SocketTestUtils .TEST_STRING + SocketTestUtils .TEST_STRING ,
176- new String (((Message <byte []>) responses .get (1 )).getPayload ()));
166+ new String (((Message <byte []>) responses .get (1 )).getPayload ()));
177167 scf .stop ();
178168 done .countDown ();
179169 }
@@ -184,15 +174,10 @@ public void testReadCrLf() throws Exception {
184174 ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer ();
185175 final List <Message <?>> responses = new ArrayList <Message <?>>();
186176 final Semaphore semaphore = new Semaphore (0 );
187- AbstractServerConnectionFactory scf = getConnectionFactory (serializer , new TcpListener () {
188-
189- @ Override
190- public boolean onMessage (Message <?> message ) {
191- responses .add (message );
192- semaphore .release ();
193- return false ;
194- }
195-
177+ AbstractServerConnectionFactory scf = getConnectionFactory (serializer , message -> {
178+ responses .add (message );
179+ semaphore .release ();
180+ return false ;
196181 });
197182
198183 // Fire up the sender.
@@ -203,9 +188,9 @@ public boolean onMessage(Message<?> message) {
203188 assertTrue (semaphore .tryAcquire (1 , 10000 , TimeUnit .MILLISECONDS ));
204189 assertEquals ("Did not receive data" , 2 , responses .size ());
205190 assertEquals ("Data" , SocketTestUtils .TEST_STRING + SocketTestUtils .TEST_STRING ,
206- new String (((Message <byte []>) responses .get (0 )).getPayload ()));
191+ new String (((Message <byte []>) responses .get (0 )).getPayload ()));
207192 assertEquals ("Data" , SocketTestUtils .TEST_STRING + SocketTestUtils .TEST_STRING ,
208- new String (((Message <byte []>) responses .get (1 )).getPayload ()));
193+ new String (((Message <byte []>) responses .get (1 )).getPayload ()));
209194 scf .stop ();
210195 done .countDown ();
211196 }
@@ -251,7 +236,8 @@ public void removeDeadConnection(TcpConnection connection) {
251236 assertTrue (errorMessageLetch .await (10 , TimeUnit .SECONDS ));
252237
253238 assertThat (errorMessageRef .get ().getMessage (),
254- containsString ("Message length 2147483647 exceeds max message length: 2048" ));
239+ anyOf (containsString ("Message length 2147483647 exceeds max message length: 2048" ),
240+ containsString ("Connection is closed" )));
255241
256242 assertTrue (semaphore .tryAcquire (10000 , TimeUnit .MILLISECONDS ));
257243 assertTrue (removed .size () > 0 );
@@ -510,11 +496,13 @@ private void testClosureMidMessageGuts(AbstractByteArraySerializer serializer, S
510496 }
511497 return false ;
512498 }, new TcpSender () {
499+
513500 @ Override
514501 public void addNewConnection (TcpConnection connection ) {
515502 added .add (connection );
516503 semaphore .release ();
517504 }
505+
518506 @ Override
519507 public void removeDeadConnection (TcpConnection connection ) {
520508 removed .add (connection );
0 commit comments