Skip to content

Commit 0fa8083

Browse files
authored
test: use ephemeral ports (#1703)
* Use ephemeral ports in tests to avoid port conflicts * Refactor tests to remove unnecessary network connector starts and improve connection handling
1 parent 8d51a29 commit 0fa8083

13 files changed

+143
-90
lines changed

activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636

3737
public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
3838
protected SyncCreateDataSource sharedDs;
39-
protected String MASTER_URL = "tcp://localhost:62001";
40-
protected String SLAVE_URL = "tcp://localhost:62002";
4139
protected String findStatement;
4240

4341
protected void setUp() throws Exception {
@@ -55,7 +53,7 @@ protected void tearDown() throws Exception {
5553
protected void createMaster() throws Exception {
5654
master = new BrokerService();
5755
master.setBrokerName("master");
58-
master.addConnector(MASTER_URL);
56+
master.addConnector("tcp://localhost:0");
5957
master.setUseJmx(false);
6058
master.setPersistent(true);
6159
master.setDeleteAllMessagesOnStartup(true);
@@ -83,7 +81,7 @@ public void run() {
8381
BrokerService broker = new BrokerService();
8482
broker.setBrokerName("slave");
8583
TransportConnector connector = new TransportConnector();
86-
connector.setUri(new URI(SLAVE_URL));
84+
connector.setUri(new URI("tcp://localhost:" + masterPort));
8785
broker.addConnector(connector);
8886
// no need for broker.setMasterConnectorURI(masterConnectorURI)
8987
// as the db lock provides the slave/master initialisation

activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,14 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
4848
protected CountDownLatch slaveStarted;
4949
protected int inflightMessageCount;
5050
protected int failureCount = 50;
51-
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&useExponentialBackOff=false";
51+
protected int masterPort;
52+
protected String uriString;
5253

5354
@Override
5455
protected void setUp() throws Exception {
56+
// Use ephemeral port for XML-based broker configs
57+
System.setProperty("masterPort", "0");
58+
5559
slaveStarted = new CountDownLatch(1);
5660
slave.set(null);
5761
setMaxTestTime(TimeUnit.MINUTES.toMillis(10));
@@ -64,6 +68,15 @@ protected void setUp() throws Exception {
6468
failureCount = super.messageCount / 2;
6569
super.topic = isTopic();
6670
createMaster();
71+
72+
// Get the actual port assigned by the OS after master starts
73+
masterPort = master.getTransportConnectors().get(0).getConnectUri().getPort();
74+
uriString = "failover://(tcp://localhost:" + masterPort
75+
+ ")?randomize=false&useExponentialBackOff=false";
76+
77+
// Slave reuses the same port so failover reconnects to the same address
78+
System.setProperty("slavePort", String.valueOf(masterPort));
79+
6780
createSlave();
6881
// wait for thing to connect
6982
Thread.sleep(1000);

activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232

3333
public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
3434
protected DataSource sharedDs;
35-
protected String MASTER_URL = "tcp://localhost:62001";
36-
protected String SLAVE_URL = "tcp://localhost:62002";
3735
File sharedDbDirFile;
3836

3937
@Override
@@ -54,7 +52,7 @@ protected void tearDown() throws Exception {
5452
protected void createMaster() throws Exception {
5553
master = new BrokerService();
5654
master.setBrokerName("master");
57-
master.addConnector(MASTER_URL);
55+
master.addConnector("tcp://localhost:0");
5856
master.setUseJmx(false);
5957
master.setPersistent(true);
6058
master.setDeleteAllMessagesOnStartup(true);
@@ -87,7 +85,7 @@ public void run() {
8785
BrokerService broker = new BrokerService();
8886
broker.setBrokerName("slave");
8987
TransportConnector connector = new TransportConnector();
90-
connector.setUri(new URI(SLAVE_URL));
88+
connector.setUri(new URI("tcp://localhost:" + masterPort));
9189
broker.addConnector(connector);
9290
broker.setUseJmx(false);
9391
broker.setPersistent(true);

activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,11 @@
2626
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
2727

2828
public class mKahaDbQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
29-
protected String MASTER_URL = "tcp://localhost:62001";
30-
protected String SLAVE_URL = "tcp://localhost:62002";
3129

3230
protected void createMaster() throws Exception {
3331
master = new BrokerService();
3432
master.setBrokerName("master");
35-
master.addConnector(MASTER_URL);
33+
master.addConnector("tcp://localhost:0");
3634
master.setUseJmx(false);
3735
master.setPersistent(true);
3836
master.setDeleteAllMessagesOnStartup(true);
@@ -59,7 +57,7 @@ public void run() {
5957
BrokerService broker = new BrokerService();
6058
broker.setBrokerName("slave");
6159
TransportConnector connector = new TransportConnector();
62-
connector.setUri(new URI(SLAVE_URL));
60+
connector.setUri(new URI("tcp://localhost:" + masterPort));
6361
broker.addConnector(connector);
6462
// no need for broker.setMasterConnectorURI(masterConnectorURI)
6563
// as the db lock provides the slave/master initialisation

activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,22 +63,30 @@
6363
public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
6464
static final String payload = new String(new byte[10 * 1024]);
6565
private static final Logger LOG = LoggerFactory.getLogger(AMQ4485LowLimitTest.class);
66-
final int portBase = 61600;
6766
int numBrokers = 4;
67+
final int[] brokerPorts = new int[numBrokers];
6868
final int numProducers = 10;
6969
final int numMessages = 200;
7070
final int consumerSleepTime = 5;
71-
StringBuilder brokersUrl = new StringBuilder();
7271
HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();
7372
private ArrayList<Throwable> exceptions = new ArrayList<Throwable>();
7473

75-
protected void buildUrlList() throws Exception {
74+
protected void collectBrokerPorts() throws Exception {
7675
for (int i = 0; i < numBrokers; i++) {
77-
brokersUrl.append("tcp://localhost:" + (portBase + i));
76+
final BrokerService broker = brokers.get("B" + i).broker;
77+
brokerPorts[i] = broker.getTransportConnectors().get(0).getConnectUri().getPort();
78+
}
79+
}
80+
81+
protected String buildBrokersUrl() {
82+
final StringBuilder sb = new StringBuilder();
83+
for (int i = 0; i < numBrokers; i++) {
84+
sb.append("tcp://localhost:").append(brokerPorts[i]);
7885
if (i != numBrokers - 1) {
79-
brokersUrl.append(',');
86+
sb.append(',');
8087
}
8188
}
89+
return sb.toString();
8290
}
8391

8492
protected BrokerService createBroker(int brokerid) throws Exception {
@@ -95,11 +103,8 @@ protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws
95103

96104
broker.setUseJmx(true);
97105
broker.setBrokerName("B" + brokerid);
98-
broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));
106+
broker.addConnector(new URI("tcp://localhost:0"));
99107

100-
if (addToNetwork) {
101-
addNetworkConnector(broker);
102-
}
103108
broker.setSchedulePeriodForDestinationPurge(0);
104109
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l);
105110

@@ -132,12 +137,11 @@ protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws
132137
return broker;
133138
}
134139

135-
private void addNetworkConnector(BrokerService broker) throws Exception {
136-
StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString());
137-
networkConnectorUrl.append(')');
140+
private void addNetworkConnector(BrokerService broker, String brokersUrl) throws Exception {
141+
final String networkConnectorUrl = "static:(" + brokersUrl + ")";
138142

139143
for (int i = 0; i < 2; i++) {
140-
NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
144+
final NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl));
141145
nc.setName("Bridge-" + i);
142146
nc.setNetworkTTL(1);
143147
nc.setDecreaseNetworkConsumerPriority(true);
@@ -146,6 +150,7 @@ private void addNetworkConnector(BrokerService broker) throws Exception {
146150
nc.setDynamicallyIncludedDestinations(
147151
Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")}));
148152
broker.addNetworkConnector(nc);
153+
broker.startNetworkConnector(nc, null);
149154
}
150155
}
151156

@@ -156,7 +161,8 @@ public void x_testInterleavedSend() throws Exception {
156161
BrokerService b = createBroker(0, false);
157162
b.start();
158163

159-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + 0));
164+
final int port = b.getTransportConnectors().get(0).getConnectUri().getPort();
165+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + port);
160166
connectionFactory.setWatchTopicAdvisories(false);
161167

162168
QueueConnection c1 = connectionFactory.createQueueConnection();
@@ -207,13 +213,19 @@ public void run() {
207213

208214
public void testBrokers() throws Exception {
209215

210-
buildUrlList();
211-
212216
for (int i = 0; i < numBrokers; i++) {
213217
createBroker(i);
214218
}
215219

216220
startAllBrokers();
221+
222+
// Get actual ports after brokers start and add network connectors
223+
collectBrokerPorts();
224+
final String brokersUrl = buildBrokersUrl();
225+
for (int i = 0; i < numBrokers; i++) {
226+
addNetworkConnector(brokers.get("B" + i).broker, brokersUrl);
227+
}
228+
217229
waitForBridgeFormation(numBrokers - 1);
218230

219231
verifyPeerBrokerInfos(numBrokers - 1);
@@ -266,7 +278,7 @@ public boolean isSatisified() throws Exception {
266278

267279
private void startConsumer(String brokerName, ActiveMQDestination destination) throws Exception {
268280
int id = Integer.parseInt(brokerName.substring(1));
269-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + id));
281+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + brokerPorts[id]);
270282
connectionFactory.setWatchTopicAdvisories(false);
271283
QueueConnection queueConnection = connectionFactory.createQueueConnection();
272284
queueConnection.start();
@@ -303,7 +315,7 @@ private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
303315
ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString());
304316

305317
for (int id = 0; id < nBrokers; id++) {
306-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
318+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
307319
connectionFactory.setWatchTopicAdvisories(false);
308320

309321
QueueConnection queueConnection = connectionFactory.createQueueConnection();
@@ -330,7 +342,7 @@ public void onMessage(Message message) {
330342
private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
331343
List<ConsumerState> consumerStates = new LinkedList<ConsumerState>();
332344
for (int id = 0; id < nBrokers; id++) {
333-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
345+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
334346
connectionFactory.setWatchTopicAdvisories(false);
335347

336348
QueueConnection queueConnection = connectionFactory.createQueueConnection();
@@ -393,7 +405,7 @@ private void produce(final int numMessages) throws Exception {
393405
@Override
394406
public void run() {
395407
try {
396-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
408+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
397409
connectionFactory.setWatchTopicAdvisories(false);
398410
QueueConnection queueConnection = connectionFactory.createQueueConnection();
399411
queueConnection.start();

activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,30 @@
5959
public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends JmsMultipleBrokersTestSupport {
6060
static final String payload = new String(new byte[10 * 1024]);
6161
private static final Logger LOG = LoggerFactory.getLogger(AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.class);
62-
final int portBase = 61600;
6362
final int numBrokers = 4;
63+
final int[] brokerPorts = new int[numBrokers];
6464
final int numProducers = 10;
6565
final int numMessages = 800;
6666
final int consumerSleepTime = 20;
67-
StringBuilder brokersUrl = new StringBuilder();
6867
HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();
6968
private ArrayList<Throwable> exceptions = new ArrayList<Throwable>();
7069

71-
protected void buildUrlList() throws Exception {
70+
protected void collectBrokerPorts() throws Exception {
7271
for (int i = 0; i < numBrokers; i++) {
73-
brokersUrl.append("tcp://localhost:" + (portBase + i));
72+
final BrokerService broker = brokers.get("B" + i).broker;
73+
brokerPorts[i] = broker.getTransportConnectors().get(0).getConnectUri().getPort();
74+
}
75+
}
76+
77+
protected String buildBrokersUrl() {
78+
final StringBuilder sb = new StringBuilder();
79+
for (int i = 0; i < numBrokers; i++) {
80+
sb.append("tcp://localhost:").append(brokerPorts[i]);
7481
if (i != numBrokers - 1) {
75-
brokersUrl.append(',');
82+
sb.append(',');
7683
}
7784
}
85+
return sb.toString();
7886
}
7987

8088
protected BrokerService createBroker(int brokerid) throws Exception {
@@ -86,9 +94,8 @@ protected BrokerService createBroker(int brokerid) throws Exception {
8694

8795
broker.setUseJmx(true);
8896
broker.setBrokerName("B" + brokerid);
89-
broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));
97+
broker.addConnector(new URI("tcp://localhost:0"));
9098

91-
addNetworkConnector(broker);
9299
broker.setSchedulePeriodForDestinationPurge(0);
93100
broker.getSystemUsage().setSendFailIfNoSpace(true);
94101
broker.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
@@ -113,12 +120,11 @@ protected BrokerService createBroker(int brokerid) throws Exception {
113120
return broker;
114121
}
115122

116-
private void addNetworkConnector(BrokerService broker) throws Exception {
117-
StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString());
118-
networkConnectorUrl.append(')');
123+
private void addNetworkConnector(BrokerService broker, String brokersUrl) throws Exception {
124+
final String networkConnectorUrl = "static:(" + brokersUrl + ")";
119125

120126
for (int i = 0; i < 2; i++) {
121-
NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
127+
final NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl));
122128
nc.setName("Bridge-" + i);
123129
nc.setNetworkTTL(1);
124130
nc.setDecreaseNetworkConsumerPriority(true);
@@ -127,18 +133,25 @@ private void addNetworkConnector(BrokerService broker) throws Exception {
127133
nc.setDynamicallyIncludedDestinations(
128134
Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")}));
129135
broker.addNetworkConnector(nc);
136+
broker.startNetworkConnector(nc, null);
130137
}
131138
}
132139

133140
public void testBrokers() throws Exception {
134141

135-
buildUrlList();
136-
137142
for (int i = 0; i < numBrokers; i++) {
138143
createBroker(i);
139144
}
140145

141146
startAllBrokers();
147+
148+
// Get actual ports after brokers start and add network connectors
149+
collectBrokerPorts();
150+
final String brokersUrl = buildBrokersUrl();
151+
for (int i = 0; i < numBrokers; i++) {
152+
addNetworkConnector(brokers.get("B" + i).broker, brokersUrl);
153+
}
154+
142155
waitForBridgeFormation(numBrokers - 1);
143156

144157
verifyPeerBrokerInfos(numBrokers - 1);
@@ -192,7 +205,7 @@ private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
192205
ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString());
193206

194207
for (int id = 0; id < nBrokers; id++) {
195-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
208+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
196209
connectionFactory.setWatchTopicAdvisories(false);
197210

198211
QueueConnection queueConnection = connectionFactory.createQueueConnection();
@@ -219,7 +232,7 @@ public void onMessage(Message message) {
219232
private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
220233
List<ConsumerState> consumerStates = new LinkedList<ConsumerState>();
221234
for (int id = 0; id < nBrokers; id++) {
222-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
235+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
223236
connectionFactory.setWatchTopicAdvisories(false);
224237

225238
QueueConnection queueConnection = connectionFactory.createQueueConnection();
@@ -281,7 +294,7 @@ private void produce(int numMessages) throws Exception {
281294
@Override
282295
public void run() {
283296
try {
284-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
297+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
285298
connectionFactory.setWatchTopicAdvisories(false);
286299
QueueConnection queueConnection = connectionFactory.createQueueConnection();
287300
queueConnection.start();

0 commit comments

Comments
 (0)