@@ -105,42 +105,46 @@ public void testDeadLetterQueueTTLPromptExpiry() throws Exception {
105105 QueueingConsumer c = new QueueingConsumer (channel );
106106 String cTag = channel .basicConsume (TEST_QUEUE_NAME , true , c );
107107 long start = System .currentTimeMillis ();
108- channel . basicPublish ( "amq.direct" , "test" , null , "test" . getBytes () );
108+ publish ( null , "test" );
109109 Delivery d = c .nextDelivery (TTL );
110110 long stop = System .currentTimeMillis ();
111111 assertNotNull (d );
112112 channel .basicCancel (cTag );
113113 long latency = stop -start ;
114114
115+ channel .basicConsume (DLQ , true , c );
116+
115117 // publish messages at regular intervals until currentTime +
116118 // 3/4th of TTL
117119 int count = 0 ;
118120 start = System .currentTimeMillis ();
119121 stop = start + TTL * 3 / 4 ;
120122 long now = start ;
121123 while (now < stop ) {
122- channel .basicPublish ("amq.direct" , "test" , null ,
123- Long .toString (now ).getBytes ());
124+ publish (null , Long .toString (now ));
124125 count ++;
125126 Thread .sleep (TTL / 100 );
126127 now = System .currentTimeMillis ();
127128 }
128129
129- // check that each message arrives within epsilon of the
130- // publication time + TTL + latency
131- long epsilon = TTL / 100 ;
132- channel .basicConsume (DLQ , true , c );
133- while (count -- > 0 ) {
134- d = c .nextDelivery (TTL + latency + epsilon );
135- assertNotNull (d );
136- now = System .currentTimeMillis ();
137- long publishTime = Long .valueOf (new String (d .getBody ()));
138- long targetTime = publishTime + TTL + latency ;
139- assertTrue ("expiry outside bounds (+/- " + epsilon + "): " +
140- (now - targetTime ),
141- (now > targetTime - epsilon ) &&
142- (now < targetTime + epsilon ));
143- }
130+ checkPromptArrival (c , count , latency );
131+
132+ start = System .currentTimeMillis ();
133+ // publish message - which kicks off the queue's ttl timer -
134+ // and immediately fetch it in noack mode
135+ publishAt (start );
136+ basicGet (TEST_QUEUE_NAME );
137+ // publish a 2nd message and immediately fetch it in ack mode
138+ publishAt (start + TTL * 1 / 2 );
139+ GetResponse r = channel .basicGet (TEST_QUEUE_NAME , false );
140+ // publish a 3rd message
141+ publishAt (start + TTL * 3 / 4 );
142+ // reject 2nd message after the initial timer has fired but
143+ // before the message is due to expire
144+ waitUntil (start + TTL * 5 / 4 );
145+ channel .basicReject (r .getEnvelope ().getDeliveryTag (), true );
146+
147+ checkPromptArrival (c , 2 , latency );
144148 }
145149
146150 public void testDeadLetterDeletedDLX () throws Exception {
@@ -374,6 +378,24 @@ private void sleep(long millis) {
374378 }
375379 }
376380
381+ /* check that each message arrives within epsilon of the
382+ publication time + TTL + latency */
383+ private void checkPromptArrival (QueueingConsumer c ,
384+ int count , long latency ) throws Exception {
385+ long epsilon = TTL / 100 ;
386+ for (int i = 0 ; i < count ; i ++) {
387+ Delivery d = c .nextDelivery (TTL + TTL + latency + epsilon );
388+ assertNotNull ("message #" + i + " did not expire" , d );
389+ long now = System .currentTimeMillis ();
390+ long publishTime = Long .valueOf (new String (d .getBody ()));
391+ long targetTime = publishTime + TTL + latency ;
392+ assertTrue ("expiry outside bounds (+/- " + epsilon + "): " +
393+ (now - targetTime ),
394+ (now > targetTime - epsilon ) &&
395+ (now < targetTime + epsilon ));
396+ }
397+ }
398+
377399 private void declareQueue (Object deadLetterExchange ) throws IOException {
378400 declareQueue (TEST_QUEUE_NAME , deadLetterExchange , null , null );
379401 }
@@ -411,10 +433,23 @@ private void publishN(int n) throws IOException {
411433 private void publishN (int n , AMQP .BasicProperties props )
412434 throws IOException
413435 {
414- for (int x = 0 ; x < n ; x ++) {
415- channel .basicPublish ("amq.direct" , "test" , props ,
416- "test message" .getBytes ());
417- }
436+ for (int x = 0 ; x < n ; x ++) { publish (props , "test message" ); }
437+ }
438+
439+ private void publish (AMQP .BasicProperties props , String body )
440+ throws IOException
441+ {
442+ channel .basicPublish ("amq.direct" , "test" , props , body .getBytes ());
443+ }
444+
445+ private void publishAt (long when ) throws Exception {
446+ waitUntil (when );
447+ publish (null , Long .toString (System .currentTimeMillis ()));
448+ }
449+
450+ private void waitUntil (long when ) throws Exception {
451+ long delay = when - System .currentTimeMillis ();
452+ Thread .sleep (delay > 0 ? delay : 0 );
418453 }
419454
420455 private void consumeN (String queue , int n , WithResponse withResponse )
0 commit comments