Skip to content

Commit 1357cd8

Browse files
authored
Multi broker setup hardening (#1627)
* Use ephemeral ports for broker creation in DurableFiveBrokerNetworkBridgeTest to prevent port conflicts during parallel test execution Enhance broker startup reliability by replacing Thread.sleep with Wait.waitFor for transport connector readiness * Improve broker readiness check in JmsMultipleBrokersTestSupport to skip VM-only brokers and enhance reliability * Fix startAllBrokers() transport connector check for VM-only and secured brokers - Skip readiness check for VM transport connectors (no network to verify) - Treat JMSSecurityException as ready (broker accepts connections, just enforces auth) - Fix AdvisoryViaNetworkTest race conditions with waitForConsumerOnBroker()
1 parent 578de9b commit 1357cd8

File tree

3 files changed

+74
-22
lines changed

3 files changed

+74
-22
lines changed

activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,14 +271,46 @@ protected void waitForBridgeFormation() throws Exception {
271271
}
272272

273273
protected void startAllBrokers() throws Exception {
274-
Collection<BrokerItem> brokerList = brokers.values();
274+
final Collection<BrokerItem> brokerList = brokers.values();
275275
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
276-
BrokerService broker = i.next().broker;
276+
final BrokerService broker = i.next().broker;
277277
broker.start();
278278
broker.waitUntilStarted();
279279
}
280280

281-
Thread.sleep(maxSetupTime);
281+
// Wait for all brokers with TCP transport connectors to be ready to accept connections
282+
// instead of using Thread.sleep which is unreliable across different machines.
283+
// Skip this check for VM-only brokers since VM transport doesn't require network readiness
284+
// and some tests intentionally configure brokers to reject connections.
285+
for (final BrokerItem brokerItem : brokerList) {
286+
final BrokerService broker = brokerItem.broker;
287+
// Only check transport connector readiness for TCP/network connectors
288+
// (skip VM-only brokers which don't require network readiness checks)
289+
final boolean hasTcpConnector = broker.getTransportConnectors().stream()
290+
.anyMatch(tc -> {
291+
final String scheme = tc.getUri().getScheme();
292+
return scheme != null && !scheme.equals("vm");
293+
});
294+
if (hasTcpConnector) {
295+
assertTrue("Broker " + broker.getBrokerName() + " transport connectors ready",
296+
Wait.waitFor(() -> {
297+
if (!broker.isStarted()) {
298+
return false;
299+
}
300+
// Try to create a test connection to verify transport is accepting connections
301+
try (final Connection testConn = brokerItem.createConnection()) {
302+
return true;
303+
} catch (final jakarta.jms.JMSSecurityException e) {
304+
// Security exception means the broker IS accepting connections,
305+
// it's just enforcing authentication - consider it ready
306+
return true;
307+
} catch (final Exception e) {
308+
LOG.debug("Broker {} not ready yet: {}", broker.getBrokerName(), e.getMessage());
309+
return false;
310+
}
311+
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.SECONDS.toMillis(1)));
312+
}
313+
}
282314
}
283315

284316
protected BrokerService createBroker(String brokerName) throws Exception {

activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -313,9 +313,8 @@ protected void testDurablePropagation5Broker() throws Exception {
313313
// Setup consumers
314314
Session ses = createSession("Broker_A_A");
315315
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
316-
Thread.sleep(1000);
317316

318-
// let consumers propagate around the network
317+
// let consumers propagate around the network (assertNCDurableSubsCount waits internally)
319318
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
320319
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
321320
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
@@ -328,9 +327,8 @@ protected void testDurablePropagation5Broker() throws Exception {
328327
//bring online a consumer on the other side
329328
Session ses2 = createSession("Broker_E_E");
330329
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
331-
Thread.sleep(1000);
332330

333-
//there will be 2 network durables, 1 for each direction of the bridge
331+
//there will be 2 network durables, 1 for each direction of the bridge (assertNCDurableSubsCount waits internally)
334332
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
335333
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
336334
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
@@ -384,16 +382,14 @@ protected void testDurablePropagationSpoke() throws Exception {
384382

385383
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
386384
MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
387-
Thread.sleep(1000);
388385

389-
// let consumers propagate around the network
386+
// let consumers propagate around the network (assertNCDurableSubsCount waits internally)
390387
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
391388
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
392389
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
393390
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
394391

395392
MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
396-
Thread.sleep(1000);
397393

398394
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
399395
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
@@ -759,7 +755,6 @@ private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean restar
759755
Session ses2 = createSession("Broker_E_E");
760756
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
761757
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
762-
Thread.sleep(1000);
763758

764759
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
765760
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
@@ -768,9 +763,8 @@ private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean restar
768763
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
769764

770765
startNetworkConnectors(nc1, nc2, nc3, nc4);
771-
Thread.sleep(1000);
772766

773-
// Check that the correct network durables exist
767+
// Check that the correct network durables exist (assertNCDurableSubsCount waits internally)
774768
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, expected.get(0));
775769
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, expected.get(1));
776770
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, expected.get(2));
@@ -795,12 +789,10 @@ private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean restar
795789
// to test sync works ok. Things should work for all cases both dynamicOnly
796790
// false and true because TTL info still exits and consumers are online
797791
stopNetworkConnectors(nc1, nc2, nc3, nc4);
798-
Thread.sleep(1000);
799792
startNetworkConnectors(nc1, nc2, nc3, nc4);
800-
Thread.sleep(1000);
801793
}
802794

803-
// after restarting the bridges, check sync/demand are correct
795+
// after restarting the bridges, check sync/demand are correct (assertNCDurableSubsCount waits internally)
804796
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, expected.get(0));
805797
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, expected.get(1));
806798
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, expected.get(2));
@@ -846,12 +838,13 @@ public void setUp() throws Exception {
846838
super.setAutoFail(true);
847839
super.setUp();
848840
deletePersistentMessagesOnStartup = true;
849-
String options = new String("?persistent=true&useJmx=false");
850-
createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
851-
createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
852-
createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
853-
createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options));
854-
createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options));
841+
final String options = "?persistent=true&useJmx=false";
842+
// Use ephemeral ports (0) to avoid port conflicts when tests run in parallel
843+
createBroker(new URI("broker:(tcp://localhost:0)/Broker_A_A" + options));
844+
createBroker(new URI("broker:(tcp://localhost:0)/Broker_B_B" + options));
845+
createBroker(new URI("broker:(tcp://localhost:0)/Broker_C_C" + options));
846+
createBroker(new URI("broker:(tcp://localhost:0)/Broker_D_D" + options));
847+
createBroker(new URI("broker:(tcp://localhost:0)/Broker_E_E" + options));
855848
}
856849

857850
@Override

activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ public void testAdvisoryPrefetchSize() throws Exception {
104104
createConsumer("A", topic1);
105105
createConsumer("A", new ActiveMQTopic("A.FOO2"));
106106

107+
// Wait for network bridge to propagate consumers to broker B
108+
waitForConsumerOnBroker(brokerB, advisoryTopic);
109+
waitForConsumerOnBroker(brokerB, topic1);
110+
107111
//verify that brokerB's advisory prefetch is 10 but normal topic prefetch is 1
108112
assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
109113
assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -137,6 +141,10 @@ public void testAdvisoryPrefetchSize1() throws Exception {
137141
createConsumer("A", topic1);
138142
createConsumer("A", new ActiveMQTopic("A.FOO2"));
139143

144+
// Wait for network bridge to propagate consumers to broker B
145+
waitForConsumerOnBroker(brokerB, advisoryTopic);
146+
waitForConsumerOnBroker(brokerB, topic1);
147+
140148
//verify that brokerB's advisory prefetch is 1 but normal topic prefetch is 10
141149
assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
142150
assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -168,6 +176,10 @@ public void testAdvisoryPrefetchSizeNotSet() throws Exception {
168176
createConsumer("A", topic1);
169177
createConsumer("A", new ActiveMQTopic("A.FOO2"));
170178

179+
// Wait for network bridge to propagate consumers to broker B
180+
waitForConsumerOnBroker(brokerB, advisoryTopic);
181+
waitForConsumerOnBroker(brokerB, topic1);
182+
171183
//verify that both consumers have a prefetch of 10
172184
assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
173185
assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -198,6 +210,10 @@ public void testPrefetchSize1() throws Exception {
198210
createConsumer("A", topic1);
199211
createConsumer("A", new ActiveMQTopic("A.FOO2"));
200212

213+
// Wait for network bridge to propagate consumers to broker B
214+
waitForConsumerOnBroker(brokerB, advisoryTopic);
215+
waitForConsumerOnBroker(brokerB, topic1);
216+
201217
//verify that both consumers have a prefetch of 1
202218
assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
203219
assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -392,4 +408,15 @@ public void setUp() throws Exception {
392408
super.setUp();
393409
}
394410

411+
private void waitForConsumerOnBroker(final BrokerService broker, final ActiveMQTopic topic) throws Exception {
412+
assertTrue("Consumer on " + broker.getBrokerName() + " for " + topic,
413+
Wait.waitFor(() -> {
414+
try {
415+
return !broker.getDestination(topic).getConsumers().isEmpty();
416+
} catch (final Exception e) {
417+
return false;
418+
}
419+
}, 30000, 100));
420+
}
421+
395422
}

0 commit comments

Comments
 (0)