Skip to content

Commit f1cbbe4

Browse files
author
Alexandru Scvortov
committed
use waitForConfirms in ConfirmDontLoseMessages
1 parent ba4d2b2 commit f1cbbe4

File tree

1 file changed

+1
-31
lines changed

1 file changed

+1
-31
lines changed

test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ public static void main(String[] args)
5050
}
5151

5252
static class Publisher implements Runnable {
53-
private volatile SortedSet<Long> unconfirmedSet =
54-
Collections.synchronizedSortedSet(new TreeSet<Long>());
55-
5653
public void run() {
5754
try {
5855
long startTime = System.currentTimeMillis();
@@ -61,43 +58,16 @@ public void run() {
6158
Connection conn = connectionFactory.newConnection();
6259
Channel ch = conn.createChannel();
6360
ch.queueDeclare(QUEUE_NAME, true, false, false, null);
64-
ch.addConfirmListener(new ConfirmListener() {
65-
public void handleAck(long seqNo, boolean multiple) {
66-
if (multiple) {
67-
unconfirmedSet.headSet(seqNo+1).clear();
68-
} else {
69-
unconfirmedSet.remove(seqNo);
70-
}
71-
}
72-
73-
public void handleNack(long seqNo, boolean multiple) {
74-
int lost = 0;
75-
if (multiple) {
76-
SortedSet<Long> nackd =
77-
unconfirmedSet.headSet(seqNo+1);
78-
lost = nackd.size();
79-
nackd.clear();
80-
} else {
81-
lost = 1;
82-
unconfirmedSet.remove(seqNo);
83-
}
84-
System.out.printf("Probably lost %d messages.\n",
85-
lost);
86-
}
87-
});
8861
ch.confirmSelect();
8962

9063
// Publish
9164
for (long i = 0; i < msgCount; ++i) {
92-
unconfirmedSet.add(ch.getNextPublishSeqNo());
9365
ch.basicPublish("", QUEUE_NAME,
9466
MessageProperties.PERSISTENT_BASIC,
9567
"nop".getBytes());
9668
}
9769

98-
// Wait
99-
while (unconfirmedSet.size() > 0)
100-
Thread.sleep(10);
70+
ch.waitForConfirmsOrDie();
10171

10272
// Cleanup
10373
ch.queueDelete(QUEUE_NAME);

0 commit comments

Comments
 (0)