Skip to content

Commit bd66c70

Browse files
committed
Potential fix for flaky STOMP integration tests
When ReactorNetty2StompBrokerRelayIntegrationTests fail, typically there are multiple exceptions "Connection refused: /127.0.0.1:61613" that appear after we've conneted, sent CONNECT, and expecting CONNECTED, but that does not come within the 10 second timeout. 61613 is the default port for STOMP. However, in all integration tests we start ActiveMQ with port 0 which results in a random port. Moreover, the stacktrace is for Netty 4 (not 5), and the eventloop thread id's are different than the one where the connection to the correct, random port was established. The suspicion is that these are log messages from MessageBrokerConfigurationTests which focuses on testing configuration but nevertheless as a bean starts and attempts to connect to the default port and fails. Perhaps those attempts to connect on the default port somehow affect the ActiveMQ server, and it stops responding. This change adds a no-op TcpClient in MessageBrokerConfigurationTests to avoid unnecessary attempts to connect that are not needed. See gh-29287
1 parent e737980 commit bd66c70

File tree

2 files changed

+30
-3
lines changed

2 files changed

+30
-3
lines changed

spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Iterator;
2121
import java.util.List;
2222
import java.util.Set;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.ConcurrentHashMap;
2425

2526
import org.junit.jupiter.api.Test;
@@ -69,6 +70,9 @@
6970
import org.springframework.messaging.support.ChannelInterceptor;
7071
import org.springframework.messaging.support.ExecutorSubscribableChannel;
7172
import org.springframework.messaging.support.MessageBuilder;
73+
import org.springframework.messaging.tcp.ReconnectStrategy;
74+
import org.springframework.messaging.tcp.TcpConnectionHandler;
75+
import org.springframework.messaging.tcp.TcpOperations;
7276
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
7377
import org.springframework.stereotype.Controller;
7478
import org.springframework.util.AntPathMatcher;
@@ -622,7 +626,9 @@ static class BrokerRelayConfig extends SimpleBrokerConfig {
622626

623627
@Override
624628
public void configureMessageBroker(MessageBrokerRegistry registry) {
625-
registry.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(true)
629+
registry.enableStompBrokerRelay("/topic", "/queue")
630+
.setAutoStartup(true)
631+
.setTcpClient(new NoOpTcpClient())
626632
.setUserDestinationBroadcast("/topic/unresolved-user-destination")
627633
.setUserRegistryBroadcast("/topic/simp-user-registry");
628634
}
@@ -787,4 +793,24 @@ public void validate(@Nullable Object target, Errors errors) {
787793
private static class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
788794
}
789795

796+
797+
private static class NoOpTcpClient implements TcpOperations<byte[]> {
798+
799+
@Override
800+
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<byte[]> handler) {
801+
return CompletableFuture.completedFuture(null);
802+
}
803+
804+
@Override
805+
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
806+
return CompletableFuture.completedFuture(null);
807+
}
808+
809+
@Override
810+
public CompletableFuture<Void> shutdownAsync() {
811+
return CompletableFuture.completedFuture(null);
812+
}
813+
814+
}
815+
790816
}

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/AbstractStompBrokerRelayIntegrationTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,14 @@ public void relayReconnectsIfBrokerComesBackUp() throws Exception {
228228
this.relay.handleMessage(subscribe.message);
229229
this.responseHandler.expectMessages(subscribe);
230230

231-
MessageExchange error = MessageExchangeBuilder.error(sess1).build();
232231
stopActiveMqBrokerAndAwait();
233-
this.responseHandler.expectMessages(error);
234232

233+
MessageExchange error = MessageExchangeBuilder.error(sess1).build();
234+
this.responseHandler.expectMessages(error);
235235
this.eventPublisher.expectBrokerAvailabilityEvent(false);
236236

237237
startActiveMQBroker();
238+
238239
this.eventPublisher.expectBrokerAvailabilityEvent(true);
239240
}
240241

0 commit comments

Comments
 (0)