Skip to content

Commit 3b4e7cb

Browse files
authored
Revert "Facky tests revealed mainly with faster CI (aka Github Actions but also with Java 25) (#1610)" (#1617)
This reverts commit 94c3c3d. Note: Reverting due to accidental inclusion of [AMQ-9829] ActiveMQMessageConsumer.java
1 parent 94c3c3d commit 3b4e7cb

File tree

31 files changed

+157
-587
lines changed

31 files changed

+157
-587
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Lines changed: 1 addition & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ public PreviouslyDeliveredMap(TransactionId transactionId) {
113113
class PreviouslyDelivered {
114114
org.apache.activemq.command.Message message;
115115
boolean redelivered;
116-
boolean prefetchedOnly;
117116

118117
PreviouslyDelivered(MessageDispatch messageDispatch) {
119118
message = messageDispatch.getMessage();
@@ -123,12 +122,6 @@ class PreviouslyDelivered {
123122
message = messageDispatch.getMessage();
124123
this.redelivered = redelivered;
125124
}
126-
127-
PreviouslyDelivered(MessageDispatch messageDispatch, boolean redelivered, boolean prefetchedOnly) {
128-
message = messageDispatch.getMessage();
129-
this.redelivered = redelivered;
130-
this.prefetchedOnly = prefetchedOnly;
131-
}
132125
}
133126

134127
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
@@ -778,9 +771,6 @@ void clearMessagesInProgress() {
778771
// ensure unconsumed are rolledback up front as they may get redelivered to another consumer
779772
List<MessageDispatch> list = unconsumedMessages.removeAll();
780773
if (!this.info.isBrowser()) {
781-
if (session.isTransacted()) {
782-
capturePrefetchedMessagesForDuplicateSuppression(list);
783-
}
784774
for (MessageDispatch old : list) {
785775
session.connection.rollbackDuplicate(this, old.getMessage());
786776
}
@@ -943,16 +933,6 @@ private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
943933
if (!isAutoAcknowledgeBatch()) {
944934
synchronized(deliveredMessages) {
945935
deliveredMessages.addFirst(md);
946-
if (session.isTransacted()) {
947-
PreviouslyDelivered entry = null;
948-
if (previouslyDeliveredMessages != null) {
949-
entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId());
950-
}
951-
if (entry != null && entry.prefetchedOnly) {
952-
entry.prefetchedOnly = false;
953-
entry.redelivered = true;
954-
}
955-
}
956936
}
957937
if (session.getTransacted()) {
958938
if (transactedIndividualAck) {
@@ -1440,8 +1420,7 @@ public void dispatch(MessageDispatch md) {
14401420
synchronized (unconsumedMessages.getMutex()) {
14411421
if (!unconsumedMessages.isClosed()) {
14421422
// deliverySequenceId non zero means previously queued dispatch
1443-
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || isPrefetchedRedelivery(md)
1444-
|| !session.connection.isDuplicate(this, md.getMessage())) {
1423+
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
14451424
if (listener != null && unconsumedMessages.isRunning()) {
14461425
if (redeliveryExceeded(md)) {
14471426
poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
@@ -1591,37 +1570,6 @@ private void captureDeliveredMessagesForDuplicateSuppressionWithRequireRedeliver
15911570
LOG.trace("{} tracking existing transacted {} delivered list({})", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
15921571
}
15931572

1594-
private void capturePrefetchedMessagesForDuplicateSuppression(final List<MessageDispatch> list) {
1595-
if (list.isEmpty()) {
1596-
return;
1597-
}
1598-
if (previouslyDeliveredMessages == null) {
1599-
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, PreviouslyDelivered>(session.getTransactionContext().getTransactionId());
1600-
}
1601-
for (MessageDispatch pending : list) {
1602-
if (pending.getMessage() != null) {
1603-
previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), new PreviouslyDelivered(pending, false, true));
1604-
}
1605-
}
1606-
LOG.trace("{} tracking existing transacted {} prefetched list({})", getConsumerId(), previouslyDeliveredMessages.transactionId, list.size());
1607-
}
1608-
1609-
private boolean isPrefetchedRedelivery(final MessageDispatch md) {
1610-
if (!session.isTransacted()) {
1611-
return false;
1612-
}
1613-
if (md.getMessage() == null) {
1614-
return false;
1615-
}
1616-
synchronized (deliveredMessages) {
1617-
if (previouslyDeliveredMessages != null) {
1618-
PreviouslyDelivered entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId());
1619-
return entry != null && entry.prefetchedOnly;
1620-
}
1621-
}
1622-
return false;
1623-
}
1624-
16251573
public int getMessageSize() {
16261574
return unconsumedMessages.size();
16271575
}

activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionSecurityExceptionTest.java

Lines changed: 8 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class PooledConnectionSecurityExceptionTest {
6969
@Test
7070
public void testFailedConnectThenSucceeds() throws JMSException {
7171
try (final Connection connection1 = pooledConnFact.createConnection("invalid", "credentials")) {
72-
assertSecurityExceptionOnStart(connection1);
72+
assertThrows(JMSSecurityException.class, connection1::start);
7373

7474
try (final Connection connection2 = pooledConnFact.createConnection("system", "manager")) {
7575
connection2.start();
@@ -93,7 +93,7 @@ public void onException(JMSException exception) {
9393
onExceptionCalled.countDown();
9494
}
9595
});
96-
assertSecurityExceptionOnStart(connection1);
96+
assertThrows(JMSSecurityException.class, connection1::start);
9797

9898
try (final Connection connection2 = pooledConnFact.createConnection("system", "manager")) {
9999
connection2.start();
@@ -118,7 +118,7 @@ public void testFailureGetsNewConnectionOnRetry() throws Exception {
118118
pooledConnFact.setMaxConnections(1);
119119

120120
try (final Connection connection1 = pooledConnFact.createConnection("invalid", "credentials")) {
121-
assertSecurityExceptionOnStart(connection1);
121+
assertThrows(JMSSecurityException.class, connection1::start);
122122

123123
// The pool should process the async error
124124
// we should eventually get a different connection instance from the pool regardless of the underlying connection
@@ -145,9 +145,9 @@ public void testFailureGetsNewConnectionOnRetryBigPool() throws JMSException {
145145
pooledConnFact.setMaxConnections(10);
146146

147147
try (final Connection connection1 = pooledConnFact.createConnection("invalid", "credentials")) {
148-
assertSecurityExceptionOnStart(connection1);
148+
assertThrows(JMSSecurityException.class, connection1::start);
149149
try (final Connection connection2 = pooledConnFact.createConnection("invalid", "credentials")) {
150-
assertSecurityExceptionOnStart(connection2);
150+
assertThrows(JMSSecurityException.class, connection2::start);
151151
assertNotSame(connection1, connection2);
152152
}
153153
}
@@ -165,7 +165,7 @@ public void testFailoverWithInvalidCredentialsCanConnect() throws JMSException {
165165
pooledConnFact.setMaxConnections(1);
166166

167167
try (final Connection connection = pooledConnFact.createConnection("invalid", "credentials")) {
168-
assertSecurityExceptionOnStart(connection);
168+
assertThrows(JMSSecurityException.class, connection::start);
169169

170170
try (final Connection connection2 = pooledConnFact.createConnection("system", "manager")) {
171171
connection2.start();
@@ -185,7 +185,7 @@ public void testFailoverWithInvalidCredentials() throws Exception {
185185
pooledConnFact.setMaxConnections(1);
186186

187187
try (final PooledConnection connection1 = (PooledConnection) pooledConnFact.createConnection("invalid", "credentials")) {
188-
assertSecurityExceptionOnStart(connection1);
188+
assertThrows(JMSSecurityException.class, connection1::start);
189189

190190
// The pool should process the async error
191191
assertTrue("Should get new connection", Wait.waitFor(new Wait.Condition() {
@@ -202,7 +202,7 @@ public boolean isSatisified() throws Exception {
202202

203203
try (final PooledConnection connection2 = (PooledConnection) pooledConnFact.createConnection("invalid", "credentials")) {
204204
assertNotSame(connection1.pool, connection2.pool);
205-
assertSecurityExceptionOnStart(connection2);
205+
assertThrows(JMSSecurityException.class, connection2::start);
206206
}
207207
}
208208
}
@@ -230,55 +230,6 @@ public String getName() {
230230
return name.getMethodName();
231231
}
232232

233-
/**
234-
* Helper method to assert that a connection start fails with security exception.
235-
* On different test environments, the connection may be disposed asynchronously
236-
* before the security exception is fully propagated, resulting in either JMSSecurityException
237-
* or generic JMSException with "Disposed" message. Both indicate authentication failure.
238-
*
239-
* This method uses an ExceptionListener to detect when async disposal completes, providing
240-
* more reliable detection of security failures across different Java versions and environments.
241-
*
242-
* @param connection the connection to start
243-
* @throws AssertionError if no exception is thrown or the exception doesn't indicate auth failure
244-
*/
245-
private void assertSecurityExceptionOnStart(final Connection connection) {
246-
try {
247-
final ExceptionListener listener = connection.getExceptionListener();
248-
if (listener == null) { // some tests already leverage the exception listener
249-
final CountDownLatch exceptionLatch = new CountDownLatch(1);
250-
251-
// Install listener to capture async exception propagation
252-
connection.setExceptionListener(new ExceptionListener() {
253-
@Override
254-
public void onException(final JMSException exception) {
255-
LOG.info("Connection received exception: {}", exception.getMessage());
256-
assertTrue(exception instanceof JMSSecurityException);
257-
exceptionLatch.countDown();
258-
}
259-
});
260-
connection.start(); // should trigger the security exception reliably and asynchronously
261-
exceptionLatch.await(1, java.util.concurrent.TimeUnit.SECONDS);
262-
263-
} else {
264-
265-
// Attempt to start and capture the synchronous exception.
266-
final JMSException thrownException = assertThrows(JMSException.class, connection::start);
267-
assertTrue("Should be JMSSecurityException or disposed due to security exception",
268-
thrownException instanceof JMSSecurityException ||
269-
thrownException.getMessage().contains("Disposed"));
270-
}
271-
272-
273-
} catch (final JMSException e) {
274-
// Ignore
275-
276-
} catch (final InterruptedException e) {
277-
throw new RuntimeException(e);
278-
}
279-
280-
}
281-
282233
@Before
283234
public void setUp() throws Exception {
284235
LOG.info("========== start " + getName() + " ==========");

activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void testGcDoneAtStop() throws Exception {
5555
LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
5656
int numKahadbFiles = kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();
5757

58-
LOG.info("Num files, job store: {}, message store: {}", numSchedulerFiles, numKahadbFiles);
58+
LOG.info("Num files, job store: {}, message store: {}", numKahadbFiles, numKahadbFiles);
5959

6060
// pull the dirs before we stop
6161
File jobDir = jobSchedulerStore.getJournal().getDirectory();
@@ -94,10 +94,8 @@ public void testNoGcAtStop() throws Exception {
9494

9595
brokerService.stop();
9696

97-
final int jobFilesOnDisk = verifyFilesOnDisk(jobDir);
98-
final int kahaFilesOnDisk = verifyFilesOnDisk(kahaDir);
99-
assertTrue("Expected job store data files at least " + numSchedulerFiles, jobFilesOnDisk >= numSchedulerFiles);
100-
assertTrue("Expected kahadb data files at least " + numKahadbFiles, kahaFilesOnDisk >= numKahadbFiles);
97+
assertEquals("Expected job store data files", numSchedulerFiles, verifyFilesOnDisk(jobDir));
98+
assertEquals("Expected kahadb data files", numKahadbFiles, verifyFilesOnDisk(kahaDir));
10199
}
102100

103101
private int verifyFilesOnDisk(File directory) {

activemq-mqtt/pom.xml

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -198,23 +198,12 @@
198198
</plugins>
199199
</pluginManagement>
200200
<plugins>
201-
<plugin>
202-
<groupId>org.apache.maven.plugins</groupId>
203-
<artifactId>maven-dependency-plugin</artifactId>
204-
<executions>
205-
<execution>
206-
<goals>
207-
<goal>properties</goal>
208-
</goals>
209-
</execution>
210-
</executions>
211-
</plugin>
212201
<plugin>
213202
<artifactId>maven-surefire-plugin</artifactId>
214203
<configuration>
215204
<forkCount>1</forkCount>
216205
<reuseForks>false</reuseForks>
217-
<argLine>-javaagent:${org.mockito:mockito-core:jar}</argLine>
206+
<argLine>${surefire.argLine}</argLine>
218207
<runOrder>alphabetical</runOrder>
219208
<systemPropertyValues>
220209
<org.apache.activemq.default.directory.prefix>target</org.apache.activemq.default.directory.prefix>

activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void run() {
129129
executorService.awaitTermination(10, TimeUnit.SECONDS);
130130

131131
ArgumentCaptor<RemoveInfo> removeInfo = ArgumentCaptor.forClass(RemoveInfo.class);
132-
Mockito.verify(transport, times(1)).sendToActiveMQ(removeInfo.capture());
132+
Mockito.verify(transport, times(4)).sendToActiveMQ(removeInfo.capture());
133133

134134
}
135135
}

activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void testXAResourceReconnect() throws Exception {
136136
try {
137137
final TransportConnector transportConnector = brokerService.getTransportConnectors().get(0);
138138

139-
String failoverUrl = String.format("failover:(%s)?maxReconnectAttempts=10&initialReconnectDelay=100", transportConnector.getConnectUri());
139+
String failoverUrl = String.format("failover:(%s)?maxReconnectAttempts=1", transportConnector.getConnectUri());
140140

141141
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
142142
ra.start(null);
@@ -165,22 +165,6 @@ public boolean isSatisified() throws Exception {
165165

166166
transportConnector.start();
167167

168-
// Wait for failover to reconnect and recover() to succeed
169-
// The ReconnectingXAResource should handle reconnection transparently
170-
final XAResource resource = resources[0];
171-
assertTrue("connection re-established and can recover", Wait.waitFor(new Wait.Condition() {
172-
@Override
173-
public boolean isSatisified() throws Exception {
174-
try {
175-
resource.recover(100);
176-
return true;
177-
} catch (Exception e) {
178-
// Still reconnecting
179-
return false;
180-
}
181-
}
182-
}, 30000, 500));
183-
184168
// should recover ok
185169
assertEquals("no pending transactions", 0, resources[0].recover(100).length);
186170

activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -565,26 +565,13 @@ public void testAckMessageWithNoId() throws Exception {
565565
received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
566566
stompConnection.sendFrame(ack);
567567

568-
// Unsubscribe immediately after invalid ACK to prevent message redelivery
569-
// while waiting for ERROR frame. This avoids race condition where message
570-
// could be redelivered before ERROR is received.
568+
StompFrame error = stompConnection.receive();
569+
LOG.info("Received Frame: {}", error);
570+
assertTrue("Expected ERROR but got: " + error.getAction(), error.getAction().equals("ERROR"));
571+
571572
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
572573
"id:12345\n\n" + Stomp.NULL;
573574
stompConnection.sendFrame(unsub);
574-
575-
// Receive frames until we get the ERROR frame, ignoring any MESSAGE frames
576-
// that arrive due to redelivery (especially relevant for SSL transport)
577-
StompFrame error = null;
578-
for (int i = 0; i < 5; i++) {
579-
error = stompConnection.receive();
580-
LOG.info("Received Frame: {}", error);
581-
if (error.getAction().equals("ERROR")) {
582-
break;
583-
}
584-
// If we get a MESSAGE, it's a redelivery - keep trying for ERROR
585-
}
586-
assertNotNull("Did not receive any frame", error);
587-
assertTrue("Expected ERROR but got: " + error.getAction(), error.getAction().equals("ERROR"));
588575
}
589576

590577
@Test(timeout = 60000)

activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.apache.activemq.transport.stomp;
1818

1919
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertNotNull;
2120
import static org.junit.Assert.assertTrue;
2221

2322
import java.io.IOException;
@@ -159,32 +158,9 @@ public void testClientAckWithoutAckId() throws Exception {
159158
String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL;
160159
stompConnection.sendFrame(frame);
161160

162-
// Unsubscribe immediately to prevent message redelivery while waiting for ERROR
163-
String unsubscribe = "UNSUBSCRIBE\n" + "id:1\n\n" + Stomp.NULL;
164-
stompConnection.sendFrame(unsubscribe);
165-
166-
// Receive frames until we get the ERROR frame, ignoring any MESSAGE frames
167-
// that arrive due to redelivery (especially relevant for SSL transport)
168-
StompFrame error = null;
169-
for (int i = 0; i < 5; i++) {
170-
error = stompConnection.receive();
171-
LOG.info("Broker sent: " + error);
172-
if (error.getAction().equals("ERROR")) {
173-
break;
174-
}
175-
// If we get a MESSAGE, it's a redelivery - keep trying for ERROR
176-
}
177-
assertNotNull("Did not receive any frame", error);
178-
assertTrue("Expected ERROR but got: " + error.getAction(), error.getAction().equals("ERROR"));
179-
180-
// Re-subscribe to receive the message again and test correct ACK
181-
stompConnection.sendFrame(subscribe);
182-
receipt = stompConnection.receive();
183-
assertTrue(receipt.getAction().startsWith("RECEIPT"));
184-
185161
received = stompConnection.receive();
186-
assertTrue(received.getAction().equals("MESSAGE"));
187-
ackId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
162+
assertTrue(received.getAction().equals("ERROR"));
163+
LOG.info("Broker sent: " + received);
188164

189165
// Now place it in the correct location and check it still works.
190166
frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2\n\n" + Stomp.NULL;

0 commit comments

Comments
 (0)