Skip to content

Commit 0e968fb

Browse files
author
Steve Powell
committed
Merge bug24409 into default
2 parents 42356a8 + 39f59a3 commit 0e968fb

File tree

1 file changed

+1
-35
lines changed

1 file changed

+1
-35
lines changed

test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,8 @@
1818
package com.rabbitmq.examples;
1919

2020
import java.io.IOException;
21-
import java.util.Collections;
22-
import java.util.SortedSet;
23-
import java.util.TreeSet;
2421

2522
import com.rabbitmq.client.Channel;
26-
import com.rabbitmq.client.ConfirmListener;
2723
import com.rabbitmq.client.Connection;
2824
import com.rabbitmq.client.ConnectionFactory;
2925
import com.rabbitmq.client.MessageProperties;
@@ -50,9 +46,6 @@ public static void main(String[] args)
5046
}
5147

5248
static class Publisher implements Runnable {
53-
private volatile SortedSet<Long> unconfirmedSet =
54-
Collections.synchronizedSortedSet(new TreeSet<Long>());
55-
5649
public void run() {
5750
try {
5851
long startTime = System.currentTimeMillis();
@@ -61,43 +54,16 @@ public void run() {
6154
Connection conn = connectionFactory.newConnection();
6255
Channel ch = conn.createChannel();
6356
ch.queueDeclare(QUEUE_NAME, true, false, false, null);
64-
ch.addConfirmListener(new ConfirmListener() {
65-
public void handleAck(long seqNo, boolean multiple) {
66-
if (multiple) {
67-
unconfirmedSet.headSet(seqNo+1).clear();
68-
} else {
69-
unconfirmedSet.remove(seqNo);
70-
}
71-
}
72-
73-
public void handleNack(long seqNo, boolean multiple) {
74-
int lost = 0;
75-
if (multiple) {
76-
SortedSet<Long> nackd =
77-
unconfirmedSet.headSet(seqNo+1);
78-
lost = nackd.size();
79-
nackd.clear();
80-
} else {
81-
lost = 1;
82-
unconfirmedSet.remove(seqNo);
83-
}
84-
System.out.printf("Probably lost %d messages.\n",
85-
lost);
86-
}
87-
});
8857
ch.confirmSelect();
8958

9059
// Publish
9160
for (long i = 0; i < msgCount; ++i) {
92-
unconfirmedSet.add(ch.getNextPublishSeqNo());
9361
ch.basicPublish("", QUEUE_NAME,
9462
MessageProperties.PERSISTENT_BASIC,
9563
"nop".getBytes());
9664
}
9765

98-
// Wait
99-
while (unconfirmedSet.size() > 0)
100-
Thread.sleep(10);
66+
ch.waitForConfirmsOrDie();
10167

10268
// Cleanup
10369
ch.queueDelete(QUEUE_NAME);

0 commit comments

Comments
 (0)