Skip to content

Commit 92c8b58

Browse files
authored
Improve duplex network wait conditions and ports (#1669)
* Improve duplex network wait conditions and ports * Add CopyOnWriteArrayList import to support thread-safe list operations * Override addNetworkConnectors method in DuplexNetworkTest and MulticastNetworkTest to use what's in the XML for now * Remove overridden addNetworkConnectors method in DuplexNetworkTest as it's no longer needed
1 parent 3a88c10 commit 92c8b58

File tree

5 files changed

+126
-84
lines changed

5 files changed

+126
-84
lines changed

activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java

Lines changed: 66 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
import org.apache.activemq.ActiveMQConnectionFactory;
2020
import org.apache.activemq.broker.BrokerService;
21+
import org.apache.activemq.command.ActiveMQQueue;
22+
import org.apache.activemq.network.DiscoveryNetworkConnector;
23+
import org.apache.activemq.util.Wait;
2124
import org.junit.Test;
2225
import org.junit.runner.RunWith;
2326
import org.slf4j.Logger;
@@ -27,9 +30,12 @@
2730

2831
import jakarta.annotation.Resource;
2932
import jakarta.jms.*;
33+
import java.net.URI;
3034
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.TimeUnit;
3136

3237
import static org.junit.Assert.assertEquals;
38+
import static org.junit.Assert.assertTrue;
3339

3440
@RunWith(SpringJUnit4ClassRunner.class)
3541
@ContextConfiguration({ "virtual-topic-network-test.xml" })
@@ -53,45 +59,64 @@ public class MessageDestinationVirtualTopicTest {
5359

5460
private Session session1;
5561

56-
public void init() throws JMSException {
62+
public void init() throws Exception {
63+
// Get actual assigned ephemeral ports
64+
final String broker1URL = broker1.getTransportConnectors().get(0).getConnectUri().toString();
65+
final String broker2URL = broker2.getTransportConnectors().get(0).getConnectUri().toString();
66+
LOG.info("Broker1 URL: {}", broker1URL);
67+
LOG.info("Broker2 URL: {}", broker2URL);
68+
69+
// Add network connector from broker2 to broker1 programmatically using actual port
70+
final DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector(
71+
new URI("static://(" + broker1URL + ")"));
72+
nc.setName("linkToBrokerB1");
73+
nc.setNetworkTTL(1);
74+
nc.setDuplex(true);
75+
broker2.addNetworkConnector(nc);
76+
nc.start();
77+
78+
// Wait for bridge to be established
79+
assertTrue("Network bridge should be established",
80+
Wait.waitFor(() -> nc.activeBridges().size() == 1, 10_000, 500));
81+
5782
// Create connection on Broker B2
58-
ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:62616");
59-
Connection connection2 = broker2ConnectionFactory.createConnection();
83+
final ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory(broker2URL);
84+
final Connection connection2 = broker2ConnectionFactory.createConnection();
6085
connection2.start();
61-
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
62-
Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1");
86+
final Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
87+
final Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1");
6388

6489
// Bind listener on queue for consumer D
65-
MessageConsumer consumer = session2.createConsumer(consumerDQueue);
90+
final MessageConsumer consumer = session2.createConsumer(consumerDQueue);
6691
listener2 = new SimpleMessageListener();
6792
consumer.setMessageListener(listener2);
6893

6994
// Create connection on Broker B1
70-
ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
71-
Connection connection1 = broker1ConnectionFactory.createConnection();
95+
final ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory(broker1URL);
96+
final Connection connection1 = broker1ConnectionFactory.createConnection();
7297
connection1.start();
7398
session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
74-
Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1");
99+
final Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1");
75100

76-
// Bind listener on queue for consumer D
77-
MessageConsumer consumer1 = session1.createConsumer(consumerCQueue);
101+
// Bind listener on queue for consumer C
102+
final MessageConsumer consumer1 = session1.createConsumer(consumerCQueue);
78103
listener1 = new SimpleMessageListener();
79104
consumer1.setMessageListener(listener1);
80105

81-
// Create listener on Broker B1 for VT T2 witout setOriginalDest
82-
Queue consumer3Queue = session1.createQueue("Consumer.A.VirtualTopic.T2");
106+
// Create listener on Broker B1 for VT T2 without setOriginalDest
107+
final Queue consumer3Queue = session1.createQueue("Consumer.A.VirtualTopic.T2");
83108

84-
// Bind listener on queue for consumer D
85-
MessageConsumer consumerD = session1.createConsumer(consumer3Queue);
109+
// Bind listener on queue for consumer A
110+
final MessageConsumer consumerD = session1.createConsumer(consumer3Queue);
86111
listener3 = new SimpleMessageListener();
87112
consumerD.setMessageListener(listener3);
88113

89114
// Create producer for topic, on B1
90-
Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1,VirtualTopic.T2");
115+
final Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1,VirtualTopic.T2");
91116
producer = session1.createProducer(virtualTopicT1);
92117
}
93118

94-
@Test
119+
@Test(timeout = 30_000)
95120
public void testDestinationNames() throws Exception {
96121

97122
LOG.info("Started waiting for broker 1 and 2");
@@ -102,32 +127,46 @@ public void testDestinationNames() throws Exception {
102127
init();
103128

104129
// Create a monitor
105-
CountDownLatch monitor = new CountDownLatch(3);
130+
final CountDownLatch monitor = new CountDownLatch(3);
106131
listener1.setCountDown(monitor);
107132
listener2.setCountDown(monitor);
108133
listener3.setCountDown(monitor);
109134

135+
// Wait for the consumer on broker2 to be visible on broker1 via the network bridge.
136+
// The virtual topic Consumer.D.VirtualTopic.T1 on broker2 must be forwarded to broker1
137+
// before sending, otherwise the message won't reach listener2.
138+
assertTrue("Consumer.D queue should exist on broker1 via network bridge",
139+
Wait.waitFor(() -> {
140+
try {
141+
final org.apache.activemq.broker.region.Destination dest =
142+
broker1.getDestination(new ActiveMQQueue("Consumer.D.VirtualTopic.T1"));
143+
return dest != null && dest.getConsumers().size() >= 1;
144+
} catch (final Exception e) {
145+
return false;
146+
}
147+
}, 10_000, 200));
148+
110149
LOG.info("Sending message");
111150
// Send a message on the topic
112-
TextMessage message = session1.createTextMessage("Hello World !");
151+
final TextMessage message = session1.createTextMessage("Hello World !");
113152
producer.send(message);
114153
LOG.info("Waiting for message reception");
115154
// Wait the two messages in the related queues
116-
monitor.await();
155+
assertTrue("All 3 listeners should receive messages", monitor.await(15, TimeUnit.SECONDS));
117156

118157
// Get the message destinations
119-
String lastJMSDestination2 = listener2.getLastJMSDestination();
120-
System.err.println(lastJMSDestination2);
121-
String lastJMSDestination1 = listener1.getLastJMSDestination();
122-
System.err.println(lastJMSDestination1);
158+
final String lastJMSDestination2 = listener2.getLastJMSDestination();
159+
LOG.info("Listener2 destination: {}", lastJMSDestination2);
160+
final String lastJMSDestination1 = listener1.getLastJMSDestination();
161+
LOG.info("Listener1 destination: {}", lastJMSDestination1);
123162

124-
String lastJMSDestination3 = listener3.getLastJMSDestination();
125-
System.err.println(lastJMSDestination3);
163+
final String lastJMSDestination3 = listener3.getLastJMSDestination();
164+
LOG.info("Listener3 destination: {}", lastJMSDestination3);
126165

127166
// The destination names
128167
assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2);
129168
assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1);
130169
assertEquals("topic://VirtualTopic.T2", lastJMSDestination3);
131170

132171
}
133-
}
172+
}

activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,15 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception {
9393
// Use startNetworkConnector() instead of connector.start() to ensure proper JMX MBean registration.
9494
addNetworkConnectors();
9595

96-
// Wait for both network bridges to be FULLY started (advisory consumers registered).
96+
// Wait for network bridges to be FULLY started (advisory consumers registered).
9797
// activeBridges().isEmpty() is NOT sufficient because bridges are added to the map
9898
// before start() completes asynchronously. We must wait for the startedLatch.
9999
waitForBridgeFullyStarted(localBroker, "Local");
100-
waitForBridgeFullyStarted(remoteBroker, "Remote");
100+
// Only wait for remote bridge if the remote broker has its own network connector
101+
// (duplex bridges don't add a separate connector on the remote side)
102+
if (!remoteBroker.getNetworkConnectors().isEmpty()) {
103+
waitForBridgeFullyStarted(remoteBroker, "Remote");
104+
}
101105

102106
final URI localURI = localBroker.getVmConnectorURI();
103107
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
@@ -155,13 +159,9 @@ protected void addNetworkConnectors() throws Exception {
155159
}
156160

157161
protected void waitForBridgeFullyStarted(final BrokerService broker, final String label) throws Exception {
158-
// Skip if broker has no network connectors (e.g., duplex target broker receives
159-
// bridge connections but doesn't initiate them)
160-
if (broker.getNetworkConnectors().isEmpty()) {
161-
return;
162-
}
163162
assertTrue(label + " broker bridge should be fully started", Wait.waitFor(() -> {
164-
if (broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
163+
if (broker.getNetworkConnectors().isEmpty()
164+
|| broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
165165
return false;
166166
}
167167
final NetworkBridge bridge = broker.getNetworkConnectors().get(0).activeBridges().iterator().next();

activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,9 @@ protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final in
232232
protected DemandForwardingBridge findDuplexBridge(final TransportConnector connector) throws Exception {
233233
assertNotNull(connector);
234234

235-
for (TransportConnection tc : connector.getConnections()) {
236-
if (tc.getConnectionId().startsWith("networkConnector_")) {
235+
for (final TransportConnection tc : connector.getConnections()) {
236+
final String connectionId = tc.getConnectionId();
237+
if (connectionId != null && connectionId.startsWith("networkConnector_")) {
237238
final Field bridgeField = TransportConnection.class.getDeclaredField("duplexBridge");
238239
bridgeField.setAccessible(true);
239240
return (DemandForwardingBridge) bridgeField.get(tc);

activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import static org.junit.Assert.assertTrue;
2323

2424
import java.lang.reflect.Field;
25+
import java.net.URI;
26+
import java.util.ArrayList;
27+
import java.util.List;
2528
import java.util.concurrent.CopyOnWriteArrayList;
2629

2730
import jakarta.jms.MessageProducer;
@@ -31,6 +34,9 @@
3134
import org.apache.activemq.broker.BrokerService;
3235
import org.apache.activemq.broker.TransportConnection;
3336
import org.apache.activemq.broker.TransportConnector;
37+
import org.apache.activemq.command.ActiveMQDestination;
38+
import org.apache.activemq.command.ActiveMQQueue;
39+
import org.apache.activemq.command.ActiveMQTopic;
3440
import org.apache.activemq.util.Wait;
3541
import org.junit.Test;
3642

@@ -39,8 +45,6 @@
3945
*/
4046
public class DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetworkTest {
4147

42-
private static final int REMOTE_BROKER_TCP_PORT = 61617;
43-
4448
@Override
4549
protected String getLocalBrokerURI() {
4650
return "org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml";
@@ -50,13 +54,35 @@ protected String getLocalBrokerURI() {
5054
protected BrokerService createRemoteBroker() throws Exception {
5155
final BrokerService broker = new BrokerService();
5256
broker.setBrokerName("remoteBroker");
53-
broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT);
57+
broker.addConnector("tcp://localhost:0");
5458
return broker;
5559
}
5660

5761
@Override
5862
protected void addNetworkConnectors() throws Exception {
59-
// No-op: duplex network connector is already defined in duplexDynamicIncludedDestLocalBroker.xml
63+
// For duplex test: only one connector from local to remote, with duplex=true
64+
final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri();
65+
66+
final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector(
67+
new URI("static:(" + remoteConnectURI + ")"));
68+
localToRemote.setName("networkConnector");
69+
localToRemote.setDuplex(true);
70+
localToRemote.setDynamicOnly(false);
71+
localToRemote.setConduitSubscriptions(true);
72+
localToRemote.setDecreaseNetworkConsumerPriority(false);
73+
74+
final List<ActiveMQDestination> dynamicallyIncluded = new ArrayList<>();
75+
dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo"));
76+
dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar"));
77+
localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded);
78+
79+
final List<ActiveMQDestination> excluded = new ArrayList<>();
80+
excluded.add(new ActiveMQQueue("exclude.test.foo"));
81+
excluded.add(new ActiveMQTopic("exclude.test.bar"));
82+
localToRemote.setExcludedDestinations(excluded);
83+
84+
localBroker.addNetworkConnector(localToRemote);
85+
localBroker.startNetworkConnector(localToRemote, null);
6086
}
6187

6288
// we have to override this, because with dynamicallyIncludedDestinations working properly
@@ -70,14 +96,14 @@ public void testRequestReply() throws Exception {
7096

7197
@Test
7298
public void testTempQueues() throws Exception {
73-
TemporaryQueue temp = localSession.createTemporaryQueue();
74-
MessageProducer producer = localSession.createProducer(temp);
99+
final TemporaryQueue temp = localSession.createTemporaryQueue();
100+
final MessageProducer producer = localSession.createProducer(temp);
75101
producer.send(localSession.createTextMessage("test"));
76-
Thread.sleep(100);
77-
assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length);
102+
assertTrue("Destination created on remote",
103+
Wait.waitFor(() -> remoteBroker.getAdminView().getTemporaryQueues().length == 1, 5000, 100));
78104
temp.delete();
79-
Thread.sleep(100);
80-
assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length);
105+
assertTrue("Destination deleted on remote",
106+
Wait.waitFor(() -> remoteBroker.getAdminView().getTemporaryQueues().length == 0, 5000, 100));
81107
}
82108

83109
@Test(timeout = 60 * 1000)
@@ -101,18 +127,16 @@ public void testDynamicallyIncludedDestinationsForDuplex() throws Exception{
101127
configuration.getDestinationFilter());
102128
}
103129

104-
private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException, IllegalAccessException {
105-
Field f = DemandForwardingBridgeSupport.class.getDeclaredField("configuration");
130+
private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(final DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException, IllegalAccessException {
131+
final Field f = DemandForwardingBridgeSupport.class.getDeclaredField("configuration");
106132
f.setAccessible(true);
107-
NetworkBridgeConfiguration configuration = (NetworkBridgeConfiguration) f.get(duplexBridge);
108-
return configuration;
133+
return (NetworkBridgeConfiguration) f.get(duplexBridge);
109134
}
110135

111-
private DemandForwardingBridge getDuplexBridgeFromConnection(TransportConnection bridgeConnection) throws NoSuchFieldException, IllegalAccessException {
112-
Field f = TransportConnection.class.getDeclaredField("duplexBridge");
136+
private DemandForwardingBridge getDuplexBridgeFromConnection(final TransportConnection bridgeConnection) throws NoSuchFieldException, IllegalAccessException {
137+
final Field f = TransportConnection.class.getDeclaredField("duplexBridge");
113138
f.setAccessible(true);
114-
DemandForwardingBridge bridge = (DemandForwardingBridge) f.get(bridgeConnection);
115-
return bridge;
139+
return (DemandForwardingBridge) f.get(bridgeConnection);
116140
}
117141

118142
private DemandForwardingBridge waitForDuplexBridge(final TransportConnection bridgeConnection) throws Exception {
@@ -136,13 +160,9 @@ protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final
136160

137161
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
138162

139-
assertTrue(Wait.waitFor(new Wait.Condition() {
140-
@Override
141-
public boolean isSatisified() throws Exception {
142-
return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
143-
expectedRemoteSent == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
144-
}
145-
}));
163+
assertTrue(Wait.waitFor(() ->
164+
expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
165+
expectedRemoteSent == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount()));
146166

147167
}
148168
}

activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
The ASF licenses this file to You under the Apache License, Version 2.0
77
(the "License"); you may not use this file except in compliance with
88
the License. You may obtain a copy of the License at
9-
9+
1010
http://www.apache.org/licenses/LICENSE-2.0
11-
11+
1212
Unless required by applicable law or agreed to in writing, software
1313
distributed under the License is distributed on an "AS IS" BASIS,
1414
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,28 +25,10 @@
2525
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
2626

2727
<broker brokerName="localBroker" start="false" persistent="true" useShutdownHook="false" xmlns="http://activemq.apache.org/schema/core">
28-
<networkConnectors>
29-
<networkConnector uri="static:(tcp://localhost:61617)"
30-
duplex="true"
31-
dynamicOnly = "false"
32-
conduitSubscriptions = "true"
33-
decreaseNetworkConsumerPriority = "false">
34-
35-
<dynamicallyIncludedDestinations>
36-
<queue physicalName="include.test.foo"/>
37-
<topic physicalName="include.test.bar"/>
38-
</dynamicallyIncludedDestinations>
39-
40-
<excludedDestinations>
41-
<queue physicalName="exclude.test.foo"/>
42-
<topic physicalName="exclude.test.bar"/>
43-
</excludedDestinations>
44-
45-
</networkConnector>
46-
</networkConnectors>
28+
<!-- Network connector is added programmatically in the test to use ephemeral ports -->
4729

4830
<transportConnectors>
49-
<transportConnector uri="tcp://localhost:61616"/>
31+
<transportConnector uri="tcp://localhost:0"/>
5032
</transportConnectors>
5133

5234
</broker>

0 commit comments

Comments
 (0)