Skip to content

Commit 902b85c

Browse files
authored
Support ephemeral ports in network config and Spring (#1674)
1 parent 6450290 commit 902b85c

File tree

10 files changed

+307
-130
lines changed

10 files changed

+307
-130
lines changed

activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,22 @@
1616
*/
1717
package org.apache.activemq.network;
1818

19+
import static org.junit.Assert.assertTrue;
20+
1921
import java.net.URI;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.concurrent.TimeUnit;
2025

2126
import jakarta.jms.Connection;
2227
import jakarta.jms.Session;
2328

2429
import org.apache.activemq.ActiveMQConnectionFactory;
2530
import org.apache.activemq.broker.BrokerService;
31+
import org.apache.activemq.command.ActiveMQDestination;
32+
import org.apache.activemq.command.ActiveMQQueue;
33+
import org.apache.activemq.command.ActiveMQTopic;
34+
import org.apache.activemq.util.Wait;
2635
import org.apache.activemq.xbean.BrokerFactoryBean;
2736
import org.junit.After;
2837
import org.junit.Before;
@@ -59,11 +68,15 @@ protected void doTearDown() throws Exception {
5968
if(remoteConnection != null)
6069
remoteConnection.close();
6170

62-
if(localBroker != null)
71+
if(localBroker != null) {
6372
localBroker.stop();
73+
localBroker.waitUntilStopped();
74+
}
6475

65-
if(remoteBroker != null)
76+
if(remoteBroker != null) {
6677
remoteBroker.stop();
78+
remoteBroker.waitUntilStopped();
79+
}
6780
}
6881

6982
protected void doSetUp(boolean deleteAllMessages) throws Exception {
@@ -75,14 +88,25 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception {
7588
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
7689
localBroker.start();
7790
localBroker.waitUntilStarted();
78-
URI localURI = localBroker.getVmConnectorURI();
91+
92+
// Programmatically add network connectors using the actual assigned ephemeral ports.
93+
// Use startNetworkConnector() instead of connector.start() to ensure proper JMX MBean registration.
94+
addNetworkConnectors();
95+
96+
// Wait for both network bridges to be FULLY started (advisory consumers registered).
97+
// activeBridges().isEmpty() is NOT sufficient because bridges are added to the map
98+
// before start() completes asynchronously. We must wait for the startedLatch.
99+
waitForBridgeFullyStarted(localBroker, "Local");
100+
waitForBridgeFullyStarted(remoteBroker, "Remote");
101+
102+
final URI localURI = localBroker.getVmConnectorURI();
79103
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
80104
fac.setAlwaysSyncSend(true);
81105
fac.setDispatchAsync(false);
82106
localConnection = fac.createConnection();
83107
localConnection.setClientID("clientId");
84108
localConnection.start();
85-
URI remoteURI = remoteBroker.getVmConnectorURI();
109+
final URI remoteURI = remoteBroker.getVmConnectorURI();
86110
fac = new ActiveMQConnectionFactory(remoteURI);
87111
remoteConnection = fac.createConnection();
88112
remoteConnection.setClientID("clientId");
@@ -91,21 +115,76 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception {
91115
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
92116
}
93117

118+
/**
119+
* Programmatically adds network connectors between the local and remote brokers
120+
* using the actual assigned ephemeral ports. This avoids hardcoding ports in XML
121+
* config files which causes port conflicts on CI.
122+
*/
123+
protected void addNetworkConnectors() throws Exception {
124+
final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri();
125+
final URI localConnectURI = localBroker.getTransportConnectors().get(0).getConnectUri();
126+
127+
// Local -> Remote network connector (matches the original localBroker.xml config)
128+
final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector(
129+
new URI("static:(" + remoteConnectURI + ")"));
130+
localToRemote.setName("networkConnector");
131+
localToRemote.setDynamicOnly(false);
132+
localToRemote.setConduitSubscriptions(true);
133+
localToRemote.setDecreaseNetworkConsumerPriority(false);
134+
135+
final List<ActiveMQDestination> dynamicallyIncluded = new ArrayList<>();
136+
dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo"));
137+
dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar"));
138+
localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded);
139+
140+
final List<ActiveMQDestination> excluded = new ArrayList<>();
141+
excluded.add(new ActiveMQQueue("exclude.test.foo"));
142+
excluded.add(new ActiveMQTopic("exclude.test.bar"));
143+
localToRemote.setExcludedDestinations(excluded);
144+
145+
localBroker.addNetworkConnector(localToRemote);
146+
// startNetworkConnector handles JMX MBean registration and connector startup
147+
localBroker.startNetworkConnector(localToRemote, null);
148+
149+
// Remote -> Local network connector (matches the original remoteBroker.xml config)
150+
final DiscoveryNetworkConnector remoteToLocal = new DiscoveryNetworkConnector(
151+
new URI("static:(" + localConnectURI + ")"));
152+
remoteToLocal.setName("networkConnector");
153+
remoteBroker.addNetworkConnector(remoteToLocal);
154+
remoteBroker.startNetworkConnector(remoteToLocal, null);
155+
}
156+
157+
protected void waitForBridgeFullyStarted(final BrokerService broker, final String label) throws Exception {
158+
// Skip if broker has no network connectors (e.g., duplex target broker receives
159+
// bridge connections but doesn't initiate them)
160+
if (broker.getNetworkConnectors().isEmpty()) {
161+
return;
162+
}
163+
assertTrue(label + " broker bridge should be fully started", Wait.waitFor(() -> {
164+
if (broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
165+
return false;
166+
}
167+
final NetworkBridge bridge = broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
168+
if (bridge instanceof DemandForwardingBridgeSupport) {
169+
return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0;
170+
}
171+
return true;
172+
}, TimeUnit.SECONDS.toMillis(10), 100));
173+
}
174+
94175
protected String getRemoteBrokerURI() {
95-
return "org/apache/activemq/network/remoteBroker.xml";
176+
return "org/apache/activemq/network/remoteBroker-ephemeral.xml";
96177
}
97178

98179
protected String getLocalBrokerURI() {
99-
return "org/apache/activemq/network/localBroker.xml";
180+
return "org/apache/activemq/network/localBroker-ephemeral.xml";
100181
}
101182

102183
protected BrokerService createBroker(String uri) throws Exception {
103-
Resource resource = new ClassPathResource(uri);
104-
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
105-
resource = new ClassPathResource(uri);
106-
factory = new BrokerFactoryBean(resource);
184+
final Resource resource = new ClassPathResource(uri);
185+
final BrokerFactoryBean factory = new BrokerFactoryBean(resource);
107186
factory.afterPropertiesSet();
108-
BrokerService result = factory.getBroker();
187+
final BrokerService result = factory.getBroker();
109188
return result;
110189
}
111190

activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,17 @@ protected String getLocalBrokerURI() {
4040

4141
@Override
4242
protected BrokerService createRemoteBroker() throws Exception {
43-
BrokerService broker = new BrokerService();
43+
final BrokerService broker = new BrokerService();
4444
broker.setBrokerName("remoteBroker");
4545
broker.addConnector("tcp://localhost:61617?transport.connectAttemptTimeout=2000");
4646
return broker;
4747
}
4848

49+
@Override
50+
protected void addNetworkConnectors() throws Exception {
51+
// No-op: duplex network connector is already defined in duplexLocalBroker.xml
52+
}
53+
4954
@Test
5055
public void testTempQueues() throws Exception {
5156
TemporaryQueue temp = localSession.createTemporaryQueue();

activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,17 @@ protected String getLocalBrokerURI() {
4848

4949
@Override
5050
protected BrokerService createRemoteBroker() throws Exception {
51-
BrokerService broker = new BrokerService();
51+
final BrokerService broker = new BrokerService();
5252
broker.setBrokerName("remoteBroker");
5353
broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT);
5454
return broker;
5555
}
5656

57+
@Override
58+
protected void addNetworkConnectors() throws Exception {
59+
// No-op: duplex network connector is already defined in duplexDynamicIncludedDestLocalBroker.xml
60+
}
61+
5762
// we have to override this, because with dynamicallyIncludedDestinations working properly
5863
// (see https://issues.apache.org/jira/browse/AMQ-4209) you can't get request/response
5964
// with temps working (there is no wild card like there is for staticallyIncludedDest)

activemq-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
*/
2222
public class MulticastNetworkTest extends SimpleNetworkTest {
2323

24+
@Override
25+
protected void addNetworkConnectors() throws Exception {
26+
// No-op: multicast network connectors are already defined in the XML configs
27+
}
28+
2429
protected String getRemoteBrokerURI() {
2530
return "org/apache/activemq/network/multicast/remoteBroker.xml";
2631
}

activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.activemq.network;
1818

1919
import java.io.IOException;
20+
import java.net.URI;
2021
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
import jakarta.jms.Connection;
@@ -25,7 +26,6 @@
2526
import jakarta.jms.JMSException;
2627
import jakarta.jms.Message;
2728
import jakarta.jms.MessageConsumer;
28-
import jakarta.jms.MessageListener;
2929
import jakarta.jms.MessageProducer;
3030
import jakarta.jms.Queue;
3131
import jakarta.jms.Session;
@@ -39,6 +39,7 @@
3939
import org.apache.activemq.broker.BrokerService;
4040
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
4141
import org.apache.activemq.command.ActiveMQQueue;
42+
import org.apache.activemq.command.ActiveMQTopic;
4243
import org.apache.activemq.transport.TransportFilter;
4344
import org.apache.activemq.transport.failover.FailoverTransport;
4445
import org.apache.activemq.xbean.BrokerFactoryBean;
@@ -68,32 +69,28 @@ public class NetworkFailoverTest extends TestCase {
6869
public void testRequestReply() throws Exception {
6970
final MessageProducer remoteProducer = remoteSession.createProducer(null);
7071
MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
71-
remoteConsumer.setMessageListener(new MessageListener() {
72-
@Override
73-
public void onMessage(Message msg) {
74-
final TextMessage textMsg = (TextMessage)msg;
72+
remoteConsumer.setMessageListener(msg -> {
73+
final TextMessage textMsg = (TextMessage) msg;
74+
try {
75+
final String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
76+
final Destination replyTo = msg.getJMSReplyTo();
77+
textMsg.clearBody();
78+
textMsg.setText(payload);
79+
LOG.info("*** Sending response: {}", textMsg.getText());
80+
remoteProducer.send(replyTo, textMsg);
81+
LOG.info("replied with: " + textMsg.getJMSMessageID());
82+
83+
} catch (DestinationDoesNotExistException expected) {
84+
// been removed but not yet recreated
85+
replyToNonExistDest.incrementAndGet();
7586
try {
76-
String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
77-
Destination replyTo;
78-
replyTo = msg.getJMSReplyTo();
79-
textMsg.clearBody();
80-
textMsg.setText(payload);
81-
LOG.info("*** Sending response: {}", textMsg.getText());
82-
remoteProducer.send(replyTo, textMsg);
83-
LOG.info("replied with: " + textMsg.getJMSMessageID());
84-
85-
} catch (DestinationDoesNotExistException expected) {
86-
// been removed but not yet recreated
87-
replyToNonExistDest.incrementAndGet();
88-
try {
89-
LOG.info("NED: " + textMsg.getJMSMessageID());
90-
} catch (JMSException e) {
91-
e.printStackTrace();
92-
};
93-
} catch (Exception e) {
94-
LOG.warn("*** Responder listener caught exception: ", e);
87+
LOG.info("NED: " + textMsg.getJMSMessageID());
88+
} catch (JMSException e) {
9589
e.printStackTrace();
9690
}
91+
} catch (Exception e) {
92+
LOG.warn("*** Responder listener caught exception: ", e);
93+
e.printStackTrace();
9794
}
9895
});
9996

@@ -104,16 +101,13 @@ public void onMessage(Message msg) {
104101

105102
// track remote dlq for forward failures
106103
MessageConsumer dlqconsumer = remoteSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
107-
dlqconsumer.setMessageListener(new MessageListener() {
108-
@Override
109-
public void onMessage(Message message) {
110-
try {
111-
LOG.info("dlq " + message.getJMSMessageID());
112-
} catch (JMSException e) {
113-
e.printStackTrace();
114-
}
115-
remoteDLQCount.incrementAndGet();
104+
dlqconsumer.setMessageListener(message -> {
105+
try {
106+
LOG.info("dlq " + message.getJMSMessageID());
107+
} catch (JMSException e) {
108+
e.printStackTrace();
116109
}
110+
remoteDLQCount.incrementAndGet();
117111
});
118112

119113
// allow for consumer infos to perculate arround
@@ -176,25 +170,51 @@ protected void doTearDown() throws Exception {
176170
} catch(Exception ex) {}
177171
}
178172

179-
protected void doSetUp(boolean deleteAllMessages) throws Exception {
173+
protected void doSetUp(final boolean deleteAllMessages) throws Exception {
180174

181175
remoteBroker = createRemoteBroker();
182176
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
183177
remoteBroker.setCacheTempDestinations(true);
184178
remoteBroker.start();
179+
remoteBroker.waitUntilStarted();
185180

186181
localBroker = createLocalBroker();
187182
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
188183
localBroker.setCacheTempDestinations(true);
189184
localBroker.start();
190-
191-
String localURI = "tcp://localhost:61616";
192-
String remoteURI = "tcp://localhost:61617";
193-
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:("+localURI+","+remoteURI+")?randomize=false&backup=false&trackMessages=true");
185+
localBroker.waitUntilStarted();
186+
187+
// Get actual assigned ephemeral ports
188+
final URI localConnectURI = localBroker.getTransportConnectors().get(0).getConnectUri();
189+
final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri();
190+
final String localURI = localConnectURI.toString();
191+
final String remoteURI = remoteConnectURI.toString();
192+
193+
// Add network connectors programmatically using actual ports
194+
final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector(
195+
new URI("static://(" + remoteURI + ")"));
196+
localToRemote.setName("networkConnector");
197+
localToRemote.setDynamicOnly(false);
198+
localToRemote.setConduitSubscriptions(true);
199+
localToRemote.setDecreaseNetworkConsumerPriority(false);
200+
localToRemote.setDynamicallyIncludedDestinations(
201+
java.util.List.of(new ActiveMQQueue("include.test.foo"), new ActiveMQTopic("include.test.bar")));
202+
localToRemote.setExcludedDestinations(
203+
java.util.List.of(new ActiveMQQueue("exclude.test.foo"), new ActiveMQTopic("exclude.test.bar")));
204+
localBroker.addNetworkConnector(localToRemote);
205+
localBroker.startNetworkConnector(localToRemote, null);
206+
207+
final DiscoveryNetworkConnector remoteToLocal = new DiscoveryNetworkConnector(
208+
new URI("static://(" + localURI + ")"));
209+
remoteToLocal.setName("networkConnector");
210+
remoteBroker.addNetworkConnector(remoteToLocal);
211+
remoteBroker.startNetworkConnector(remoteToLocal, null);
212+
213+
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI + ")?randomize=false&backup=false&trackMessages=true");
194214
localConnection = fac.createConnection();
195215
localConnection.setClientID("local");
196216
localConnection.start();
197-
fac = new ActiveMQConnectionFactory("failover:("+remoteURI + ","+localURI+")?randomize=false&backup=false&trackMessages=true");
217+
fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," + localURI + ")?randomize=false&backup=false&trackMessages=true");
198218
fac.setWatchTopicAdvisories(false);
199219
remoteConnection = fac.createConnection();
200220
remoteConnection.setClientID("remote");
@@ -205,11 +225,11 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception {
205225
}
206226

207227
protected String getRemoteBrokerURI() {
208-
return "org/apache/activemq/network/remoteBroker.xml";
228+
return "org/apache/activemq/network/remoteBroker-ephemeral.xml";
209229
}
210230

211231
protected String getLocalBrokerURI() {
212-
return "org/apache/activemq/network/localBroker.xml";
232+
return "org/apache/activemq/network/localBroker-ephemeral.xml";
213233
}
214234

215235
protected BrokerService createBroker(String uri) throws Exception {

0 commit comments

Comments
 (0)