Skip to content

Commit 10b3cf1

Browse files
committed
Merge with upstream
2 parents a7e2463 + 6744f31 commit 10b3cf1

File tree

12 files changed

+146
-24
lines changed

12 files changed

+146
-24
lines changed

src/com/rabbitmq/client/Connection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,13 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
7575
ConnectionParameters getParameters();
7676

7777
/**
78-
* Get the negotiated maximum number of channels allowed.
78+
* Get the negotiated maximum channel number. Usable channel
79+
* numbers range from 1 to this number, inclusive.
7980
*
8081
* Note that this is the <i>current</i> setting, as opposed to the <i>initially-requested</i>
8182
* setting available from {@link #getParameters()}.{@link ConnectionParameters#getRequestedChannelMax()}.
8283
*
83-
* @return the maximum number of simultaneously-open channels permitted for this connection.
84+
* @return the maximum channel number permitted for this connection.
8485
*/
8586
int getChannelMax();
8687

src/com/rabbitmq/client/ConnectionParameters.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private static String safeGetProperty(String key, String def) {
5454
/** Default virtual host */
5555
public static final String DEFAULT_VHOST = "/";
5656

57-
/** Default value for the desired maximum number of channels; zero for
57+
/** Default value for the desired maximum channel number; zero for
5858
* unlimited */
5959
public static final int DEFAULT_CHANNEL_MAX = 0;
6060

@@ -126,8 +126,8 @@ public void setVirtualHost(String virtualHost) {
126126
}
127127

128128
/**
129-
* Retrieve the requested maximum number of channels
130-
* @return the initially requested maximum number of channels; zero for unlimited
129+
* Retrieve the requested maximum channel number
130+
* @return the initially requested maximum channel number; zero for unlimited
131131
*/
132132
public int getRequestedChannelMax() {
133133
return _requestedChannelMax;
@@ -166,8 +166,8 @@ public void setRequestedHeartbeat(int requestedHeartbeat) {
166166
}
167167

168168
/**
169-
* Set the requested maximum number of channels
170-
* @param requestedChannelMax initially requested maximum number of channels; zero for unlimited
169+
* Set the requested maximum channel number
170+
* @param requestedChannelMax initially requested maximum channel number; zero for unlimited
171171
*/
172172
public void setRequestedChannelMax(int requestedChannelMax) {
173173
_requestedChannelMax = requestedChannelMax;

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public int getChannelMax() {
219219
}
220220

221221
/**
222-
* Protected API - set the max <b>number</b> of channels available
222+
* Protected API - set the max channel <b>number</b>
223223
*/
224224
public void setChannelMax(int value) {
225225
_channelManager.setChannelMax(value);

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class ChannelManager {
4747
/** Mapping from channel number to AMQChannel instance */
4848
private final Map<Integer, ChannelN> _channelMap = Collections.synchronizedMap(new HashMap<Integer, ChannelN>());
4949

50-
/** Maximum number of channels available on this connection. */
50+
/** Maximum channel number available on this connection. */
5151
public int _channelMax = 0;
5252

5353
public synchronized int getChannelMax() {
@@ -101,7 +101,7 @@ public synchronized int allocateChannelNumber(int maxChannels) {
101101
maxChannels = Integer.MAX_VALUE;
102102
}
103103
int channelNumber = -1;
104-
for (int candidate = 1; candidate < maxChannels; candidate++) {
104+
for (int candidate = 1; candidate <= maxChannels; candidate++) {
105105
if (!_channelMap.containsKey(candidate)) {
106106
channelNumber = candidate;
107107
break;

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,12 @@ public void releaseChannelNumber() {
193193
command,
194194
this);
195195
synchronized (_channelMutex) {
196-
processShutdownSignal(signal, true, true);
197-
quiescingTransmit(new Channel.CloseOk());
196+
try {
197+
processShutdownSignal(signal, true, false);
198+
quiescingTransmit(new Channel.CloseOk());
199+
} finally {
200+
notifyOutstandingRpc(signal);
201+
}
198202
}
199203
notifyListeners();
200204
return true;

src/com/rabbitmq/client/impl/LongString.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636

3737
/**
3838
* An object providing access to a LongString.
39-
* This might be implemeted to read directly from connection
39+
* This might be implemented to read directly from connection
4040
* socket, depending on the size of the content to be read -
4141
* long strings may contain up to 4Gb of content.
4242
*/

test/src/com/rabbitmq/client/test/functional/BindingLifecycle.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131

3232
package com.rabbitmq.client.test.functional;
3333

34+
import com.rabbitmq.client.Channel;
35+
import com.rabbitmq.client.Connection;
3436
import com.rabbitmq.client.GetResponse;
3537
import com.rabbitmq.client.QueueingConsumer;
3638

@@ -53,6 +55,29 @@ public class BindingLifecycle extends PersisterRestartBase {
5355
protected static final String X = "X-" + System.currentTimeMillis();
5456
protected static final String K = "K-" + System.currentTimeMillis();
5557

58+
/**
59+
* Create a durable queue on secondary node, if possible, falling
60+
* back on the primary node if necessary.
61+
*/
62+
@Override protected void declareDurableQueue(String q)
63+
throws IOException
64+
{
65+
Connection connection;
66+
try {
67+
connection = connectionFactory.newConnection("localhost", 5673);
68+
} catch (IOException e) {
69+
super.declareDurableQueue(q);
70+
return;
71+
}
72+
73+
Channel channel = connection.createChannel();
74+
75+
channel.queueDeclare(q, true);
76+
77+
channel.abort();
78+
connection.abort();
79+
}
80+
5681
/**
5782
* Tests whether durable bindings are correctly recovered.
5883
*/
@@ -112,7 +137,7 @@ public void testDurableBindingsDeletion() throws IOException {
112137

113138

114139
/**
115-
* This tests whether the default bindings for persistent queues
140+
* This tests whether the default bindings for durable queues
116141
* are recovered properly.
117142
*
118143
* The idea is to create a durable queue, nuke the server and then
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2009 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.AMQP;
35+
import com.rabbitmq.client.ShutdownSignalException;
36+
import java.io.IOException;
37+
38+
public class DoubleDeletion extends BrokerTestCase
39+
{
40+
protected static final String Q = "DoubleDeletionQueue";
41+
protected static final String X = "DoubleDeletionExchange";
42+
43+
public void testDoubleDeletionQueue()
44+
throws IOException
45+
{
46+
channel.queueDeclare(Q);
47+
channel.queueDelete(Q);
48+
try {
49+
channel.queueDelete(Q);
50+
fail("Expected exception from double deletion of queue");
51+
} catch (IOException ee) {
52+
checkShutdownSignal(AMQP.NOT_FOUND, ee);
53+
// Pass!
54+
}
55+
}
56+
57+
public void testDoubleDeletionExchange()
58+
throws IOException
59+
{
60+
channel.exchangeDeclare(X, "direct");
61+
channel.exchangeDelete(X);
62+
try {
63+
channel.exchangeDelete(X);
64+
fail("Expected exception from double deletion of exchange");
65+
} catch (IOException ee) {
66+
checkShutdownSignal(AMQP.NOT_FOUND, ee);
67+
// Pass!
68+
}
69+
}
70+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
public class FunctionalTests extends TestCase {
3838
public static TestSuite suite() {
3939
TestSuite suite = new TestSuite("functional");
40+
suite.addTestSuite(DoubleDeletion.class);
4041
suite.addTestSuite(Routing.class);
4142
suite.addTestSuite(BindingLifecycle.class);
4243
suite.addTestSuite(Transactions.class);

test/src/com/rabbitmq/client/test/functional/PersisterRestartBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ protected void declareDurableQueue(String q)
8989
protected void declareAndBindDurableQueue(String q, String x, String r)
9090
throws IOException
9191
{
92-
channel.queueDeclare(q, true);
92+
declareDurableQueue(q);
9393
channel.queueBind(q, x, r);
9494
}
9595

0 commit comments

Comments
 (0)