@@ -183,6 +183,34 @@ public void testTransactionalPublishWithGet() throws Exception {
183183 assertNull (get ());
184184 }
185185
186+ /*
187+ * Test expiry of requeued messages
188+ */
189+ public void testExpiryWithRequeue () throws Exception {
190+ long ttl = 1000 ;
191+ declareQueue (TTL_QUEUE_NAME , ttl );
192+ this .channel .queueBind (TTL_QUEUE_NAME , TTL_EXCHANGE , TTL_QUEUE_NAME );
193+
194+ byte [] msg1 = "one" .getBytes ();
195+ byte [] msg2 = "two" .getBytes ();
196+ byte [] msg3 = "three" .getBytes ();
197+
198+ basicPublishVolatile (msg1 , TTL_EXCHANGE , TTL_QUEUE_NAME );
199+ Thread .sleep (500 );
200+ basicPublishVolatile (msg2 , TTL_EXCHANGE , TTL_QUEUE_NAME );
201+ basicPublishVolatile (msg3 , TTL_EXCHANGE , TTL_QUEUE_NAME );
202+
203+ expectBodyAndRemainingMessages ("one" , 2 );
204+ expectBodyAndRemainingMessages ("two" , 1 );
205+
206+ closeChannel ();
207+ openChannel ();
208+
209+ Thread .sleep (600 );
210+ expectBodyAndRemainingMessages ("two" , 1 );
211+ expectBodyAndRemainingMessages ("three" , 0 );
212+ }
213+
186214
187215 private byte [] get () throws IOException {
188216 GetResponse response = basicGet (TTL_QUEUE_NAME );
@@ -197,5 +225,10 @@ private AMQP.Queue.DeclareOk declareQueue(String name, Object ttlValue) throws I
197225 return this .channel .queueDeclare (name , false , true , false , argMap );
198226 }
199227
228+ private void expectBodyAndRemainingMessages (String body , int messagesLeft ) throws IOException {
229+ GetResponse response = channel .basicGet (TTL_QUEUE_NAME , false );
230+ assertEquals (body , new String (response .getBody ()));
231+ assertEquals (messagesLeft , response .getMessageCount ());
232+ }
200233
201234}
0 commit comments