1717package com .rabbitmq .client .test .functional ;
1818
1919import java .io .IOException ;
20+ import java .util .concurrent .ArrayBlockingQueue ;
21+ import java .util .concurrent .BlockingQueue ;
2022
2123import com .rabbitmq .client .Consumer ;
2224import com .rabbitmq .client .ConsumerCancelledException ;
@@ -28,81 +30,55 @@ public class ConsumerCancelNotificiation extends BrokerTestCase {
2830
2931 private final String queue = "cancel_notification_queue" ;
3032
31- private final Object lock = new Object ();
33+ public void testConsumerCancellationNotification () throws IOException ,
34+ InterruptedException {
35+ final BlockingQueue <Boolean > result = new ArrayBlockingQueue <Boolean >(1 );
3236
33- private boolean notified = false ;
34-
35- private boolean failed = false ;
36-
37- public void testConsumerCancellationNotification () throws IOException {
38- synchronized (lock ) {
39- notified = false ;
40- }
4137 channel .queueDeclare (queue , false , true , false , null );
4238 Consumer consumer = new QueueingConsumer (channel ) {
4339 @ Override
4440 public void handleCancel (String consumerTag ) throws IOException {
45- synchronized (lock ) {
46- notified = true ;
47- lock .notifyAll ();
41+ try {
42+ result .put (true );
43+ } catch (InterruptedException e ) {
44+ fail ();
4845 }
4946 }
5047 };
5148 channel .basicConsume (queue , consumer );
5249 channel .queueDelete (queue );
53- synchronized (lock ) {
54- if (!notified ) {
55- try {
56- lock .wait ();
57- } catch (InterruptedException e ) {
58- }
59- }
60- assertTrue (notified );
61- }
50+ assertTrue (result .take ());
6251 }
6352
6453 public void testConsumerCancellationInterruptsQueuingConsumerWait ()
6554 throws IOException , InterruptedException {
66- synchronized (lock ) {
67- notified = false ;
68- failed = false ;
69- }
55+ final BlockingQueue <Boolean > result = new ArrayBlockingQueue <Boolean >(1 );
7056 channel .queueDeclare (queue , false , true , false , null );
7157 final QueueingConsumer consumer = new QueueingConsumer (channel );
7258 Runnable receiver = new Runnable () {
7359
7460 @ Override
7561 public void run () {
7662 try {
77- consumer .nextDelivery ();
78- } catch (ConsumerCancelledException e ) {
79- synchronized (lock ) {
80- notified = true ;
81- lock .notifyAll ();
82- return ; // avoid fall through to failure
63+ try {
64+ consumer .nextDelivery ();
65+ } catch (ConsumerCancelledException e ) {
66+ result .put (true );
67+ return ;
68+ } catch (ShutdownSignalException e ) {
69+ } catch (InterruptedException e ) {
8370 }
84- } catch ( ShutdownSignalException e ) {
71+ result . put ( false );
8572 } catch (InterruptedException e ) {
86- }
87- synchronized (lock ) {
88- failed = true ;
89- lock .notifyAll ();
73+ fail ();
9074 }
9175 }
9276 };
9377 Thread t = new Thread (receiver );
9478 t .start ();
9579 channel .basicConsume (queue , consumer );
9680 channel .queueDelete (queue );
97- synchronized (lock ) {
98- if (!(notified || failed )) {
99- try {
100- lock .wait ();
101- } catch (InterruptedException e ) {
102- }
103- }
104- assertTrue (notified );
105- }
81+ assertTrue (result .take ());
10682 t .join ();
10783 }
10884}
0 commit comments