Skip to content

Commit faa3eb4

Browse files
authored
GH-10314: Revise TcpListener.onMessage() contract for void (#10315)
Fixes: #10314 Looks like a `boolean` return for the `TcpListener.onMessage()` is a leftover of some earlier design or some idea which didn't make it into the project. On the other hand, all the logic in the project around this `onMessage()` usage is properly handled by the delegation or exceptions. * Change `TcpListener.onMessage()` to have a `void` as return type * Fix all the respective usages and simplify some implementations with removing those bogus `return false;` and using proper `if..else` logic if necessary * Fix some code style in the affected classes, like proper `assertThat` or diamond operators
1 parent 4900ac7 commit faa3eb4

22 files changed

+116
-168
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpInboundGateway.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,29 +84,28 @@ public class TcpInboundGateway extends MessagingGatewaySupport implements
8484
private volatile boolean shuttingDown;
8585

8686
@Override
87-
public boolean onMessage(Message<?> message) {
87+
public void onMessage(Message<?> message) {
8888
boolean isErrorMessage = message instanceof ErrorMessage;
8989
try {
9090
if (this.shuttingDown) {
91-
logger.info(() -> "Inbound message ignored; shutting down; " + message.toString());
91+
logger.info(() -> "Inbound message ignored; shutting down; " + message);
9292
}
9393
else {
9494
if (isErrorMessage) {
9595
/*
9696
* Socket errors are sent here, so they can be conveyed to any waiting thread.
9797
* There's not one here; simply ignore.
9898
*/
99-
return false;
99+
return;
100100
}
101101
this.activeCount.incrementAndGet();
102102
try {
103-
return doOnMessage(message);
103+
doOnMessage(message);
104104
}
105105
finally {
106106
this.activeCount.decrementAndGet();
107107
}
108108
}
109-
return false;
110109
}
111110
finally {
112111
String connectionId = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID);
@@ -121,19 +120,19 @@ else if (this.clientConnectionFactory != null) {
121120
}
122121
}
123122

124-
private boolean doOnMessage(Message<?> message) {
123+
private void doOnMessage(Message<?> message) {
125124
Message<?> reply = sendAndReceiveMessage(message);
126125
if (reply == null) {
127126
logger.debug(() -> "null reply received for " + message + " nothing to send");
128-
return false;
127+
return;
129128
}
130129
String connectionId = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID);
131130
if (connectionId != null) {
132131
TcpConnection connection = this.connections.get(connectionId);
133132
if (connection == null) {
134133
publishNoConnectionEvent(message, connectionId);
135134
logger.error(() -> "Connection not found when processing reply " + reply + " for " + message);
136-
return false;
135+
return;
137136
}
138137
try {
139138
connection.send(reply);
@@ -142,7 +141,6 @@ private boolean doOnMessage(Message<?> message) {
142141
logger.error(ex, "Failed to send reply");
143142
}
144143
}
145-
return false;
146144
}
147145

148146
@SuppressWarnings("NullAway") // Dataflow analysis limitation

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -323,35 +323,32 @@ private void cleanUp(boolean haveSemaphore, @Nullable TcpConnection connection,
323323
}
324324

325325
@Override
326-
public boolean onMessage(Message<?> message) {
326+
public void onMessage(Message<?> message) {
327327
String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class);
328328
if (connectionId == null) {
329329
if (unsolicitedSupported(message)) {
330-
return false;
330+
return;
331331
}
332332
logger.error("Cannot correlate response - no connection id");
333333
publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id");
334-
return false;
334+
return;
335335
}
336336
logger.trace(() -> "onMessage: " + connectionId + "(" + message + ")");
337337
AsyncReply reply = this.pendingReplies.get(connectionId);
338338
if (reply == null) {
339-
if (message instanceof ErrorMessage) {
340-
/*
341-
* Socket errors are sent here, so they can be conveyed to any waiting thread.
342-
* If there's not one, simply ignore.
343-
*/
344-
return false;
345-
}
346-
else {
339+
/*
340+
* Socket errors are sent here, so they can be conveyed to any waiting thread.
341+
* If there's not one, simply ignore.
342+
*/
343+
if (!(message instanceof ErrorMessage)) {
347344
if (unsolicitedSupported(message)) {
348-
return false;
345+
return;
349346
}
350347
String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
351348
logger.error(errorMessage);
352349
publishNoConnectionEvent(message, connectionId, errorMessage);
353-
return false;
354350
}
351+
return;
355352
}
356353
if (isAsync()) {
357354
reply.getFuture().complete(message);
@@ -360,7 +357,6 @@ public boolean onMessage(Message<?> message) {
360357
else {
361358
reply.setReply(message);
362359
}
363-
return false;
364360
}
365361

366362
private boolean unsolicitedSupported(Message<?> message) {
@@ -489,7 +485,8 @@ boolean isHaveSemaphore() {
489485
* Sender blocks here until the reply is received, or we time out.
490486
* @return The return message or null if we time out
491487
*/
492-
@Nullable Message<?> getReply() {
488+
@Nullable
489+
Message<?> getReply() {
493490
try {
494491
if (!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
495492
return null;

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapter.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,20 +70,17 @@ public class TcpReceivingChannelAdapter
7070
private final AtomicInteger activeCount = new AtomicInteger();
7171

7272
@Override
73-
public boolean onMessage(Message<?> message) {
73+
public void onMessage(Message<?> message) {
7474
boolean isErrorMessage = message instanceof ErrorMessage;
7575
try {
7676
if (this.shuttingDown) {
7777
logger.info(() -> "Inbound message ignored; shutting down; " + message);
7878
}
79-
else {
80-
if (isErrorMessage) {
81-
/*
82-
* Socket errors are sent here so they can be conveyed to any waiting thread.
83-
* There's not one here; simply ignore.
84-
*/
85-
return false;
86-
}
79+
/*
80+
* Socket errors are sent here so they can be conveyed to any waiting thread.
81+
* There's not one here; simply ignore.
82+
*/
83+
else if (!isErrorMessage) {
8784
this.activeCount.incrementAndGet();
8885
try {
8986
sendMessage(message);
@@ -92,7 +89,6 @@ public boolean onMessage(Message<?> message) {
9289
this.activeCount.decrementAndGet();
9390
}
9491
}
95-
return false;
9692
}
9793
finally {
9894
String connectionId = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID);

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ public String toString() {
462462
* purposes.
463463
*/
464464
@Override
465-
public boolean onMessage(Message<?> message) {
465+
public void onMessage(Message<?> message) {
466466
Message<?> modifiedMessage;
467467
Object connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID);
468468
if (message instanceof ErrorMessage) {
@@ -492,7 +492,6 @@ public boolean onMessage(Message<?> message) {
492492
logger.debug("Message discarded; no listener: " + message);
493493
}
494494
}
495-
return true;
496495
}
497496

498497
private void physicallyClose() {

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ public void setSerializer(Serializer<?> serializer) {
461461
* purposes.
462462
*/
463463
@Override
464-
public boolean onMessage(Message<?> message) {
464+
public void onMessage(Message<?> message) {
465465
if (this.delegate.getConnectionId().equals(message.getHeaders().get(IpHeaders.CONNECTION_ID))) {
466466
AbstractIntegrationMessageBuilder<?> messageBuilder =
467467
getMessageBuilderFactory()
@@ -476,17 +476,15 @@ public boolean onMessage(Message<?> message) {
476476
if (this.logger.isDebugEnabled()) {
477477
logger.debug("No listener for " + message);
478478
}
479-
return false;
480479
}
481480
else {
482-
return listener.onMessage(messageBuilder.build());
481+
listener.onMessage(messageBuilder.build());
483482
}
484483
}
485484
else {
486485
if (logger.isDebugEnabled()) {
487486
logger.debug("Message from defunct connection ignored " + message);
488487
}
489-
return false;
490488
}
491489
}
492490

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -194,16 +194,14 @@ public boolean isServer() {
194194
}
195195

196196
@Override
197-
public boolean onMessage(Message<?> message) {
197+
public void onMessage(Message<?> message) {
198198
if (this.tcpListener == null) {
199-
if (message instanceof ErrorMessage) {
200-
return false;
201-
}
202-
else {
199+
if (!(message instanceof ErrorMessage)) {
203200
throw new NoListenerException("No listener registered for message reception");
204201
}
202+
return;
205203
}
206-
return this.tcpListener.onMessage(message);
204+
this.tcpListener.onMessage(message);
207205
}
208206

209207
@Override
@@ -250,12 +248,16 @@ public void removeDeadConnection(TcpConnection connection) {
250248
return;
251249
}
252250
this.removed = true;
253-
if (this.theConnection instanceof TcpConnectionInterceptorSupport tcpConnectionInterceptorSupport && !this.theConnection.equals(this)) {
251+
if (this.theConnection instanceof TcpConnectionInterceptorSupport tcpConnectionInterceptorSupport
252+
&& !this.theConnection.equals(this)) {
253+
254254
tcpConnectionInterceptorSupport.removeDeadConnection(this);
255255
}
256256
TcpSender sender = getSender();
257-
if (sender != null && this.interceptedSenders != null && !(sender instanceof TcpConnectionInterceptorSupport)) {
258-
this.interceptedSenders.forEach(snder -> snder.removeDeadConnection(connection));
257+
if (sender != null && this.interceptedSenders != null
258+
&& !(sender instanceof TcpConnectionInterceptorSupport)) {
259+
260+
this.interceptedSenders.forEach(intercepted -> intercepted.removeDeadConnection(connection));
259261
}
260262
}
261263
finally {

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* {@link TcpConnection}.
2525
*
2626
* @author Gary Russell
27+
* @author Artem Bilan
2728
*
2829
* @since 2.0
2930
*/
@@ -33,8 +34,7 @@ public interface TcpListener {
3334
/**
3435
* Called by a TCPConnection when a new message arrives.
3536
* @param message The message.
36-
* @return true if the message was intercepted
3737
*/
38-
boolean onMessage(Message<?> message);
38+
void onMessage(Message<?> message);
3939

4040
}

spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public void test() throws Exception {
6565
server.registerListener(m -> {
6666
received.set(new ObjectToStringTransformer().transform(m));
6767
latch.countDown();
68-
return false;
6968
});
7069
server.setApplicationEventPublisher(publisher);
7170
server.setBeanFactory(TEST_INTEGRATION_CONTEXT);

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpInboundGatewayTests.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -351,10 +351,7 @@ private void testCloseStream(AbstractServerConnectionFactory scf,
351351
consumer.start();
352352
AbstractClientConnectionFactory client = ccf.apply(port);
353353
CountDownLatch latch = new CountDownLatch(1);
354-
client.registerListener(message -> {
355-
latch.countDown();
356-
return false;
357-
});
354+
client.registerListener(message -> latch.countDown());
358355
client.setBeanFactory(TEST_INTEGRATION_CONTEXT);
359356
client.afterPropertiesSet();
360357
client.start();

0 commit comments

Comments
 (0)