1111// The Original Code is RabbitMQ.
1212//
1313// The Initial Developer of the Original Code is VMware, Inc.
14- // Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
14+ // Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
1515//
1616
1717
1818package com .rabbitmq .client .test .functional ;
1919
20- import com .rabbitmq .client .AMQP ;
2120import com .rabbitmq .client .GetResponse ;
22- import com .rabbitmq .client .MessageProperties ;
2321import com .rabbitmq .client .test .BrokerTestCase ;
2422import java .io .IOException ;
2523import java .util .ArrayList ;
2624import java .util .HashMap ;
25+ import java .util .List ;
2726import java .util .Map ;
2827
2928/**
3029 * Test queue max length limit.
3130 */
3231public class QueueSizeLimit extends BrokerTestCase {
3332
34- private final int MAXLENGTH = 5 ;
35- private final int MAXLENGTH1 = MAXLENGTH + 1 ;
36- private final int EXPIRY_TIMEOUT = 100 ;
33+ private final int MAXMAXLENGTH = 3 ;
3734 private final String q = "queue-maxlength" ;
3835
39- @ Override
40- protected void setUp () throws IOException {
41- super .setUp ();
42- channel .confirmSelect ();
43- }
44-
45- AMQP .BasicProperties setupDlx (boolean persistent ) throws IOException {
46- channel .queueDeclare ("DLQ" , false , true , false , null );
47- channel .queueBind ("DLQ" , "amq.fanout" , "" );
48- declareQueue (persistent , true );
49- AMQP .BasicProperties props = null ;
50- if (persistent ) {
51- props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
36+ public void testQueueSize () throws IOException , InterruptedException {
37+ for (int maxLen = 0 ; maxLen <= MAXMAXLENGTH ; maxLen ++){
38+ setupNonDlxTest (maxLen , false );
39+ assertHead (maxLen , "msg2" , q );
40+ deleteQueue (q );
5241 }
53- return props ;
5442 }
5543
56- public void testQueueSize () throws IOException , InterruptedException {
57- declareQueue (false , false );
58- fill (false , false , false );
59- syncPublish (null , "msg" + MAXLENGTH1 );
60- assertHead (MAXLENGTH , "msg2" , q );
44+ public void testQueueSizeUnacked () throws IOException , InterruptedException {
45+ for (int maxLen = 0 ; maxLen <= MAXMAXLENGTH ; maxLen ++){
46+ setupNonDlxTest (maxLen , true );
47+ assertHead (maxLen > 0 ? 1 : 0 , "msg" + (maxLen + 1 ), q );
48+ deleteQueue (q );
49+ }
6150 }
6251
63- public void testQueueUnacked () throws IOException , InterruptedException {
64- declareQueue (false , false );
65- fill (false , true , false );
66- syncPublish (null , "msg" + MAXLENGTH1 );
67- assertHead (1 , "msg" + MAXLENGTH1 , q );
52+ public void testQueueSizeDlx () throws IOException , InterruptedException {
53+ for (int maxLen = 0 ; maxLen <= MAXMAXLENGTH ; maxLen ++){
54+ setupDlxTest (maxLen , false );
55+ assertHead (1 , "msg1" , "DLQ" );
56+ deleteQueue (q );
57+ deleteQueue ("DLQ" );
58+ }
6859 }
6960
70- public void testPersistent () throws IOException , InterruptedException {
71- declareQueue (true , false );
72- fill (true , true , false );
73- syncPublish (MessageProperties .MINIMAL_PERSISTENT_BASIC , "msg" + MAXLENGTH1 );
74- assertHead (1 , "msg" + MAXLENGTH1 , q );
61+ public void testQueueSizeUnackedDlx () throws IOException , InterruptedException {
62+ for (int maxLen = 0 ; maxLen <= MAXMAXLENGTH ; maxLen ++){
63+ setupDlxTest (maxLen , true );
64+ assertHead (maxLen > 0 ? 0 : 1 , "msg1" , "DLQ" );
65+ deleteQueue (q );
66+ deleteQueue ("DLQ" );
67+ }
7568 }
7669
77- public void testDlxHeadTransient () throws IOException , InterruptedException {
78- dlxHead (false );
70+ public void testRequeue () throws IOException , InterruptedException {
71+ for (int maxLen = 1 ; maxLen <= MAXMAXLENGTH ; maxLen ++) {
72+ declareQueue (maxLen , false );
73+ setupRequeueTest (maxLen );
74+ assertHead (maxLen , "msg1" , q );
75+ deleteQueue (q );
76+ }
7977 }
8078
81- public void testDlxTailTransient () throws IOException , InterruptedException {
82- dlxTail (false );
79+ public void testRequeueWithDlx () throws IOException , InterruptedException {
80+ for (int maxLen = 1 ; maxLen <= MAXMAXLENGTH ; maxLen ++) {
81+ declareQueue (maxLen , true );
82+ setupRequeueTest (maxLen );
83+ assertHead (maxLen , "msg1" , q );
84+ assertHead (maxLen , "msg1" , "DLQ" );
85+ deleteQueue (q );
86+ deleteQueue ("DLQ" );
87+ }
8388 }
8489
85- public void testDlxHeadDurable () throws IOException , InterruptedException {
86- dlxHead (true );
90+ private void setupNonDlxTest (int maxLen , boolean unAcked ) throws IOException , InterruptedException {
91+ declareQueue (maxLen , false );
92+ fill (maxLen );
93+ if (unAcked )
94+ getUnacked (maxLen , null );
95+ publish ("msg" + (maxLen + 1 ));
8796 }
8897
89- public void testDlxTailDurable () throws IOException , InterruptedException {
90- dlxTail (true );
98+ private void setupDlxTest (int maxLen , boolean unAcked ) throws IOException , InterruptedException {
99+ declareQueue (maxLen , true );
100+ fill (maxLen );
101+ if (unAcked )
102+ getUnacked (maxLen , null );
103+ publish ("msg" + (maxLen + 1 ));
104+ try {
105+ Thread .sleep (100 );
106+ } catch (InterruptedException _) { }
91107 }
92108
93- public void testMaxlenZero () throws IOException , InterruptedException {
94- Map <String , Object > args = new HashMap <String , Object >();
95- args .put ("x-max-length" , 0 );
96- channel .queueDeclare (q , false , true , true , args );
97- syncPublish (null , "msg" );
98- assertNull (channel .basicGet (q , true ));
109+ private void setupRequeueTest (int maxLen ) throws IOException , InterruptedException {
110+ List <Long > tags = new ArrayList <Long >(maxLen );
111+ fill (maxLen );
112+ getUnacked (maxLen , tags );
113+ fill (maxLen );
114+ channel .basicNack (tags .get (0 ), false , true );
115+ if (maxLen > 1 )
116+ channel .basicNack (tags .get (maxLen - 1 ), true , true );
99117 }
100118
101- public void testMaxlenOne ( ) throws IOException , InterruptedException {
119+ private void declareQueue ( int maxLen , boolean dlx ) throws IOException {
102120 Map <String , Object > args = new HashMap <String , Object >();
103- args .put ("x-max-length" , 1 );
121+ args .put ("x-max-length" , maxLen );
122+ if (dlx ) {
123+ args .put ("x-dead-letter-exchange" , "amq.fanout" );
124+ channel .queueDeclare ("DLQ" , false , true , false , null );
125+ channel .queueBind ("DLQ" , "amq.fanout" , "" );
126+ }
104127 channel .queueDeclare (q , false , true , true , args );
105-
106- AMQP .BasicProperties props = new AMQP .BasicProperties ();
107- props .setExpiration ((new Integer (EXPIRY_TIMEOUT )).toString ());
108- channel .basicPublish ("" , q , props , "msg1" .getBytes ());
109- channel .basicPublish ("" , q , null , "msg2" .getBytes ());
110-
111- channel .waitForConfirms ();
112- Thread .sleep (EXPIRY_TIMEOUT * 2 );
113- assertHead (1 , "msg2" , q );
114128 }
115129
116- public void testRequeue () throws IOException , InterruptedException {
117- declareQueue (false , false );
118- ArrayList <Long > tags = new ArrayList <Long >(MAXLENGTH );;
119- fill (false , false , false );
120- getUnacked (MAXLENGTH , tags );
121- fill (false , false , false );
122- channel .basicNack (tags .get (0 ), false , true );
123- channel .basicNack (tags .get (MAXLENGTH - 1 ), true , true );
124- assertHead (MAXLENGTH , "msg1" , q );
125- }
126-
127- public void testRequeueWithDlx () throws IOException , InterruptedException {
128- setupDlx (false );
129- ArrayList <Long > tags = new ArrayList <Long >(MAXLENGTH );;
130- fill (false , false , true );
131- getUnacked (MAXLENGTH , tags );
132- fill (false , false , true );
133- channel .basicNack (tags .get (0 ), false , true );
134- channel .basicNack (tags .get (MAXLENGTH - 1 ), true , true );
135- assertHead (MAXLENGTH , "msg1" , q );
136- assertHead (MAXLENGTH , "msg1" , "DLQ" );
137- }
138-
139- public void dlxHead (boolean persistent ) throws IOException , InterruptedException {
140- AMQP .BasicProperties props = setupDlx (persistent );
141- fill (persistent , false , true );
142- syncPublish (props , "msg" + MAXLENGTH1 );
143- assertEquals (MAXLENGTH , declareQueue (persistent , true ));
144- assertHead (1 , "msg1" , "DLQ" );
145- }
146-
147- public void dlxTail (boolean persistent ) throws IOException , InterruptedException {
148- AMQP .BasicProperties props = setupDlx (persistent );
149- fill (persistent , true , true );
150- syncPublish (props , "msg" + MAXLENGTH1 );
151- assertNull (channel .basicGet ("DLQ" , true ));
152- assertHead (1 , "msg" + MAXLENGTH1 , q );
153- }
154-
155- private void fill (boolean persistent , boolean unAcked , boolean dlx ) throws IOException , InterruptedException {
156- for (int i =1 ; i <= MAXLENGTH ; i ++){
157- syncPublish (null , "msg" + i );
158- if (unAcked ) {
159- assertNotNull (channel .basicGet (q , false ));
160- }
161- }
162- if (unAcked ) {
163- assertEquals (0 , declareQueue (persistent , dlx ));
164- } else {
165- assertEquals (MAXLENGTH , declareQueue (persistent , dlx ));
130+ private void fill (int count ) throws IOException , InterruptedException {
131+ for (int i =1 ; i <= count ; i ++){
132+ publish ("msg" + i );
166133 }
167134 }
168135
169- private void syncPublish (AMQP .BasicProperties props , String payload ) throws IOException , InterruptedException {
170- channel .basicPublish ("" , q , props , payload .getBytes ());
171- channel .waitForConfirmsOrDie ();
136+ private void publish (String payload ) throws IOException , InterruptedException {
137+ basicPublishVolatile (payload .getBytes (), q );
172138 }
173139
174- private int declareQueue (boolean persistent , boolean dlx ) throws IOException {
175- Map <String , Object > args = new HashMap <String , Object >();
176- args .put ("x-max-length" , MAXLENGTH );
177- if (dlx ) {
178- args .put ("x-dead-letter-exchange" , "amq.fanout" );
179- }
180- AMQP .Queue .DeclareOk ok = channel .queueDeclare (q , persistent , true , true , args );
181- return ok .getMessageCount ();
182- }
183-
184- private void assertHead (int expectedLength , String expectedPayload , String queueName ) throws IOException {
140+ private void assertHead (int expectedLength , String expectedHeadPayload , String queueName ) throws IOException {
185141 GetResponse head = channel .basicGet (queueName , true );
186- assertNotNull (head );
187- assertEquals (expectedPayload , new String (head .getBody ()));
188- assertEquals (expectedLength , head .getMessageCount () + 1 );
142+ if (expectedLength > 0 ) {
143+ assertNotNull (head );
144+ assertEquals (expectedHeadPayload , new String (head .getBody ()));
145+ assertEquals (expectedLength , head .getMessageCount () + 1 );
146+ } else {
147+ assertNull (head );
148+ }
189149 }
190150
191- private void getUnacked (int howMany , ArrayList <Long > acks ) throws IOException {
151+ private void getUnacked (int howMany , List <Long > acks ) throws IOException {
192152 for (;howMany > 0 ; howMany --){
193- acks .add (channel .basicGet (q , false ).getEnvelope ().getDeliveryTag ());
153+ GetResponse response = channel .basicGet (q , false );
154+ if (acks != null )
155+ acks .add (response .getEnvelope ().getDeliveryTag ());
194156 }
195157 }
196- }
158+ }
0 commit comments