Skip to content

Commit aaf6433

Browse files
author
Emile Joubert
committed
Add test for expiry with requeue
1 parent ed2e077 commit aaf6433

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

test/src/com/rabbitmq/client/test/functional/PerQueueTTL.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,34 @@ public void testTransactionalPublishWithGet() throws Exception {
183183
assertNull(get());
184184
}
185185

186+
/*
187+
* Test expiry of requeued messages
188+
*/
189+
public void testExpiryWithRequeue() throws Exception {
190+
long ttl = 1000;
191+
declareQueue(TTL_QUEUE_NAME, ttl);
192+
this.channel.queueBind(TTL_QUEUE_NAME, TTL_EXCHANGE, TTL_QUEUE_NAME);
193+
194+
byte[] msg1 = "one".getBytes();
195+
byte[] msg2 = "two".getBytes();
196+
byte[] msg3 = "three".getBytes();
197+
198+
basicPublishVolatile(msg1, TTL_EXCHANGE, TTL_QUEUE_NAME);
199+
Thread.sleep(500);
200+
basicPublishVolatile(msg2, TTL_EXCHANGE, TTL_QUEUE_NAME);
201+
basicPublishVolatile(msg3, TTL_EXCHANGE, TTL_QUEUE_NAME);
202+
203+
expectBodyAndRemainingMessages("one", 2);
204+
expectBodyAndRemainingMessages("two", 1);
205+
206+
closeChannel();
207+
openChannel();
208+
209+
Thread.sleep(600);
210+
expectBodyAndRemainingMessages("two", 1);
211+
expectBodyAndRemainingMessages("three", 0);
212+
}
213+
186214

187215
private byte[] get() throws IOException {
188216
GetResponse response = basicGet(TTL_QUEUE_NAME);
@@ -197,5 +225,10 @@ private AMQP.Queue.DeclareOk declareQueue(String name, Object ttlValue) throws I
197225
return this.channel.queueDeclare(name, false, true, false, argMap);
198226
}
199227

228+
private void expectBodyAndRemainingMessages(String body, int messagesLeft) throws IOException {
229+
GetResponse response = channel.basicGet(TTL_QUEUE_NAME, false);
230+
assertEquals(body, new String(response.getBody()));
231+
assertEquals(messagesLeft, response.getMessageCount());
232+
}
200233

201234
}

0 commit comments

Comments
 (0)