@@ -329,4 +329,127 @@ public void testShuffleAcksBeforeRollback()
329329 basicAck (tags [2 ], false );
330330 txCommit ();
331331 }
332+
333+ private abstract class NackMethod {
334+ abstract public void nack (long tag , boolean requeue )
335+ throws IOException ;
336+
337+ public void nack (boolean requeue )
338+ throws IOException
339+ {
340+ nack (latestTag , requeue );
341+ }
342+
343+ public void nack ()
344+ throws IOException
345+ {
346+ nack (latestTag , true );
347+ }
348+ }
349+
350+ private NackMethod basicNack = new NackMethod () {
351+ public void nack (long tag , boolean requeue )
352+ throws IOException
353+ {
354+ channel .basicNack (tag , false , requeue );
355+ }
356+ };
357+
358+ private NackMethod basicReject = new NackMethod () {
359+ public void nack (long tag , boolean requeue )
360+ throws IOException
361+ {
362+ channel .basicReject (tag , requeue );
363+ }
364+ };
365+
366+ /*
367+ messages with nacks get requeued after the transaction commit.
368+ messages with nacks with requeue = false are not requeued.
369+ */
370+ public void commitNacks (NackMethod method )
371+ throws IOException
372+ {
373+ basicPublish ();
374+ basicPublish ();
375+ txSelect ();
376+ basicGet ();
377+ method .nack ();
378+ basicGet ();
379+ method .nack (false );
380+ assertNull (basicGet ());
381+ txCommit ();
382+ assertNotNull (basicGet ());
383+ assertNull (basicGet ());
384+ }
385+
386+ public void rollbackNacks (NackMethod method )
387+ throws IOException
388+ {
389+ basicPublish ();
390+ txSelect ();
391+ basicGet ();
392+ method .nack (true );
393+ txRollback ();
394+ assertNull (basicGet ());
395+ }
396+
397+ public void commitAcksAndNacks (NackMethod method )
398+ throws IOException
399+ {
400+ for (int i = 0 ; i < 3 ; i ++) {
401+ basicPublish ();
402+ }
403+ txSelect ();
404+ long tags [] = new long [3 ];
405+ for (int i = 0 ; i < 3 ; i ++) {
406+ tags [i ] = basicGet ().getEnvelope ().getDeliveryTag ();
407+ }
408+ basicAck (tags [1 ], false );
409+ basicAck (tags [0 ], false );
410+ method .nack (tags [2 ], false );
411+ txRollback ();
412+ basicAck (tags [2 ], false );
413+ method .nack (tags [0 ], true );
414+ method .nack (tags [1 ], false );
415+ txCommit ();
416+ assertNotNull (basicGet ());
417+ assertNull (basicGet ());
418+ }
419+
420+ public void testCommitNacks ()
421+ throws IOException
422+ {
423+ commitNacks (basicNack );
424+ }
425+
426+ public void testRollbackNacks ()
427+ throws IOException
428+ {
429+ rollbackNacks (basicNack );
430+ }
431+
432+ public void testCommitAcksAndNacks ()
433+ throws IOException
434+ {
435+ commitAcksAndNacks (basicNack );
436+ }
437+
438+ public void testCommitRejects ()
439+ throws IOException
440+ {
441+ commitNacks (basicReject );
442+ }
443+
444+ public void testRollbackRejects ()
445+ throws IOException
446+ {
447+ rollbackNacks (basicReject );
448+ }
449+
450+ public void testCommitAcksAndRejects ()
451+ throws IOException
452+ {
453+ commitAcksAndNacks (basicReject );
454+ }
332455}
0 commit comments