Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2171aef
[maven-release-plugin] prepare for next development iteration
jbonofre Mar 7, 2025
af2a38c
AMQ-9685 - Virtual topic name should have at least one character to a…
graben Mar 13, 2025
a7de033
AMQ-9685 - Improve test
cshannon Mar 18, 2025
c49960d
fix imports
cshannon Mar 18, 2025
58006d4
AMQ-9689 - Network of Broker durable sync TTL fixes and improvements …
cshannon Apr 11, 2025
5477545
AMQ-9698 - Fix message expiration on durable subs (#1423)
cshannon May 7, 2025
ea2b818
AMQ-9698 - Add recovery listener to store recoverExpired() method
cshannon May 8, 2025
780008c
AMQ-9716: Fix `maxMessageSize=-1` to correctly disable message size l…
therepanic May 28, 2025
d0cee10
AMQ-9697: Removed inline JS and CSS from the Web Console. Added CSP h…
sergio-d-lemos May 30, 2025
36af213
AMQ-7517: Remove lib in the default classpath (#1442)
jbonofre May 31, 2025
cf19b85
AMQ-9721 - Fix performance issues during non-persistent cursor remova…
cshannon Jun 4, 2025
2296dc6
NO-JIRA: Fix flaky DurableSubscriptionHangTestCase
cshannon Jun 5, 2025
d1b7e8a
AMQ-9726 - Fix FilePendingMessageCursor clear() method (#1452)
cshannon Jun 5, 2025
3d2e2a9
AMQ-9747 - Handle IOExceptionHandler thrown exceptions in KahaDB (#1474)
cshannon Jul 23, 2025
4cbfcbd
AMQ-9700: Upgrade to commons-io 2.19.0 (#1433)
jbonofre May 25, 2025
ca7e911
AMQ-9759: Updating Commons Lang to 3.18.0
coheigea Aug 25, 2025
2ef99b7
[AMQ-9773] Fix for only one message being recovered from backup
mattrpav Sep 24, 2025
aeeaa07
[AMQ-9773-b] Update code comments for backup only recoverying one mes…
mattrpav Sep 26, 2025
1bf66d8
AMQ-9762: Upgrade to Jackson 2.20.0
jbonofre Oct 3, 2025
b5775d1
AMQ-9717: Upgrade to commons-beanutils 1.11.0
jbonofre Oct 3, 2025
73d8bd7
AMQ-9763: Upgrade to commons-io 2.20.0
jbonofre Oct 4, 2025
41fa2d7
AMQ-9701: Upgrade to XBean 4.27
jbonofre Oct 4, 2025
ade46fd
AMQ-9705: Upgrade to ASM 9.8
jbonofre Oct 4, 2025
f1c719d
AMQ-9767: Upgrade to jmdns 3.6.2
jbonofre Oct 4, 2025
a2246e3
Upgrade to Jetty 9.4.58.v20250814
jbonofre Oct 4, 2025
9f4171c
Prepare ActiveMQ 5.19.1 release
jbonofre Oct 4, 2025
6e85639
[AMQ-9780] Add a null guard during checkpointUpdate when kahadb is lo…
mattrpav Oct 7, 2025
88b8428
AMQ-9781: Upgrade to ASM 9.9 (#1507)
jbonofre Oct 8, 2025
242ff9b
[maven-release-plugin] prepare release activemq-5.19.1
jbonofre Oct 8, 2025
54b4fca
Merge tag 'activemq-5.19.1' into activemq-5.19.x-TT.x-patch-forward
jgallimore Nov 13, 2025
9c7dac7
Bump project versions to 5.19.2-TT.1-SNAPSHOT
jgallimore Nov 13, 2025
d63ac83
Set SCM <tag> to HEAD for snapshot development
jgallimore Nov 13, 2025
b6f7d31
Set commons-collections version to 3.2.2-TT.1 in root pom
jgallimore Nov 13, 2025
e5b5395
Fix AI's mistakes
jgallimore Nov 13, 2025
7fe91a8
Add 5.19.2-TT.1 release notes: merged upstream 5.19.1 changes
jgallimore Nov 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion activemq-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.19.1-TT.5-SNAPSHOT</version>
<version>5.19.2-TT.1-SNAPSHOT</version>
</parent>

<artifactId>activemq-all</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion activemq-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.19.1-TT.5-SNAPSHOT</version>
<version>5.19.2-TT.1-SNAPSHOT</version>
</parent>

<artifactId>activemq-amqp</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion activemq-blueprint/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.19.1-TT.5-SNAPSHOT</version>
<version>5.19.2-TT.1-SNAPSHOT</version>
</parent>

<artifactId>activemq-blueprint</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion activemq-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.19.1-TT.5-SNAPSHOT</version>
<version>5.19.2-TT.1-SNAPSHOT</version>
</parent>

<artifactId>activemq-broker</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,16 @@ public void removePending(MessageReference node) throws IOException {
pending.remove(node);
}

@Override
protected void processExpiredAck(ConnectionContext context, Destination dest,
MessageReference node) {

// Each subscription needs to expire both on the store and
// decrement the reference count
super.processExpiredAck(context, dest, node);
node.decrementReferenceCount();
}

@Override
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
synchronized (pending) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,8 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a
inAckRange = true;
}
if (inAckRange) {
Destination regionDestination = nodeDest;
if (broker.isExpired(node)) {
regionDestination.messageExpired(context, this, node);
processExpiredAck(context, nodeDest, node);
}
iter.remove();
decrementPrefetchCounter(node);
Expand Down Expand Up @@ -396,6 +395,11 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a
}
}

protected void processExpiredAck(final ConnectionContext context, final Destination dest,
final MessageReference node) {
dest.messageExpired(context, this, node);
}

private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
// setup a Synchronization to remove nodes from the
// dispatched list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -29,6 +31,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import java.util.stream.Collectors;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
Expand All @@ -51,8 +54,8 @@
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore.StoreType;
import org.apache.activemq.store.NoLocalSubscriptionAware;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
Expand Down Expand Up @@ -702,6 +705,11 @@ public boolean isDuplicate(MessageId id) {
for (DurableTopicSubscription sub : durableSubscribers.values()) {
if (!sub.isActive() || sub.isEnableMessageExpirationOnActiveDurableSubs()) {
message.setRegionDestination(this);
// AMQ-9721 - Remove message from the cursor if it exists after
// loading from the store. Store recoverExpired() does not inc
// the ref count so we don't need to decrement here, but if
// the cursor finds its own copy in memory it will dec that ref.
sub.removePending(message);
messageExpired(connectionContext, sub, message);
}
}
Expand Down Expand Up @@ -825,15 +833,99 @@ protected void dispatch(final ConnectionContext context, Message message) throws
}
}

private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
private final Runnable expireMessagesWork = new Runnable() {

/**
* Simple recovery listener that will check if the topic memory usage is full
* when hasSpace() is called. This could be enhanced in the future if needed.
*/
private final MessageRecoveryListener expiryListener = new MessageRecoveryListener() {

@Override
public void run() {
List<Message> browsedMessages = new InsertionCountList<Message>();
doBrowse(browsedMessages, getMaxExpirePageSize());
public boolean recoverMessage(Message message) {
return true;
}

@Override
public boolean recoverMessageReference(MessageId ref) {
return true;
}

@Override
public boolean hasSpace() {
return !Topic.this.memoryUsage.isFull();
}

@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
};

private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
private final Runnable expireMessagesWork = () -> {
try {
final TopicMessageStore store = Topic.this.topicStore;
if (store != null && store.getType() == StoreType.KAHADB) {
if (store.getMessageCount() == 0) {
LOG.debug("Skipping topic expiration check for {}, store size is 0", destination);
return;
}

// get the sub keys that should be checked for expired messages
final var subs = durableSubscribers.entrySet().stream()
.filter(entry -> isEligibleForExpiration(entry.getValue()))
.map(Entry::getKey).collect(Collectors.toSet());

if (subs.isEmpty()) {
LOG.debug("Skipping topic expiration check for {}, no eligible subscriptions to check", destination);
return;
}

// For each eligible subscription, return the messages in the store that are expired
// The same message refs are shared between subs if duplicated so this is efficient
var expired = store.recoverExpired(subs, getMaxExpirePageSize(),
expiryListener);

final ConnectionContext connectionContext = createConnectionContext();
// Go through any expired messages and remove for each sub
for (Entry<SubscriptionKey, List<Message>> entry : expired.entrySet()) {
DurableTopicSubscription sub = durableSubscribers.get(entry.getKey());
List<Message> expiredMessages = entry.getValue();

// If the sub still exists and there are expired messages then process
if (sub != null && !expiredMessages.isEmpty()) {
// There's a small race condition here if the sub comes online,
// but it's not a big deal as at worst there maybe be duplicate acks for
// the expired message but the store can handle it
if (isEligibleForExpiration(sub)) {
expiredMessages.forEach(message -> {
message.setRegionDestination(Topic.this);
try {
// AMQ-9721 - Remove message from the cursor if it exists after
// loading from the store. Store recoverExpired() does not inc
// the ref count so we don't need to decrement here, but if
// the cursor finds its own copy in memory it will dec that ref.
sub.removePending(message);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
messageExpired(connectionContext, sub, message);
});
}
}
}
} else {
// If not KahaDB, fall back to the legacy browse method because
// the recoverExpired() method is not supported
doBrowse(new InsertionCountList<>(), getMaxExpirePageSize());
}
} catch (Throwable e) {
LOG.warn("Failed to expire messages on Topic: {}", getActiveMQDestination().getPhysicalName(), e);
} finally {
expiryTaskInProgress.set(false);
}
};

private final Runnable expireMessagesTask = new Runnable() {
@Override
public void run() {
Expand All @@ -855,9 +947,6 @@ public void messageExpired(ConnectionContext context, Subscription subs, Message
ack.setDestination(destination);
ack.setMessageID(reference.getMessageId());
try {
if (subs instanceof DurableTopicSubscription) {
((DurableTopicSubscription)subs).removePending(reference);
}
acknowledge(context, subs, ack, reference);
} catch (Exception e) {
LOG.error("Failed to remove expired Message from the store ", e);
Expand Down Expand Up @@ -927,6 +1016,10 @@ private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscr
}
}

private static boolean isEligibleForExpiration(DurableTopicSubscription sub) {
return sub.isEnableMessageExpirationOnActiveDurableSubs() || !sub.isActive();
}

public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
return durableSubscribers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ public void add(MessageReference node) throws Exception {
messagesToEvict = oldMessages.length;
for (int i = 0; i < messagesToEvict; i++) {
MessageReference oldMessage = oldMessages[i];
// AMQ-9721 - discard no longer removes from matched so remove here
oldMessage.decrementReferenceCount();
matched.remove(oldMessage);
//Expired here is false as we are discarding due to the messageEvictingStrategy
discard(oldMessage, false);
}
Expand Down Expand Up @@ -751,8 +754,6 @@ public void onFailure() {
private void discard(MessageReference message, boolean expired) {
discarding = true;
try {
message.decrementReferenceCount();
matched.remove(message);
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public synchronized boolean recoverMessage(Message message, boolean cached) thro
}
}
message.incrementReferenceCount();
batchList.addMessageLast(message);
batchList.addMessageLast(createBatchListRef(message));
clearIterator(true);
recovered = true;
} else if (!cached) {
Expand All @@ -136,6 +136,10 @@ public synchronized boolean recoverMessage(Message message, boolean cached) thro
return recovered;
}

protected MessageReference createBatchListRef(Message message) {
return message;
}

protected boolean duplicateFromStoreExcepted(Message message) {
// expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true for
// which this existing unused flag has been repurposed
Expand Down Expand Up @@ -448,13 +452,15 @@ public final synchronized void remove() {

@Override
public final synchronized void remove(MessageReference node) {
if (batchList.remove(node) != null) {
final PendingNode message = batchList.remove(node);
if (message != null) {
size--;
setCacheEnabled(false);
// decrement reference count if removed from batchList
message.getMessage().decrementReferenceCount();
}
}


@Override
public final synchronized void clear() {
gc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ public synchronized void release() {
@Override
public synchronized void destroy() throws Exception {
stop();
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
MessageReference node = i.next();
for (MessageReference node : memoryList) {
node.decrementReferenceCount();
}
memoryList.clear();
Expand Down Expand Up @@ -365,11 +364,19 @@ public synchronized long messageSize() {
*/
@Override
public synchronized void clear() {
// AMQ-9726 - Iterate over all nodes to decrement the ref count
// to decrement the memory usage tracker
for (MessageReference node : memoryList) {
node.decrementReferenceCount();
}
memoryList.clear();
if (!isDiskListEmpty()) {
try {
getDiskList().destroy();
} catch (IOException e) {
// AMQ-9726 - This method will destroy the list and
// set the reference to null so it will be reset
// for future writes
destroyDiskList();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.NullMessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.usage.SystemUsage;
Expand Down Expand Up @@ -274,8 +275,27 @@ public synchronized void remove() {

@Override
public synchronized void remove(MessageReference node) {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.remove(node);
// AMQ-9721 - Check if message is persistent or non-persistent.
// Removing from the non-persistent cursor requires searching the
// entire list if it's paged onto disk which is quite slow,
// so it doesn't make sense to try and remove as it will never
// exist if it's persistent.

// MessageReference can be a null reference if called from DurableSubscriptionView
// so we do not know if it's persistent and just need to search everything.
if (node instanceof NullMessageReference) {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.remove(node);
}
} else if (node.isPersistent()) {
for (PendingMessageCursor tsp : storePrefetches) {
if (tsp.equals(nonPersistent)) {
continue;
}
tsp.remove(node);
}
} else {
nonPersistent.remove(node);
}
}

Expand Down
Loading