1515
1616package com .rabbitmq .client .test .functional ;
1717
18- import com .rabbitmq .client .*;
1918import com .rabbitmq .client .test .BrokerTestCase ;
2019import org .junit .Test ;
20+ import static org .junit .Assert .assertEquals ;
21+ import static org .junit .Assert .assertTrue ;
22+ import static org .junit .Assert .fail ;
2123
2224import java .io .IOException ;
2325import java .util .Arrays ;
2729import java .util .concurrent .LinkedBlockingQueue ;
2830import java .util .concurrent .TimeUnit ;
2931
30- import static org .junit .Assert .assertEquals ;
31- import static org .junit .Assert .fail ;
32+ import java .util .concurrent .CountDownLatch ;
33+
34+ import com .rabbitmq .client .DefaultConsumer ;
35+ import com .rabbitmq .client .Envelope ;
36+
37+ import com .rabbitmq .client .AMQP ;
38+ import com .rabbitmq .client .Channel ;
39+ import com .rabbitmq .client .MessageProperties ;
3240
3341public class ConsumerPriorities extends BrokerTestCase {
42+
3443 @ Test public void validation () throws IOException {
3544 assertFailValidation (args ("banana" ));
3645 assertFailValidation (args (new HashMap <Object , Object >()));
@@ -50,6 +59,8 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
5059 }
5160
5261 private static final int COUNT = 10 ;
62+ private static final long DELIVERY_TIMEOUT_MS = 100 ;
63+ private static final long CANCEL_OK_TIMEOUT_MS = 10 * 1000 ;
5364
5465 @ Test public void consumerPriorities () throws Exception {
5566 String queue = channel .queueDeclare ().getQueue ();
@@ -61,13 +72,20 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
6172 channel .basicConsume (queue , true , args (-1 ), lowConsumer );
6273
6374 publish (queue , COUNT , "high" );
75+ assertContents (highConsumer , COUNT , "high" );
6476 channel .basicCancel (high );
77+ assertTrue (
78+ "High priority consumer should have been cancelled" ,
79+ highConsumer .cancelLatch .await (CANCEL_OK_TIMEOUT_MS , TimeUnit .MILLISECONDS )
80+ );
6581 publish (queue , COUNT , "med" );
82+ assertContents (medConsumer , COUNT , "med" );
6683 channel .basicCancel (med );
84+ assertTrue (
85+ "Medium priority consumer should have been cancelled" ,
86+ medConsumer .cancelLatch .await (CANCEL_OK_TIMEOUT_MS , TimeUnit .MILLISECONDS )
87+ );
6788 publish (queue , COUNT , "low" );
68-
69- assertContents (highConsumer , COUNT , "high" );
70- assertContents (medConsumer , COUNT , "med" );
7189 assertContents (lowConsumer , COUNT , "low" );
7290 }
7391
@@ -77,12 +95,12 @@ private Map<String, Object> args(Object o) {
7795 return map ;
7896 }
7997
80- private void assertContents (QueueMessageConsumer c , int count , String msg ) throws InterruptedException {
98+ private void assertContents (QueueMessageConsumer qc , int count , String msg ) throws InterruptedException {
8199 for (int i = 0 ; i < count ; i ++) {
82- byte [] body = c .nextDelivery (100 );
100+ byte [] body = qc .nextDelivery (DELIVERY_TIMEOUT_MS );
83101 assertEquals (msg , new String (body ));
84102 }
85- assertEquals (null , c .nextDelivery ());
103+ assertEquals (null , qc .nextDelivery (DELIVERY_TIMEOUT_MS ));
86104 }
87105
88106 private void publish (String queue , int count , String msg ) throws IOException {
@@ -91,10 +109,12 @@ private void publish(String queue, int count, String msg) throws IOException {
91109 }
92110 }
93111
94- class QueueMessageConsumer extends DefaultConsumer {
112+ private static class QueueMessageConsumer extends DefaultConsumer {
95113
96114 BlockingQueue <byte []> messages = new LinkedBlockingQueue <byte []>();
97115
116+ CountDownLatch cancelLatch = new CountDownLatch (1 );
117+
98118 public QueueMessageConsumer (Channel channel ) {
99119 super (channel );
100120 }
@@ -104,14 +124,14 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
104124 messages .add (body );
105125 }
106126
107- byte [] nextDelivery () {
108- return messages .poll ();
127+ @ Override
128+ public void handleCancelOk (String consumerTag ) {
129+ cancelLatch .countDown ();
109130 }
110131
111132 byte [] nextDelivery (long timeoutInMs ) throws InterruptedException {
112133 return messages .poll (timeoutInMs , TimeUnit .MILLISECONDS );
113134 }
114135
115136 }
116-
117137}
0 commit comments