@@ -92,7 +92,7 @@ private void publishAndGet(int count, boolean doAck)
9292
9393 /**
9494 * Test we don't requeue acknowledged messages (using get)
95- * @throws Exception test
95+ * @throws Exception untested
9696 */
9797 public void testNormal () throws Exception
9898 {
@@ -101,7 +101,7 @@ public void testNormal() throws Exception
101101
102102 /**
103103 * Test we requeue unacknowledged messages (using get)
104- * @throws Exception test
104+ * @throws Exception untested
105105 */
106106 public void testRequeueing () throws Exception
107107 {
@@ -110,7 +110,7 @@ public void testRequeueing() throws Exception
110110
111111 /**
112112 * Test we requeue unacknowledged message (using consumer)
113- * @throws Exception test
113+ * @throws Exception untested
114114 */
115115 public void testRequeueingConsumer () throws Exception
116116 {
@@ -155,7 +155,7 @@ private void publishLotsAndGet()
155155
156156 /**
157157 * Test close while consuming many messages successfully requeues unacknowledged messages
158- * @throws Exception test
158+ * @throws Exception untested
159159 */
160160 public void testRequeueInFlight () throws Exception
161161 {
@@ -166,29 +166,51 @@ public void testRequeueInFlight() throws Exception
166166
167167 /**
168168 * Test close while consuming partially not acked with cancel successfully requeues unacknowledged messages
169- * @throws Exception test
169+ * @throws Exception untested
170170 */
171171 public void testRequeueInFlightConsumerNoAck () throws Exception
172172 {
173173 for (int i = 0 ; i < 5 ; i ++) {
174- publishLotsAndConsumeSome (false );
174+ publishLotsAndConsumeSome (false , true );
175175 }
176176 }
177177
178178 /**
179179 * Test close while consuming partially acked with cancel successfully requeues unacknowledged messages
180- * @throws Exception test
180+ * @throws Exception untested
181181 */
182182 public void testRequeueInFlightConsumerAck () throws Exception
183183 {
184184 for (int i = 0 ; i < 5 ; i ++) {
185- publishLotsAndConsumeSome (true );
185+ publishLotsAndConsumeSome (true , true );
186+ }
187+ }
188+
189+ /**
190+ * Test close while consuming partially not acked without cancel successfully requeues unacknowledged messages
191+ * @throws Exception untested
192+ */
193+ public void testRequeueInFlightConsumerNoAckNoCancel () throws Exception
194+ {
195+ for (int i = 0 ; i < 5 ; i ++) {
196+ publishLotsAndConsumeSome (false , false );
197+ }
198+ }
199+
200+ /**
201+ * Test close while consuming partially acked without cancel successfully requeues unacknowledged messages
202+ * @throws Exception untested
203+ */
204+ public void testRequeueInFlightConsumerAckNoCancel () throws Exception
205+ {
206+ for (int i = 0 ; i < 5 ; i ++) {
207+ publishLotsAndConsumeSome (true , false );
186208 }
187209 }
188210
189211 private static final int MESSAGES_TO_CONSUME = 20 ;
190212
191- private void publishLotsAndConsumeSome (boolean ack )
213+ private void publishLotsAndConsumeSome (boolean ack , boolean cancelBeforeFinish )
192214 throws IOException , InterruptedException , ShutdownSignalException
193215 {
194216 openConnection ();
@@ -201,7 +223,7 @@ private void publishLotsAndConsumeSome(boolean ack)
201223 }
202224
203225 CountDownLatch latch = new CountDownLatch (1 );
204- PartialConsumer c = new PartialConsumer (channel , MESSAGES_TO_CONSUME , ack , latch );
226+ PartialConsumer c = new PartialConsumer (channel , MESSAGES_TO_CONSUME , ack , latch , cancelBeforeFinish );
205227 channel .basicConsume (Q , c );
206228 latch .await (); // wait for consumer
207229
@@ -228,13 +250,15 @@ private class PartialConsumer extends DefaultConsumer {
228250 private Channel channel ;
229251 private CountDownLatch latch ;
230252 private volatile boolean acknowledge ;
253+ private final boolean cancelBeforeFinish ;
231254
232- public PartialConsumer (Channel channel , int count , boolean acknowledge , CountDownLatch latch ) {
255+ public PartialConsumer (Channel channel , int count , boolean acknowledge , CountDownLatch latch , boolean cancelBeforeFinish ) {
233256 super (channel );
234257 this .count = count ;
235258 this .channel = channel ;
236259 this .latch = latch ;
237260 this .acknowledge = acknowledge ;
261+ this .cancelBeforeFinish = cancelBeforeFinish ;
238262 }
239263
240264 @ Override
@@ -247,7 +271,8 @@ public void handleDelivery(String consumerTag,
247271 if (this .acknowledge )
248272 this .channel .basicAck (envelope .getDeliveryTag (), false );
249273 if (--this .count == 0 ) {
250- this .channel .basicCancel (this .getConsumerTag ());
274+ if (this .cancelBeforeFinish )
275+ this .channel .basicCancel (this .getConsumerTag ());
251276 this .acknowledge = false ; // don't acknowledge any more
252277 this .latch .countDown ();
253278 }
0 commit comments