Skip to content

Commit 170292a

Browse files
committed
Shutdown executors in TCP/IP tests
1 parent 2736de9 commit 170292a

15 files changed

+298
-219
lines changed

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpInboundGatewayTests.java

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,7 +28,6 @@
2828
import java.util.HashSet;
2929
import java.util.Set;
3030
import java.util.concurrent.CountDownLatch;
31-
import java.util.concurrent.Executors;
3231
import java.util.concurrent.TimeUnit;
3332
import java.util.concurrent.atomic.AtomicBoolean;
3433
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,6 +38,7 @@
3938
import org.junit.Test;
4039

4140
import org.springframework.beans.factory.BeanFactory;
41+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4242
import org.springframework.integration.channel.DirectChannel;
4343
import org.springframework.integration.channel.QueueChannel;
4444
import org.springframework.integration.handler.ServiceActivatingHandler;
@@ -56,6 +56,8 @@
5656

5757
/**
5858
* @author Gary Russell
59+
* @author Artem Bilan
60+
*
5961
* @since 2.0
6062
*/
6163
public class TcpInboundGatewayTests {
@@ -119,30 +121,31 @@ public void testNetClientMode() throws Exception {
119121
final CountDownLatch latch2 = new CountDownLatch(1);
120122
final CountDownLatch latch3 = new CountDownLatch(1);
121123
final AtomicBoolean done = new AtomicBoolean();
122-
Executors.newSingleThreadExecutor().execute(() -> {
123-
try {
124-
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 10);
125-
port.set(server.getLocalPort());
126-
latch1.countDown();
127-
Socket socket = server.accept();
128-
socket.getOutputStream().write("Test1\r\nTest2\r\n".getBytes());
129-
byte[] bytes = new byte[12];
130-
readFully(socket.getInputStream(), bytes);
131-
assertEquals("Echo:Test1\r\n", new String(bytes));
132-
readFully(socket.getInputStream(), bytes);
133-
assertEquals("Echo:Test2\r\n", new String(bytes));
134-
latch2.await();
135-
socket.close();
136-
server.close();
137-
done.set(true);
138-
latch3.countDown();
139-
}
140-
catch (Exception e) {
141-
if (!done.get()) {
142-
e.printStackTrace();
143-
}
144-
}
145-
});
124+
new SimpleAsyncTaskExecutor()
125+
.execute(() -> {
126+
try {
127+
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 10);
128+
port.set(server.getLocalPort());
129+
latch1.countDown();
130+
Socket socket = server.accept();
131+
socket.getOutputStream().write("Test1\r\nTest2\r\n".getBytes());
132+
byte[] bytes = new byte[12];
133+
readFully(socket.getInputStream(), bytes);
134+
assertEquals("Echo:Test1\r\n", new String(bytes));
135+
readFully(socket.getInputStream(), bytes);
136+
assertEquals("Echo:Test2\r\n", new String(bytes));
137+
latch2.await();
138+
socket.close();
139+
server.close();
140+
done.set(true);
141+
latch3.countDown();
142+
}
143+
catch (Exception e) {
144+
if (!done.get()) {
145+
e.printStackTrace();
146+
}
147+
}
148+
});
146149
assertTrue(latch1.await(10, TimeUnit.SECONDS));
147150
AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", port.get());
148151
ccf.setSingleUse(false);

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpOutboundGatewayTests.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.Set;
4444
import java.util.concurrent.CountDownLatch;
4545
import java.util.concurrent.ExecutionException;
46-
import java.util.concurrent.Executors;
4746
import java.util.concurrent.Future;
4847
import java.util.concurrent.TimeUnit;
4948
import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,6 +60,8 @@
6160
import org.springframework.beans.factory.BeanFactory;
6261
import org.springframework.core.serializer.DefaultDeserializer;
6362
import org.springframework.core.serializer.DefaultSerializer;
63+
import org.springframework.core.task.AsyncTaskExecutor;
64+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
6465
import org.springframework.expression.EvaluationContext;
6566
import org.springframework.expression.Expression;
6667
import org.springframework.expression.spel.standard.SpelExpressionParser;
@@ -90,6 +91,8 @@ public class TcpOutboundGatewayTests {
9091

9192
private static final Log logger = LogFactory.getLog(TcpOutboundGatewayTests.class);
9293

94+
private AsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
95+
9396
@ClassRule
9497
public static LongRunningIntegrationTest longTests = new LongRunningIntegrationTest();
9598

@@ -101,13 +104,13 @@ public class TcpOutboundGatewayTests {
101104
public void testGoodNetSingle() throws Exception {
102105
final CountDownLatch latch = new CountDownLatch(1);
103106
final AtomicBoolean done = new AtomicBoolean();
104-
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<ServerSocket>();
105-
Executors.newSingleThreadExecutor().execute(() -> {
107+
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<>();
108+
this.executor.execute(() -> {
106109
try {
107110
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 100);
108111
serverSocket.set(server);
109112
latch.countDown();
110-
List<Socket> sockets = new ArrayList<Socket>();
113+
List<Socket> sockets = new ArrayList<>();
111114
int i = 0;
112115
while (true) {
113116
Socket socket = server.accept();
@@ -158,15 +161,16 @@ public void testGoodNetSingle() throws Exception {
158161
assertTrue(replies.remove("Reply" + i));
159162
}
160163
done.set(true);
164+
ccf.stop();
161165
serverSocket.get().close();
162166
}
163167

164168
@Test
165169
public void testGoodNetMultiplex() throws Exception {
166170
final CountDownLatch latch = new CountDownLatch(1);
167171
final AtomicBoolean done = new AtomicBoolean();
168-
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<ServerSocket>();
169-
Executors.newSingleThreadExecutor().execute(() -> {
172+
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<>();
173+
this.executor.execute(() -> {
170174
try {
171175
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 10);
172176
serverSocket.set(server);
@@ -213,15 +217,16 @@ public void testGoodNetMultiplex() throws Exception {
213217
}
214218
done.set(true);
215219
gateway.stop();
220+
ccf.stop();
216221
serverSocket.get().close();
217222
}
218223

219224
@Test
220225
public void testGoodNetTimeout() throws Exception {
221226
final CountDownLatch latch = new CountDownLatch(1);
222227
final AtomicBoolean done = new AtomicBoolean();
223-
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<ServerSocket>();
224-
Executors.newSingleThreadExecutor().execute(() -> {
228+
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<>();
229+
this.executor.execute(() -> {
225230
try {
226231
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0);
227232
serverSocket.set(server);
@@ -260,12 +265,12 @@ public void testGoodNetTimeout() throws Exception {
260265
Future<Integer>[] results = (Future<Integer>[]) new Future<?>[2];
261266
for (int i = 0; i < 2; i++) {
262267
final int j = i;
263-
results[j] = (Executors.newSingleThreadExecutor().submit(() -> {
268+
results[j] = (this.executor.submit(() -> {
264269
gateway.handleMessage(MessageBuilder.withPayload("Test" + j).build());
265270
return 0;
266271
}));
267272
}
268-
Set<String> replies = new HashSet<String>();
273+
Set<String> replies = new HashSet<>();
269274
int timeouts = 0;
270275
for (int i = 0; i < 2; i++) {
271276
try {
@@ -294,6 +299,7 @@ public void testGoodNetTimeout() throws Exception {
294299
}
295300
done.set(true);
296301
gateway.stop();
302+
ccf.stop();
297303
serverSocket.get().close();
298304
}
299305

@@ -344,7 +350,7 @@ private void testGoodNetGWTimeoutGuts(final int port, AbstractClientConnectionFa
344350
final AtomicReference<String> lastReceived = new AtomicReference<String>();
345351
final CountDownLatch serverLatch = new CountDownLatch(2);
346352

347-
Executors.newSingleThreadExecutor().execute(() -> {
353+
this.executor.execute(() -> {
348354
try {
349355
latch.countDown();
350356
int i = 0;
@@ -398,7 +404,7 @@ private void testGoodNetGWTimeoutGuts(final int port, AbstractClientConnectionFa
398404

399405
for (int i = 0; i < 2; i++) {
400406
final int j = i;
401-
results[j] = (Executors.newSingleThreadExecutor().submit(() -> {
407+
results[j] = (this.executor.submit(() -> {
402408
gateway.handleMessage(MessageBuilder.withPayload("Test" + j).build());
403409
return j;
404410
}));
@@ -433,6 +439,7 @@ private void testGoodNetGWTimeoutGuts(final int port, AbstractClientConnectionFa
433439
done.set(true);
434440
assertEquals(0, TestUtils.getPropertyValue(gateway, "pendingReplies", Map.class).size());
435441
gateway.stop();
442+
ccf.stop();
436443
}
437444

438445
@Test
@@ -442,7 +449,7 @@ public void testCachingFailover() throws Exception {
442449
final AtomicBoolean done = new AtomicBoolean();
443450
final CountDownLatch serverLatch = new CountDownLatch(1);
444451

445-
Executors.newSingleThreadExecutor().execute(() -> {
452+
this.executor.execute(() -> {
446453
try {
447454
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0);
448455
serverSocket.set(server);
@@ -512,17 +519,18 @@ public void testCachingFailover() throws Exception {
512519
done.set(true);
513520
gateway.stop();
514521
verify(mockConn1).send(Mockito.any(Message.class));
522+
factory2.stop();
515523
serverSocket.get().close();
516524
}
517525

518526
@Test
519527
public void testFailoverCached() throws Exception {
520-
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<ServerSocket>();
528+
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<>();
521529
final CountDownLatch latch = new CountDownLatch(1);
522530
final AtomicBoolean done = new AtomicBoolean();
523531
final CountDownLatch serverLatch = new CountDownLatch(1);
524532

525-
Executors.newSingleThreadExecutor().execute(() -> {
533+
this.executor.execute(() -> {
526534
try {
527535
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0);
528536
serverSocket.set(server);
@@ -595,6 +603,7 @@ public void testFailoverCached() throws Exception {
595603
done.set(true);
596604
gateway.stop();
597605
verify(mockConn1).send(Mockito.any(Message.class));
606+
factory2.stop();
598607
serverSocket.get().close();
599608
}
600609

@@ -667,11 +676,11 @@ private void testGWPropagatesSocketCloseGuts(final int port, AbstractClientConne
667676
final ServerSocket server) throws Exception {
668677
final CountDownLatch latch = new CountDownLatch(1);
669678
final AtomicBoolean done = new AtomicBoolean();
670-
final AtomicReference<String> lastReceived = new AtomicReference<String>();
679+
final AtomicReference<String> lastReceived = new AtomicReference<>();
671680
final CountDownLatch serverLatch = new CountDownLatch(1);
672681

673-
Executors.newSingleThreadExecutor().execute(() -> {
674-
List<Socket> sockets = new ArrayList<Socket>();
682+
this.executor.execute(() -> {
683+
List<Socket> sockets = new ArrayList<>();
675684
try {
676685
latch.countDown();
677686
while (!done.get()) {
@@ -793,8 +802,8 @@ private void testGWPropagatesSocketTimeoutGuts(final int port, AbstractClientCon
793802
final CountDownLatch latch = new CountDownLatch(1);
794803
final AtomicBoolean done = new AtomicBoolean();
795804

796-
Executors.newSingleThreadExecutor().execute(() -> {
797-
List<Socket> sockets = new ArrayList<Socket>();
805+
this.executor.execute(() -> {
806+
List<Socket> sockets = new ArrayList<>();
798807
try {
799808
latch.countDown();
800809
while (!done.get()) {

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapterTests.java

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,7 +33,6 @@
3333
import java.util.Set;
3434
import java.util.concurrent.CountDownLatch;
3535
import java.util.concurrent.Executor;
36-
import java.util.concurrent.Executors;
3736
import java.util.concurrent.TimeUnit;
3837
import java.util.concurrent.atomic.AtomicBoolean;
3938
import java.util.concurrent.atomic.AtomicReference;
@@ -46,6 +45,7 @@
4645
import org.springframework.beans.factory.BeanFactory;
4746
import org.springframework.core.serializer.DefaultDeserializer;
4847
import org.springframework.core.serializer.DefaultSerializer;
48+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4949
import org.springframework.integration.channel.DirectChannel;
5050
import org.springframework.integration.channel.QueueChannel;
5151
import org.springframework.integration.handler.ServiceActivatingHandler;
@@ -64,6 +64,7 @@
6464

6565
/**
6666
* @author Gary Russell
67+
* @author Artem Bilan
6768
*/
6869
public class TcpReceivingChannelAdapterTests extends AbstractTcpChannelAdapterTests {
6970

@@ -97,27 +98,24 @@ public void testNet() throws Exception {
9798

9899
@Test
99100
public void testNetClientMode() throws Exception {
100-
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<ServerSocket>();
101+
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<>();
101102
final CountDownLatch latch1 = new CountDownLatch(1);
102103
final CountDownLatch latch2 = new CountDownLatch(1);
103104
final AtomicBoolean done = new AtomicBoolean();
104-
Executors.newSingleThreadExecutor().execute(new Runnable() {
105-
@Override
106-
public void run() {
107-
try {
108-
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 10);
109-
serverSocket.set(server);
110-
latch1.countDown();
111-
Socket socket = server.accept();
112-
socket.getOutputStream().write("Test1\r\nTest2\r\n".getBytes());
113-
latch2.await();
114-
socket.close();
115-
server.close();
116-
}
117-
catch (Exception e) {
118-
if (!done.get()) {
119-
e.printStackTrace();
120-
}
105+
new SimpleAsyncTaskExecutor().execute(() -> {
106+
try {
107+
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 10);
108+
serverSocket.set(server);
109+
latch1.countDown();
110+
Socket socket = server.accept();
111+
socket.getOutputStream().write("Test1\r\nTest2\r\n".getBytes());
112+
latch2.await();
113+
socket.close();
114+
server.close();
115+
}
116+
catch (Exception e) {
117+
if (!done.get()) {
118+
e.printStackTrace();
121119
}
122120
}
123121
});
@@ -416,7 +414,7 @@ public void testNioSingleSharedMany() throws Exception {
416414
handler.setConnectionFactory(scf);
417415
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
418416
adapter.setConnectionFactory(scf);
419-
Executor te = Executors.newCachedThreadPool();
417+
Executor te = new SimpleAsyncTaskExecutor();
420418
scf.setTaskExecutor(te);
421419
scf.start();
422420
QueueChannel channel = new QueueChannel();
@@ -650,6 +648,7 @@ public void testException() throws Exception {
650648
}
651649

652650
private class FailingService {
651+
653652
@SuppressWarnings("unused")
654653
public String serviceMethod(byte[] bytes) {
655654
throw new RuntimeException("Failed");

0 commit comments

Comments
 (0)