Skip to content

Commit aac622a

Browse files
authored
fix(test): resolve flaky tests (#1774)
* fix(test): resolve flaky tests * fix(test): try to make it more stable
1 parent e3ab2d3 commit aac622a

File tree

6 files changed

+100
-112
lines changed

6 files changed

+100
-112
lines changed

activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -285,34 +285,41 @@ public void testProducerAdvisoriesReplayedOnlyTargetNewConsumer() throws Excepti
285285

286286
ActiveMQDestination queue = new ActiveMQQueue("test");
287287
ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
288-
288+
289289
// Setup a first connection
290290
StubConnection connection1 = createConnection();
291291
ConnectionInfo connectionInfo1 = createConnectionInfo();
292292
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
293293
connection1.send(connectionInfo1);
294294
connection1.send(sessionInfo1);
295-
// Create the first consumer..
295+
// Create the first consumer..
296+
// Use request() to ensure the advisory consumer is fully registered before
297+
// the producer is created. This prevents a race where addConsumer and addProducer
298+
// execute concurrently on different transport threads, which could cause the
299+
// advisory consumer to receive both a broadcast AND a replay of the same producer.
296300
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
297301
consumerInfo1.setPrefetchSize(100);
298-
connection1.send(consumerInfo1);
302+
connection1.request(consumerInfo1);
299303

300304
// Setup a producer.
301305
StubConnection connection2 = createConnection();
302306
ConnectionInfo connectionInfo2 = createConnectionInfo();
303307
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
304308
ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
305-
producerInfo2.setDestination(queue);
309+
producerInfo2.setDestination(queue);
306310
connection2.send(connectionInfo2);
307311
connection2.send(sessionInfo2);
308-
connection2.send(producerInfo2);
309-
312+
// Use request() to ensure producer is fully registered and its "new producer"
313+
// advisory is fired before we create the advisory consumer. This prevents a race
314+
// where the advisory consumer could receive both a replay AND the broadcast advisory.
315+
connection2.request(producerInfo2);
316+
310317
Message m1 = receiveMessage(connection1);
311318
assertNotNull(m1);
312319
assertNotNull(m1.getDataStructure());
313320
assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId());
314-
315-
// Create the 2nd consumer..
321+
322+
// Create the 2nd consumer..
316323
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
317324
consumerInfo2.setPrefetchSize(100);
318325
connection2.send(consumerInfo2);

activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030

3131
import jakarta.jms.Connection;
3232
import jakarta.jms.JMSException;
33-
import jakarta.jms.Message;
3433
import jakarta.jms.MessageConsumer;
35-
import jakarta.jms.MessageListener;
3634
import jakarta.jms.MessageProducer;
3735
import jakarta.jms.Session;
3836
import jakarta.jms.TextMessage;
@@ -57,19 +55,16 @@ public void testSimulatenousCron() throws Exception {
5755
MessageConsumer consumer = session.createConsumer(destination);
5856

5957
final CountDownLatch latch = new CountDownLatch(COUNT);
60-
consumer.setMessageListener(new MessageListener() {
61-
@Override
62-
public void onMessage(Message message) {
63-
count.incrementAndGet();
64-
latch.countDown();
65-
assertTrue(message instanceof TextMessage);
66-
TextMessage tm = (TextMessage) message;
67-
try {
68-
LOG.info("Received [{}] count: {} ", tm.getText(), count.get());
69-
} catch (JMSException e) {
70-
LOG.error("Unexpected exception in onMessage", e);
71-
fail("Unexpected exception in onMessage: " + e.getMessage());
72-
}
58+
consumer.setMessageListener(message -> {
59+
count.incrementAndGet();
60+
latch.countDown();
61+
assertTrue(message instanceof TextMessage);
62+
final TextMessage tm = (TextMessage) message;
63+
try {
64+
LOG.info("Received [{}] count: {} ", tm.getText(), count.get());
65+
} catch (JMSException e) {
66+
LOG.error("Unexpected exception in onMessage", e);
67+
fail("Unexpected exception in onMessage: " + e.getMessage());
7368
}
7469
});
7570

@@ -88,9 +83,11 @@ public void onMessage(Message message) {
8883
JobScheduler js = sb.getJobScheduler();
8984
List<Job> list = js.getAllJobs();
9085
assertEquals(COUNT, list.size());
91-
latch.await(2, TimeUnit.MINUTES);
92-
// All should messages should have been received by now
93-
assertEquals(COUNT, count.get());
86+
assertTrue("all scheduled messages should fire", latch.await(2, TimeUnit.MINUTES));
87+
// Cron "* * * * *" fires every minute, so count may exceed COUNT
88+
// if a second minute boundary is crossed during the wait
89+
assertTrue("at least " + COUNT + " messages received, got " + count.get(),
90+
count.get() >= COUNT);
9491

9592
connection.close();
9693
}

activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -803,11 +803,11 @@ private void testDurablePropagation(final int ttl, final boolean dynamicOnly, fi
803803

804804
protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
805805
final int count) throws Exception {
806+
final boolean result = Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(),
807+
TimeUnit.SECONDS.toMillis(30), 500);
806808
assertTrue("Expected " + count + " NC durable sub(s) on " + brokerService.getBrokerName()
807809
+ " for " + dest.getTopicName() + ", but got "
808-
+ getNCDurableSubs(brokerService, dest).size(),
809-
Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(),
810-
TimeUnit.SECONDS.toMillis(30), 500));
810+
+ getNCDurableSubs(brokerService, dest).size(), result);
811811
}
812812

813813
protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
@@ -824,7 +824,7 @@ protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService br
824824
for (final SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
825825
if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
826826
final DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
827-
if (sub != null && sub.isActive()) {
827+
if (sub != null) {
828828
subs.add(sub);
829829
}
830830
}

activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,20 @@ protected void testNonDurableReceiveThrougRestart(final String pubBroker, final
9797
networkConnector.start();
9898
waitForBridgeFormation();
9999

100+
// Wait for the network bridge to re-establish its demand subscription on the
101+
// publishing broker. waitForBridgeFormation() only verifies the bridge is connected,
102+
// but setupStaticDestinations() (which creates the durable demand subscription)
103+
// runs asynchronously after bridge connection. Without this wait, sendMessages()
104+
// can fire before the demand subscription is set up, causing the message to be
105+
// published with no subscriber to forward it to the consumer broker.
106+
// We check for an active durable subscription because the durable sub may already
107+
// exist (inactive) from the previous bridge; we need it to be reactivated.
108+
final Topic pubBrokerDest = (Topic) brokers.get(pubBroker).broker.getDestination(dest);
109+
assertTrue("Network durable subscription should be active on " + pubBroker,
110+
Wait.waitFor(() -> pubBrokerDest.getDurableTopicSubs().values().stream()
111+
.anyMatch(DurableTopicSubscription::isActive),
112+
10000, 100));
113+
100114
// Send messages
101115
sendMessages(pubBroker, dest, 1);
102116

activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void testJMXCountersWithOfflineSubs() throws Exception {
9797
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
9898

9999
for (int i=0; i<sent/2; i++) {
100-
Message m = consumer.receive(4000);
100+
final Message m = consumer.receive(30000);
101101
assertNotNull("got message: " + i, m);
102102
LOG.info("Got :" + i + ", " + m);
103103
}
@@ -110,12 +110,8 @@ public void testJMXCountersWithOfflineSubs() throws Exception {
110110

111111
assertTrue("is active", durableSubscriptionView.isActive());
112112
assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
113-
assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
114-
@Override
115-
public boolean isSatisified() throws Exception {
116-
return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
117-
}
118-
}));
113+
assertTrue("correct waiting acks", Wait.waitFor(
114+
() -> 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge()));
119115
assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
120116

121117

@@ -150,7 +146,7 @@ public boolean isSatisified() throws Exception {
150146
consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
151147

152148
for (int i=0; i<sent/2;i++) {
153-
Message m = consumer.receive(30000);
149+
final Message m = consumer.receive(30000);
154150
assertNotNull("got message: " + i, m);
155151
LOG.info("Got :" + i + ", " + m);
156152
}
@@ -162,13 +158,10 @@ public boolean isSatisified() throws Exception {
162158

163159
assertTrue("is active", durableSubscriptionView2.isActive());
164160
assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
165-
assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
166-
@Override
167-
public boolean isSatisified() throws Exception {
168-
long val = durableSubscriptionView2.getDequeueCounter();
169-
LOG.info("dequeue count:" + val);
170-
return 10 == val;
171-
}
161+
assertTrue("correct dequeue", Wait.waitFor(() -> {
162+
final long val = durableSubscriptionView2.getDequeueCounter();
163+
LOG.info("dequeue count:" + val);
164+
return 10 == val;
172165
}));
173166
}
174167
}

0 commit comments

Comments
 (0)