Skip to content

Commit 600a8c5

Browse files
committed
ARTEMIS-5376 Include all messages in queue management operations
1 parent 11717d8 commit 600a8c5

File tree

4 files changed

+264
-105
lines changed

4 files changed

+264
-105
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

Lines changed: 97 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
4444
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4545
import java.util.concurrent.locks.ReentrantLock;
46+
import java.util.function.Predicate;
4647

4748
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
4849
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -2047,6 +2048,11 @@ private int iterQueue(final int flushLimit,
20472048
QueueIterateAction messageAction) throws Exception {
20482049
int count = 0;
20492050
int txCount = 0;
2051+
2052+
if (filter1 != null) {
2053+
messageAction.addFilter(filter1);
2054+
}
2055+
20502056
// This is to avoid scheduling depaging while iterQueue is happening
20512057
// this should minimize the use of the paged executor.
20522058
depagePending = true;
@@ -2065,7 +2071,7 @@ private int iterQueue(final int flushLimit,
20652071
while (iter.hasNext() && !messageAction.expectedHitsReached(count)) {
20662072
MessageReference ref = iter.next();
20672073

2068-
if (filter1 == null || filter1.match(ref.getMessage())) {
2074+
if (messageAction.match(ref)) {
20692075
if (messageAction.actMessage(tx, ref)) {
20702076
iter.remove();
20712077
refRemoved(ref);
@@ -2087,7 +2093,7 @@ private int iterQueue(final int flushLimit,
20872093
return count;
20882094
}
20892095

2090-
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true : filter1.match(ref.getMessage()));
2096+
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(messageAction::match);
20912097
for (MessageReference messageReference : cancelled) {
20922098
messageAction.actMessage(tx, messageReference);
20932099
count++;
@@ -2110,12 +2116,12 @@ private int iterQueue(final int flushLimit,
21102116
PagedReference reference = pageIterator.next();
21112117
pageIterator.remove();
21122118

2113-
if (filter1 == null || filter1.match(reference.getMessage())) {
2114-
count++;
2115-
txCount++;
2119+
if (messageAction.match(reference)) {
21162120
if (!messageAction.actMessage(tx, reference)) {
21172121
addTail(reference, false);
21182122
}
2123+
txCount++;
2124+
count++;
21192125
} else {
21202126
addTail(reference, false);
21212127
}
@@ -2433,71 +2439,54 @@ public void run() {
24332439
}
24342440

24352441
@Override
2436-
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
2437-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2438-
while (iter.hasNext()) {
2439-
MessageReference ref = iter.next();
2440-
if (ref.getMessage().getMessageID() == messageID) {
2441-
incDelivering(ref);
2442-
sendToDeadLetterAddress(null, ref);
2443-
iter.remove();
2444-
refRemoved(ref);
2445-
return true;
2446-
}
2447-
}
2448-
if (pageIterator != null && !queueDestroyed) {
2449-
while (pageIterator.hasNext()) {
2450-
PagedReference ref = pageIterator.next();
2451-
if (ref.getMessage().getMessageID() == messageID) {
2452-
incDelivering(ref);
2453-
sendToDeadLetterAddress(null, ref);
2454-
pageIterator.remove();
2455-
refRemoved(ref);
2456-
return true;
2457-
}
2458-
}
2442+
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
2443+
2444+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2445+
2446+
@Override
2447+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2448+
incDelivering(ref);
2449+
sendToDeadLetterAddress(tx, ref);
2450+
return true;
24592451
}
2460-
return false;
2461-
}
2452+
2453+
}) == 1;
2454+
24622455
}
24632456

24642457
@Override
2465-
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
2458+
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
24662459

24672460
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
24682461

24692462
@Override
24702463
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2471-
24722464
incDelivering(ref);
2473-
return sendToDeadLetterAddress(tx, ref);
2465+
sendToDeadLetterAddress(tx, ref);
2466+
return true;
24742467
}
2468+
24752469
});
2470+
24762471
}
24772472

24782473
@Override
2479-
public synchronized boolean moveReference(final long messageID,
2480-
final SimpleString toAddress,
2481-
final Binding binding,
2482-
final boolean rejectDuplicate) throws Exception {
2483-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2484-
while (iter.hasNext()) {
2485-
MessageReference ref = iter.next();
2486-
if (ref.getMessage().getMessageID() == messageID) {
2487-
iter.remove();
2488-
refRemoved(ref);
2489-
incDelivering(ref);
2490-
try {
2491-
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
2492-
} catch (Exception e) {
2493-
decDelivering(ref);
2494-
throw e;
2495-
}
2496-
return true;
2497-
}
2474+
public boolean moveReference(final long messageID,
2475+
final SimpleString toAddress,
2476+
final Binding binding,
2477+
final boolean rejectDuplicate) throws Exception {
2478+
2479+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2480+
2481+
@Override
2482+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2483+
incDelivering(ref);
2484+
move(tx, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
2485+
return true;
24982486
}
2499-
return false;
2500-
}
2487+
2488+
}) == 1;
2489+
25012490
}
25022491

25032492
@Override
@@ -2543,7 +2532,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25432532
}
25442533

25452534
if (!ignored) {
2546-
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
2535+
move(tx, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
25472536
}
25482537

25492538
return true;
@@ -2561,26 +2550,23 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25612550
}
25622551

25632552
@Override
2564-
public synchronized boolean copyReference(final long messageID,
2565-
final SimpleString toQueue,
2566-
final Binding binding) throws Exception {
2567-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2568-
while (iter.hasNext()) {
2569-
MessageReference ref = iter.next();
2570-
if (ref.getMessage().getMessageID() == messageID) {
2571-
try {
2572-
copy(null, toQueue, binding, ref);
2573-
} catch (Exception e) {
2574-
throw e;
2575-
}
2576-
return true;
2577-
}
2553+
public boolean copyReference(final long messageID,
2554+
final SimpleString toQueue,
2555+
final Binding binding) throws Exception {
2556+
2557+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2558+
2559+
@Override
2560+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2561+
copy(tx, toQueue, binding, ref);
2562+
return false;
25782563
}
2579-
return false;
2580-
}
2564+
2565+
}) == 1;
2566+
25812567
}
25822568

2583-
public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
2569+
public int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
25842570
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
25852571
@Override
25862572
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2649,40 +2635,33 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
26492635
}
26502636

26512637
@Override
2652-
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
2653-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2638+
public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
26542639

2655-
while (iter.hasNext()) {
2656-
MessageReference ref = iter.next();
2657-
if (ref.getMessage().getMessageID() == messageID) {
2658-
iter.remove();
2659-
refRemoved(ref);
2660-
ref.getMessage().setPriority(newPriority);
2661-
addTail(ref, false);
2662-
return true;
2663-
}
2640+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2641+
2642+
@Override
2643+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2644+
ref.getMessage().setPriority(newPriority);
2645+
return false;
26642646
}
26652647

2666-
return false;
2667-
}
2648+
}) == 1;
2649+
26682650
}
26692651

26702652
@Override
2671-
public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
2672-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2673-
int count = 0;
2674-
while (iter.hasNext()) {
2675-
MessageReference ref = iter.next();
2676-
if (filter == null || filter.match(ref.getMessage())) {
2677-
count++;
2678-
iter.remove();
2679-
refRemoved(ref);
2680-
ref.getMessage().setPriority(newPriority);
2681-
addTail(ref, false);
2682-
}
2653+
public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
2654+
2655+
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2656+
2657+
@Override
2658+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2659+
ref.getMessage().setPriority(newPriority);
2660+
return false;
26832661
}
2684-
return count;
2685-
}
2662+
2663+
});
2664+
26862665
}
26872666

26882667
@Override
@@ -4222,13 +4201,23 @@ public void run() {
42224201
abstract class QueueIterateAction {
42234202

42244203
protected Integer expectedHits;
4204+
protected Long messageID;
4205+
protected Filter filter1 = null;
4206+
protected Predicate<MessageReference> match;
42254207

42264208
QueueIterateAction(Integer expectedHits) {
42274209
this.expectedHits = expectedHits;
4210+
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
4211+
}
4212+
4213+
QueueIterateAction(Long messageID) {
4214+
this.expectedHits = 1;
4215+
this.match = ref -> ref.getMessage().getMessageID() == messageID;
42284216
}
42294217

42304218
QueueIterateAction() {
42314219
this.expectedHits = null;
4220+
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
42324221
}
42334222

42344223
/**
@@ -4243,6 +4232,15 @@ abstract class QueueIterateAction {
42434232
public boolean expectedHitsReached(int currentHits) {
42444233
return expectedHits != null && currentHits >= expectedHits.intValue();
42454234
}
4235+
4236+
public void addFilter(Filter filter1) {
4237+
this.filter1 = filter1;
4238+
}
4239+
4240+
public boolean match(MessageReference ref) {
4241+
return match.test(ref);
4242+
}
4243+
42464244
}
42474245

42484246
// For external use we need to use a synchronized version since the list is not thread safe

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.apache.activemq.artemis.tests.integration.management;
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
20-
import static org.junit.jupiter.api.Assertions.assertFalse;
2120
import static org.junit.jupiter.api.Assertions.assertNotNull;
2221
import static org.junit.jupiter.api.Assertions.assertNull;
2322
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -230,15 +229,16 @@ public void testCopyMessageWhilstPaging() throws Exception {
230229

231230
long messageID = (Long) messages[99].get("messageID");
232231

233-
assertFalse(queueControl.copyMessage(messageID, otherQueue.toString()));
232+
assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));
234233

235234
messageID = (Long) messages[0].get("messageID");
236235

237236
assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));
238237

239238
Map<String, Object>[] copiedMessages = otherQueueControl.listMessages(null);
240239

241-
assertEquals(1, copiedMessages.length);
240+
//this validates copying of a paged message
241+
assertEquals(2, copiedMessages.length);
242242
}
243243

244244
@Test
@@ -281,8 +281,8 @@ public void testCopyMessageWhilstPagingSameAddress() throws Exception {
281281

282282
messageID = (Long) otherMessages[100].get("messageID");
283283

284-
//this should fail as the message was paged successfully
285-
assertFalse(otherQueueControl.copyMessage(messageID, queue.toString()));
284+
//this validates copying of a paged message
285+
assertTrue(otherQueueControl.copyMessage(messageID, queue.toString()));
286286
}
287287

288288
@Test

0 commit comments

Comments
 (0)