1818package com .rabbitmq .client .test .functional ;
1919
2020import com .rabbitmq .client .AMQP ;
21- import com .rabbitmq .client .GetResponse ;
22- import com .rabbitmq .client .QueueingConsumer ;
23- import com .rabbitmq .client .QueueingConsumer .Delivery ;
24- import com .rabbitmq .client .test .BrokerTestCase ;
21+ import com .rabbitmq .client .MessageProperties ;
2522
2623import java .io .IOException ;
2724import java .util .Collections ;
2825import java .util .Map ;
2926
30- /**
31- *
32- */
33- public class PerQueueTTL extends BrokerTestCase {
27+ public class PerQueueTTL extends TTLHandling {
3428
35- private static final String TTL_EXCHANGE = "ttl.exchange" ;
36- private static final String TTL_ARG = "x-message-ttl" ;
37- private static final String TTL_QUEUE_NAME = "queue.ttl" ;
38- private static final String TTL_INVALID_QUEUE_NAME = "invalid.queue.ttl" ;
39-
40- private static final String [] MSG = {"one" , "two" , "three" };
41-
42- @ Override
43- protected void createResources () throws IOException {
44- this .channel .exchangeDeclare (TTL_EXCHANGE , "direct" );
45- }
29+ protected static final String TTL_ARG = "x-message-ttl" ;
4630
4731 @ Override
48- protected void releaseResources () throws IOException {
49- this .channel .exchangeDelete (TTL_EXCHANGE );
50- }
51-
52- public void testCreateQueueTTLTypes () throws IOException {
53- Object [] args = {(byte ) 200 , (short ) 200 , 200 , 200L };
54- for (Object ttl : args ) {
55- try {
56- declareQueue (ttl );
57- } catch (IOException ex ) {
58- fail ("Should be able to use " + ttl .getClass ().getName () +
59- " for x-message-ttl" );
60- }
61- }
62- }
63-
64- public void testTTLAllowZero () throws Exception {
65- try {
66- declareQueue (0 );
67- } catch (IOException e ) {
68- fail ("Should be able to declare a queue with zero for x-message-ttl" );
69- }
70- }
71-
72- public void testCreateQueueWithInvalidTTL () throws Exception {
73- try {
74- declareQueue (TTL_INVALID_QUEUE_NAME , "foobar" );
75- fail ("Should not be able to declare a queue with a non-long value for x-message-ttl" );
76- } catch (IOException e ) {
77- checkShutdownSignal (AMQP .PRECONDITION_FAILED , e );
78- }
79- }
80-
81- public void testTTLMustBePositive () throws Exception {
82- try {
83- declareQueue (TTL_INVALID_QUEUE_NAME , -10 );
84- fail ("Should not be able to declare a queue with negative value for x-message-ttl" );
85- } catch (IOException e ) {
86- checkShutdownSignal (AMQP .PRECONDITION_FAILED , e );
87- }
32+ protected AMQP .Queue .DeclareOk declareQueue (String name , Object ttlValue ) throws IOException {
33+ Map <String , Object > argMap = Collections .singletonMap (TTL_ARG , ttlValue );
34+ return this .channel .queueDeclare (name , false , true , false , argMap );
8835 }
8936
90- public void testQueueRedeclareEquivalence () throws Exception {
37+ public void testQueueReDeclareEquivalence () throws Exception {
9138 declareQueue (10 );
9239 try {
9340 declareQueue (20 );
9441 fail ("Should not be able to redeclare with different x-message-ttl" );
95- } catch (IOException ex ) {
42+ } catch (IOException ex ) {
9643 checkShutdownSignal (AMQP .PRECONDITION_FAILED , ex );
9744 }
9845 }
9946
100- public void testQueueRedeclareSemanticEquivalence () throws Exception {
101- declareQueue ((byte ) 10 );
47+ public void testQueueReDeclareSemanticEquivalence () throws Exception {
48+ declareQueue ((byte )10 );
10249 declareQueue (10 );
103- declareQueue ((short ) 10 );
50+ declareQueue ((short )10 );
10451 declareQueue (10L );
10552 }
10653
@@ -109,136 +56,16 @@ public void testQueueReDeclareSemanticNonEquivalence() throws Exception {
10956 try {
11057 declareQueue (10.0 );
11158 fail ("Should not be able to redeclare with x-message-ttl argument of different type" );
112- } catch (IOException ex ) {
59+ } catch (IOException ex ) {
11360 checkShutdownSignal (AMQP .PRECONDITION_FAILED , ex );
11461 }
11562 }
11663
117- /*
118- * Test messages expire when using basic get.
119- */
120- public void testPublishAndGetWithExpiry () throws Exception {
121- declareAndBindQueue (200 );
122-
123- publish (MSG [0 ]);
124- Thread .sleep (150 );
125-
126- publish (MSG [1 ]);
127- Thread .sleep (100 );
128-
129- publish (MSG [2 ]);
130-
131- assertEquals (MSG [1 ], get ());
132- assertEquals (MSG [2 ], get ());
133- }
134-
135- /*
136- * Test get expiry for messages sent under a transaction
137- */
138- public void testTransactionalPublishWithGet () throws Exception {
139- declareAndBindQueue (100 );
140-
141- this .channel .txSelect ();
142-
143- publish (MSG [0 ]);
144- Thread .sleep (150 );
145-
146- publish (MSG [1 ]);
147- this .channel .txCommit ();
148- Thread .sleep (50 );
149-
150- assertEquals (MSG [0 ], get ());
151- Thread .sleep (80 );
152-
153- assertNull (get ());
64+ protected void publishWithExpiration (String msg , Object sessionTTL ) throws IOException {
65+ basicPublishVolatile (msg .getBytes (), TTL_EXCHANGE , TTL_QUEUE_NAME ,
66+ MessageProperties .TEXT_PLAIN
67+ .builder ()
68+ .expiration (String .valueOf (sessionTTL ))
69+ .build ());
15470 }
155-
156- /*
157- * Test expiry of re-queued messages
158- */
159- public void testExpiryWithRequeue () throws Exception {
160- declareAndBindQueue (200 );
161-
162- publish (MSG [0 ]);
163- Thread .sleep (100 );
164- publish (MSG [1 ]);
165- publish (MSG [2 ]);
166-
167- expectBodyAndRemainingMessages (MSG [0 ], 2 );
168- expectBodyAndRemainingMessages (MSG [1 ], 1 );
169-
170- closeChannel ();
171- openChannel ();
172-
173- Thread .sleep (150 );
174- expectBodyAndRemainingMessages (MSG [1 ], 1 );
175- expectBodyAndRemainingMessages (MSG [2 ], 0 );
176- }
177-
178- /*
179- * Test expiry of re-queued messages after being consumed instantly
180- */
181- public void testExpiryWithRequeueAfterConsume () throws Exception {
182- declareAndBindQueue (100 );
183- QueueingConsumer c = new QueueingConsumer (channel );
184- channel .basicConsume (TTL_QUEUE_NAME , c );
185-
186- publish (MSG [0 ]);
187- assertNotNull (c .nextDelivery (100 ));
188-
189- closeChannel ();
190- Thread .sleep (150 );
191- openChannel ();
192-
193- assertNull ("Requeued message not expired" , get ());
194- }
195-
196- public void testZeroTTLDelivery () throws Exception {
197- declareAndBindQueue (0 );
198-
199- // when there is no consumer, message should expire
200- publish (MSG [0 ]);
201- assertNull (get ());
202-
203- // when there is a consumer, message should be delivered
204- QueueingConsumer c = new QueueingConsumer (channel );
205- channel .basicConsume (TTL_QUEUE_NAME , c );
206- publish (MSG [0 ]);
207- Delivery d = c .nextDelivery (100 );
208- assertNotNull (d );
209-
210- // re-queued messages should expire
211- channel .basicReject (d .getEnvelope ().getDeliveryTag (), true );
212- assertNull (c .nextDelivery (100 ));
213- }
214-
215- private String get () throws IOException {
216- GetResponse response = basicGet (TTL_QUEUE_NAME );
217- return response == null ? null : new String (response .getBody ());
218- }
219-
220- private void publish (String msg ) throws IOException {
221- basicPublishVolatile (msg .getBytes (), TTL_EXCHANGE , TTL_QUEUE_NAME );
222- }
223-
224- private void declareAndBindQueue (Object ttlValue ) throws IOException {
225- declareQueue (ttlValue );
226- this .channel .queueBind (TTL_QUEUE_NAME , TTL_EXCHANGE , TTL_QUEUE_NAME );
227- }
228-
229- private AMQP .Queue .DeclareOk declareQueue (Object ttlValue ) throws IOException {
230- return declareQueue (TTL_QUEUE_NAME , ttlValue );
231- }
232-
233- private AMQP .Queue .DeclareOk declareQueue (String name , Object ttlValue ) throws IOException {
234- Map <String , Object > argMap = Collections .singletonMap (TTL_ARG , ttlValue );
235- return this .channel .queueDeclare (name , false , true , false , argMap );
236- }
237-
238- private void expectBodyAndRemainingMessages (String body , int messagesLeft ) throws IOException {
239- GetResponse response = channel .basicGet (TTL_QUEUE_NAME , false );
240- assertEquals (body , new String (response .getBody ()));
241- assertEquals (messagesLeft , response .getMessageCount ());
242- }
243-
244- }
71+ }
0 commit comments