5656import java .util .concurrent .Executors ;
5757import java .util .concurrent .Future ;
5858import java .util .concurrent .TimeUnit ;
59+ import java .util .concurrent .TimeoutException ;
5960import java .util .function .Function ;
6061
6162import static org .assertj .core .api .Assertions .assertThat ;
@@ -122,7 +123,7 @@ public void testQueueSendReceiveOnTracedThread() throws Exception {
122123 @ Test
123124 public void testQueueSendReceiveNoWaitOnTracedThread () throws Exception {
124125 final Queue queue = brokerFacade .createQueue (TEST_Q_NAME );
125- testQueueSendReceiveOnTracedThread (() -> brokerFacade .receiveNoWait (queue ), queue );
126+ testQueueSendReceiveOnTracedThread (() -> loopReceive (() -> brokerFacade .receiveNoWait (queue ), 3000 ), queue );
126127 }
127128
128129 @ Test
@@ -134,7 +135,23 @@ public void testQueueSendReceiveOnNonTracedThread() throws Exception {
134135 @ Test
135136 public void testQueueSendReceiveNoWaitOnNonTracedThread () throws Exception {
136137 final Queue queue = brokerFacade .createQueue (TEST_Q_NAME );
137- testQueueSendReceiveOnNonTracedThread (() -> brokerFacade .receiveNoWait (queue ), queue );
138+ testQueueSendReceiveOnNonTracedThread (() -> loopReceive (() -> brokerFacade .receiveNoWait (queue ), 3000 ), queue );
139+ }
140+
141+ // A utility method for testing the receiveNoWait API consistently
142+ private Message loopReceive (Callable <Message > receiveMethod , @ SuppressWarnings ("SameParameterValue" ) long timeout ) throws Exception {
143+ long start = System .currentTimeMillis ();
144+ long curr = start ;
145+ while (curr - start < timeout ) {
146+ Message ret = receiveMethod .call ();
147+ if (ret != null ) {
148+ return ret ;
149+ } else {
150+ Thread .sleep (100 );
151+ curr = System .currentTimeMillis ();
152+ }
153+ }
154+ throw new TimeoutException ();
138155 }
139156
140157 private void testQueueSendReceiveOnTracedThread (Callable <Message > receiveMethod , Queue queue ) throws Exception {
0 commit comments