22
33import com .rabbitmq .client .AMQP ;
44import com .rabbitmq .client .GetResponse ;
5+ import com .rabbitmq .client .QueueingConsumer ;
6+ import com .rabbitmq .client .QueueingConsumer .Delivery ;
57import com .rabbitmq .client .test .BrokerTestCase ;
68
79import java .io .IOException ;
@@ -22,6 +24,7 @@ public class DeadLetterExchange extends BrokerTestCase {
2224 private static final String DLQ2 = "queue.dlq2" ;
2325 private static final int MSG_COUNT = 10 ;
2426 private static final int MSG_COUNT_MANY = 1000 ;
27+ private static final int TTL = 1000 ;
2528
2629 @ Override
2730 protected void createResources () throws IOException {
@@ -84,13 +87,66 @@ public void testDeclareQueueWithRoutingKeyButNoDeadLetterExchange()
8487 }
8588
8689 public void testDeadLetterQueueTTLExpiredMessages () throws Exception {
87- ttlTest (1000 );
90+ ttlTest (TTL );
8891 }
8992
9093 public void testDeadLetterQueueZeroTTLExpiredMessages () throws Exception {
9194 ttlTest (0 );
9295 }
9396
97+ public void testDeadLetterQueueTTLPromptExpiry () throws Exception {
98+ Map <String , Object > args = new HashMap <String , Object >();
99+ args .put ("x-message-ttl" , TTL );
100+ declareQueue (TEST_QUEUE_NAME , DLX , null , args );
101+ channel .queueBind (TEST_QUEUE_NAME , "amq.direct" , "test" );
102+ channel .queueBind (DLQ , DLX , "test" );
103+
104+ //measure round-trip latency
105+ QueueingConsumer c = new QueueingConsumer (channel );
106+ String cTag = channel .basicConsume (TEST_QUEUE_NAME , true , c );
107+ long start = System .currentTimeMillis ();
108+ publish (null , "test" );
109+ Delivery d = c .nextDelivery (TTL );
110+ long stop = System .currentTimeMillis ();
111+ assertNotNull (d );
112+ channel .basicCancel (cTag );
113+ long latency = stop -start ;
114+
115+ channel .basicConsume (DLQ , true , c );
116+
117+ // publish messages at regular intervals until currentTime +
118+ // 3/4th of TTL
119+ int count = 0 ;
120+ start = System .currentTimeMillis ();
121+ stop = start + TTL * 3 / 4 ;
122+ long now = start ;
123+ while (now < stop ) {
124+ publish (null , Long .toString (now ));
125+ count ++;
126+ Thread .sleep (TTL / 100 );
127+ now = System .currentTimeMillis ();
128+ }
129+
130+ checkPromptArrival (c , count , latency );
131+
132+ start = System .currentTimeMillis ();
133+ // publish message - which kicks off the queue's ttl timer -
134+ // and immediately fetch it in noack mode
135+ publishAt (start );
136+ basicGet (TEST_QUEUE_NAME );
137+ // publish a 2nd message and immediately fetch it in ack mode
138+ publishAt (start + TTL * 1 / 2 );
139+ GetResponse r = channel .basicGet (TEST_QUEUE_NAME , false );
140+ // publish a 3rd message
141+ publishAt (start + TTL * 3 / 4 );
142+ // reject 2nd message after the initial timer has fired but
143+ // before the message is due to expire
144+ waitUntil (start + TTL * 5 / 4 );
145+ channel .basicReject (r .getEnvelope ().getDeliveryTag (), true );
146+
147+ checkPromptArrival (c , 2 , latency );
148+ }
149+
94150 public void testDeadLetterDeletedDLX () throws Exception {
95151 declareQueue (TEST_QUEUE_NAME , DLX , null , null , 1 );
96152 channel .queueBind (TEST_QUEUE_NAME , "amq.direct" , "test" );
@@ -322,6 +378,24 @@ private void sleep(long millis) {
322378 }
323379 }
324380
381+ /* check that each message arrives within epsilon of the
382+ publication time + TTL + latency */
383+ private void checkPromptArrival (QueueingConsumer c ,
384+ int count , long latency ) throws Exception {
385+ long epsilon = TTL / 100 ;
386+ for (int i = 0 ; i < count ; i ++) {
387+ Delivery d = c .nextDelivery (TTL + TTL + latency + epsilon );
388+ assertNotNull ("message #" + i + " did not expire" , d );
389+ long now = System .currentTimeMillis ();
390+ long publishTime = Long .valueOf (new String (d .getBody ()));
391+ long targetTime = publishTime + TTL + latency ;
392+ assertTrue ("expiry outside bounds (+/- " + epsilon + "): " +
393+ (now - targetTime ),
394+ (now > targetTime - epsilon ) &&
395+ (now < targetTime + epsilon ));
396+ }
397+ }
398+
325399 private void declareQueue (Object deadLetterExchange ) throws IOException {
326400 declareQueue (TEST_QUEUE_NAME , deadLetterExchange , null , null );
327401 }
@@ -359,10 +433,23 @@ private void publishN(int n) throws IOException {
359433 private void publishN (int n , AMQP .BasicProperties props )
360434 throws IOException
361435 {
362- for (int x = 0 ; x < n ; x ++) {
363- channel .basicPublish ("amq.direct" , "test" , props ,
364- "test message" .getBytes ());
365- }
436+ for (int x = 0 ; x < n ; x ++) { publish (props , "test message" ); }
437+ }
438+
439+ private void publish (AMQP .BasicProperties props , String body )
440+ throws IOException
441+ {
442+ channel .basicPublish ("amq.direct" , "test" , props , body .getBytes ());
443+ }
444+
445+ private void publishAt (long when ) throws Exception {
446+ waitUntil (when );
447+ publish (null , Long .toString (System .currentTimeMillis ()));
448+ }
449+
450+ private void waitUntil (long when ) throws Exception {
451+ long delay = when - System .currentTimeMillis ();
452+ Thread .sleep (delay > 0 ? delay : 0 );
366453 }
367454
368455 private void consumeN (String queue , int n , WithResponse withResponse )
0 commit comments