22
33import com .rabbitmq .client .AMQP ;
44import com .rabbitmq .client .GetResponse ;
5+ import com .rabbitmq .client .QueueingConsumer ;
6+ import com .rabbitmq .client .QueueingConsumer .Delivery ;
57import com .rabbitmq .client .test .BrokerTestCase ;
68
79import java .io .IOException ;
@@ -22,6 +24,7 @@ public class DeadLetterExchange extends BrokerTestCase {
2224 private static final String DLQ2 = "queue.dlq2" ;
2325 private static final int MSG_COUNT = 10 ;
2426 private static final int MSG_COUNT_MANY = 1000 ;
27+ private static final int TTL = 1000 ;
2528
2629 @ Override
2730 protected void createResources () throws IOException {
@@ -84,13 +87,62 @@ public void testDeclareQueueWithRoutingKeyButNoDeadLetterExchange()
8487 }
8588
8689 public void testDeadLetterQueueTTLExpiredMessages () throws Exception {
87- ttlTest (1000 );
90+ ttlTest (TTL );
8891 }
8992
9093 public void testDeadLetterQueueZeroTTLExpiredMessages () throws Exception {
9194 ttlTest (0 );
9295 }
9396
97+ public void testDeadLetterQueueTTLPromptExpiry () throws Exception {
98+ Map <String , Object > args = new HashMap <String , Object >();
99+ args .put ("x-message-ttl" , TTL );
100+ declareQueue (TEST_QUEUE_NAME , DLX , null , args );
101+ channel .queueBind (TEST_QUEUE_NAME , "amq.direct" , "test" );
102+ channel .queueBind (DLQ , DLX , "test" );
103+
104+ //measure round-trip latency
105+ QueueingConsumer c = new QueueingConsumer (channel );
106+ String cTag = channel .basicConsume (TEST_QUEUE_NAME , true , c );
107+ long start = System .currentTimeMillis ();
108+ channel .basicPublish ("amq.direct" , "test" , null , "test" .getBytes ());
109+ Delivery d = c .nextDelivery (TTL );
110+ long stop = System .currentTimeMillis ();
111+ assertNotNull (d );
112+ channel .basicCancel (cTag );
113+ long latency = stop -start ;
114+
115+ // publish messages at regular intervals until currentTime +
116+ // 3/4th of TTL
117+ int count = 0 ;
118+ start = System .currentTimeMillis ();
119+ stop = start + TTL * 3 / 4 ;
120+ long now = start ;
121+ while (now < stop ) {
122+ channel .basicPublish ("amq.direct" , "test" , null ,
123+ Long .toString (now ).getBytes ());
124+ count ++;
125+ Thread .sleep (TTL / 100 );
126+ now = System .currentTimeMillis ();
127+ }
128+
129+ // check that each message arrives within epsilon of the
130+ // publication time + TTL + latency
131+ long epsilon = TTL / 100 ;
132+ channel .basicConsume (DLQ , true , c );
133+ while (count -- > 0 ) {
134+ d = c .nextDelivery (TTL + latency + epsilon );
135+ assertNotNull (d );
136+ now = System .currentTimeMillis ();
137+ long publishTime = Long .valueOf (new String (d .getBody ()));
138+ long targetTime = publishTime + TTL + latency ;
139+ assertTrue ("expiry outside bounds (+/- " + epsilon + "): " +
140+ (now - targetTime ),
141+ (now > targetTime - epsilon ) &&
142+ (now < targetTime + epsilon ));
143+ }
144+ }
145+
94146 public void testDeadLetterDeletedDLX () throws Exception {
95147 declareQueue (TEST_QUEUE_NAME , DLX , null , null , 1 );
96148 channel .queueBind (TEST_QUEUE_NAME , "amq.direct" , "test" );
0 commit comments