Skip to content

Commit 0f950e0

Browse files
author
Michael Bridgen
committed
And the magic ingredients (missing files)
1 parent 322fe42 commit 0f950e0

File tree

3 files changed

+437
-0
lines changed

3 files changed

+437
-0
lines changed
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2009 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.test.BrokerTestCase;
35+
36+
import com.rabbitmq.client.AMQP;
37+
import com.rabbitmq.client.Channel;
38+
import com.rabbitmq.client.Connection;
39+
import com.rabbitmq.client.GetResponse;
40+
import com.rabbitmq.client.QueueingConsumer;
41+
import java.io.IOException;
42+
43+
/**
44+
* This tests whether bindings are created and nuked properly.
45+
*
46+
* The tests attempt to declare durable queues on a secondary node, if
47+
* present, and that node is restarted as part of the tests while the
48+
* primary node is still running. That way we exercise any node-down
49+
* handler code in the server.
50+
*
51+
*/
52+
public class BindingLifecycleBase extends BrokerTestCase {
53+
protected static final String K = "K-" + System.currentTimeMillis();
54+
protected static final int N = 1;
55+
protected static final String Q = "Q-" + System.currentTimeMillis();
56+
protected static final String X = "X-" + System.currentTimeMillis();
57+
protected static final byte[] payload = ("" + System.currentTimeMillis()).getBytes();
58+
59+
protected static String randomString() {
60+
return "-" + System.nanoTime();
61+
}
62+
public Channel secondaryChannel;
63+
public Connection secondaryConnection;
64+
65+
@Override
66+
public void openChannel() throws IOException {
67+
if (secondaryConnection != null) {
68+
secondaryChannel = secondaryConnection.createChannel();
69+
}
70+
super.openChannel();
71+
}
72+
73+
@Override
74+
public void openConnection() throws IOException {
75+
super.openConnection();
76+
if (secondaryConnection == null) {
77+
try {
78+
secondaryConnection = connectionFactory.newConnection("localhost", 5673);
79+
}
80+
catch (IOException e) {
81+
}
82+
}
83+
}
84+
85+
@Override
86+
public void closeChannel() throws IOException {
87+
if (secondaryChannel != null) {
88+
secondaryChannel.abort();
89+
secondaryChannel = null;
90+
}
91+
super.closeChannel();
92+
}
93+
94+
@Override
95+
public void closeConnection() throws IOException {
96+
if (secondaryConnection != null) {
97+
secondaryConnection.abort();
98+
secondaryConnection = null;
99+
}
100+
super.closeConnection();
101+
}
102+
103+
protected void createQueueAndBindToExchange(Binding binding, boolean durable) throws IOException {
104+
channel.exchangeDeclare(binding.x, "direct", durable);
105+
channel.queueDeclare(binding.q, durable);
106+
channel.queueBind(binding.q, binding.x, binding.k);
107+
}
108+
109+
protected void declareDurableQueue(String q) throws IOException {
110+
(secondaryChannel == null ? channel : secondaryChannel).queueDeclare(q, true);
111+
}
112+
113+
protected void deleteExchangeAndQueue(Binding binding) throws IOException {
114+
channel.queueDelete(binding.q);
115+
channel.exchangeDelete(binding.x);
116+
}
117+
118+
protected void doAutoDelete(boolean durable, int queues) throws IOException {
119+
String[] queueNames = null;
120+
Binding binding = Binding.randomBinding();
121+
channel.exchangeDeclare(binding.x, "direct", false, durable, true, null);
122+
channel.queueDeclare(binding.q, false, durable, false, true, null);
123+
channel.queueBind(binding.q, binding.x, binding.k);
124+
if (queues > 1) {
125+
int j = queues - 1;
126+
queueNames = new String[j];
127+
for (int i = 0; i < j; i++) {
128+
queueNames[i] = randomString();
129+
channel.queueDeclare(queueNames[i], false, durable, false, false, null);
130+
channel.queueBind(queueNames[i], binding.x, binding.k);
131+
channel.basicConsume(queueNames[i], true, new QueueingConsumer(channel));
132+
}
133+
}
134+
subscribeSendUnsubscribe(binding);
135+
if (durable) {
136+
restart();
137+
}
138+
if (queues > 1) {
139+
for (String s : queueNames) {
140+
channel.basicConsume(s, true, new QueueingConsumer(channel));
141+
Binding tmp = new Binding(s, binding.x, binding.k);
142+
sendUnroutable(tmp);
143+
}
144+
}
145+
channel.queueDeclare(binding.q, false, durable, true, true, null);
146+
// if (queues == 1): Because the exchange does not exist, this
147+
// bind should fail
148+
try {
149+
channel.queueBind(binding.q, binding.x, binding.k);
150+
sendRoutable(binding);
151+
}
152+
catch (Exception e) {
153+
// do nothing, this is the correct behaviour
154+
channel = null;
155+
return;
156+
}
157+
if (queues == 1) {
158+
deleteExchangeAndQueue(binding);
159+
fail("Queue bind should have failed");
160+
}
161+
// Do some cleanup
162+
if (queues > 1) {
163+
for (String q : queueNames) {
164+
channel.queueDelete(q);
165+
}
166+
}
167+
}
168+
169+
protected void restart() throws IOException {
170+
}
171+
172+
protected void sendRoutable(Binding binding) throws IOException {
173+
channel.basicPublish(binding.x, binding.k, null, payload);
174+
GetResponse response = channel.basicGet(binding.q, true);
175+
assertNotNull("The response should not be null", response);
176+
}
177+
178+
protected void sendUnroutable(Binding binding) throws IOException {
179+
channel.basicPublish(binding.x, binding.k, null, payload);
180+
GetResponse response = channel.basicGet(binding.q, true);
181+
assertNull("The response SHOULD BE null", response);
182+
}
183+
184+
protected Binding setupExchangeAndRouteMessage(boolean durable) throws IOException {
185+
Binding binding = setupExchangeBindings(durable);
186+
sendRoutable(binding);
187+
return binding;
188+
}
189+
190+
protected Binding setupExchangeBindings(boolean durable) throws IOException {
191+
Binding binding = Binding.randomBinding();
192+
createQueueAndBindToExchange(binding, durable);
193+
return binding;
194+
}
195+
196+
protected void subscribeSendUnsubscribe(Binding binding) throws IOException {
197+
String tag = channel.basicConsume(binding.q, new QueueingConsumer(channel));
198+
sendUnroutable(binding);
199+
channel.basicCancel(tag);
200+
}
201+
202+
protected static class Binding {
203+
204+
String q;
205+
String x;
206+
String k;
207+
208+
static Binding randomBinding() {
209+
return new Binding(randomString(), randomString(), randomString());
210+
}
211+
212+
protected Binding(String q, String x, String k) {
213+
this.q = q;
214+
this.x = x;
215+
this.k = k;
216+
}
217+
}
218+
219+
// A couple of tests that are common to the subclasses (which differ on
220+
// whether the broker is restarted)
221+
222+
/**
223+
*
224+
* The same thing as testExchangeAutoDelete, but with durable
225+
* queues.
226+
*
227+
* Main difference is restarting the broker to make sure that the
228+
* durable queues are blasted away.
229+
*/
230+
public void testExchangeAutoDeleteDurable() throws IOException {
231+
doAutoDelete(true, 1);
232+
}
233+
234+
/**
235+
* The same thing as testExchangeAutoDeleteManyBindings, but with
236+
* durable queues.
237+
*/
238+
public void testExchangeAutoDeleteDurableManyBindings() throws IOException {
239+
doAutoDelete(true, 10);
240+
}
241+
242+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2009 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.server;
33+
34+
import com.rabbitmq.client.AMQP;
35+
import com.rabbitmq.client.Channel;
36+
import com.rabbitmq.client.Connection;
37+
import com.rabbitmq.client.GetResponse;
38+
import com.rabbitmq.client.QueueingConsumer;
39+
40+
import com.rabbitmq.client.test.functional.BindingLifecycleBase;
41+
42+
import com.rabbitmq.tools.Host;
43+
44+
import java.io.IOException;
45+
46+
/**
47+
* This tests whether bindings are created and nuked properly.
48+
*
49+
* The tests attempt to declare durable queues on a secondary node, if
50+
* present, and that node is restarted as part of the tests while the
51+
* primary node is still running. That way we exercise any node-down
52+
* handler code in the server.
53+
*
54+
*/
55+
public class DurableBindingLifecycle extends BindingLifecycleBase {
56+
57+
@Override
58+
protected void restart() throws IOException {
59+
if (secondaryConnection != null) {
60+
secondaryConnection.abort();
61+
secondaryConnection = null;
62+
secondaryChannel = null;
63+
Host.executeCommand("cd ../rabbitmq-test; make restart-secondary-node");
64+
}
65+
tearDown();
66+
Host.executeCommand("cd ../rabbitmq-test; make restart-app");
67+
setUp();
68+
}
69+
70+
/**
71+
* Tests whether durable bindings are correctly recovered.
72+
*/
73+
public void testDurableBindingRecovery() throws IOException {
74+
declareDurableTopicExchange(X);
75+
declareAndBindDurableQueue(Q, X, K);
76+
77+
restart();
78+
79+
for (int i = 0; i < N; i++){
80+
basicPublishVolatile(X, K);
81+
}
82+
83+
assertDelivered(Q, N);
84+
85+
deleteQueue(Q);
86+
deleteExchange(X);
87+
}
88+
89+
/**
90+
* This tests whether the bindings attached to a durable exchange
91+
* are correctly blown away when the exhange is nuked.
92+
*
93+
* This complements a unit test for testing non-durable exhanges.
94+
* In that case, an exchange is deleted and you expect any
95+
* bindings hanging to it to be deleted as well. To verify this,
96+
* the exchange is deleted and then recreated.
97+
*
98+
* After the recreation, the old bindings should no longer exist
99+
* and hence any messages published to that exchange get routed to
100+
* /dev/null
101+
*
102+
* This test exercises the durable variable of that test, so the
103+
* main difference is that the broker has to be restarted to
104+
* verify that the durable routes have been turfed.
105+
*/
106+
public void testDurableBindingsDeletion() throws IOException {
107+
declareDurableTopicExchange(X);
108+
declareAndBindDurableQueue(Q, X, K);
109+
110+
deleteExchange(X);
111+
112+
restart();
113+
114+
declareDurableTopicExchange(X);
115+
116+
for (int i = 0; i < N; i++){
117+
basicPublishVolatile(X, K);
118+
}
119+
120+
GetResponse response = channel.basicGet(Q, true);
121+
assertNull("The initial response SHOULD BE null", response);
122+
123+
deleteQueue(Q);
124+
deleteExchange(X);
125+
}
126+
127+
128+
/**
129+
* This tests whether the default bindings for durable queues
130+
* are recovered properly.
131+
*
132+
* The idea is to create a durable queue, nuke the server and then
133+
* publish a message to it using the queue name as a routing key
134+
*/
135+
public void testDefaultBindingRecovery() throws IOException {
136+
declareDurableQueue(Q);
137+
138+
restart();
139+
140+
basicPublishVolatile("", Q);
141+
142+
GetResponse response = channel.basicGet(Q, true);
143+
assertNotNull("The initial response SHOULD NOT be null", response);
144+
145+
deleteQueue(Q);
146+
}
147+
148+
149+
}

0 commit comments

Comments
 (0)