Skip to content

Commit 5801452

Browse files
author
Simon MacMullen
committed
Merge bug24229 to default
2 parents 74f47d0 + e8437e4 commit 5801452

File tree

15 files changed

+440
-562
lines changed

15 files changed

+440
-562
lines changed

codegen.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,9 @@ def printSetter(fieldType, fieldName):
370370

371371
print " }"
372372

373+
# default constructor
374+
print " public %sProperties() {}" % (jClassName)
375+
373376
#class properties
374377
print " public int getClassId() { return %i; }" % (c.index)
375378
print " public String getClassName() { return \"%s\"; }" % (c.name)

src/com/rabbitmq/client/RpcClient.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.HashMap;
2828
import java.util.Map;
2929
import java.util.Map.Entry;
30+
import java.util.concurrent.TimeoutException;
3031

3132
import com.rabbitmq.client.impl.MethodArgumentReader;
3233
import com.rabbitmq.client.impl.MethodArgumentWriter;
@@ -45,6 +46,10 @@ public class RpcClient {
4546
private final String _exchange;
4647
/** Routing key to use for requests */
4748
private final String _routingKey;
49+
/** timeout to use on call responses */
50+
private final int _timeout;
51+
/** NO_TIMEOUT value must match convention on {@link BlockingCell#uninterruptibleGet(int)} */
52+
protected final static int NO_TIMEOUT = -1;
4853

4954
/** Map from request correlation ID to continuation BlockingCell */
5055
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
@@ -59,24 +64,44 @@ public class RpcClient {
5964
/**
6065
* Construct a new RpcClient that will communicate on the given channel, sending
6166
* requests to the given exchange with the given routing key.
62-
* <p>
67+
* <p/>
6368
* Causes the creation of a temporary private autodelete queue.
6469
* @param channel the channel to use for communication
6570
* @param exchange the exchange to connect to
6671
* @param routingKey the routing key
72+
* @param timeout milliseconds before timing out on wait for response
6773
* @throws IOException if an error is encountered
6874
* @see #setupReplyQueue
6975
*/
70-
public RpcClient(Channel channel, String exchange, String routingKey) throws IOException {
76+
public RpcClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException {
7177
_channel = channel;
7278
_exchange = exchange;
7379
_routingKey = routingKey;
80+
if (timeout < NO_TIMEOUT) throw new IllegalArgumentException("Timeout arguument must be NO_TIMEOUT(-1) or non-negative.");
81+
_timeout = timeout;
7482
_correlationId = 0;
7583

7684
_replyQueue = setupReplyQueue();
7785
_consumer = setupConsumer();
7886
}
7987

88+
/**
89+
* Construct a new RpcClient that will communicate on the given channel, sending
90+
* requests to the given exchange with the given routing key.
91+
* <p/>
92+
* Causes the creation of a temporary private autodelete queue.
93+
* <p/>
94+
* Waits forever for responses (that is, no timeout).
95+
* @param channel the channel to use for communication
96+
* @param exchange the exchange to connect to
97+
* @param routingKey the routing key
98+
* @throws IOException if an error is encountered
99+
* @see #setupReplyQueue
100+
*/
101+
public RpcClient(Channel channel, String exchange, String routingKey) throws IOException {
102+
this(channel, exchange, routingKey, NO_TIMEOUT);
103+
}
104+
80105
/**
81106
* Private API - ensures the RpcClient is correctly open.
82107
* @throws IOException if an error is encountered
@@ -151,7 +176,7 @@ public void publish(AMQP.BasicProperties props, byte[] message)
151176
}
152177

153178
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
154-
throws IOException, ShutdownSignalException
179+
throws IOException, ShutdownSignalException, TimeoutException
155180
{
156181
checkConsumer();
157182
BlockingCell<Object> k = new BlockingCell<Object>();
@@ -163,7 +188,7 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
163188
_continuationMap.put(replyId, k);
164189
}
165190
publish(props, message);
166-
Object reply = k.uninterruptibleGet();
191+
Object reply = k.uninterruptibleGet(_timeout);
167192
if (reply instanceof ShutdownSignalException) {
168193
ShutdownSignalException sig = (ShutdownSignalException) reply;
169194
ShutdownSignalException wrapper =
@@ -184,9 +209,10 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
184209
* @return the byte array response received
185210
* @throws ShutdownSignalException if the connection dies during our wait
186211
* @throws IOException if an error is encountered
212+
* @throws TimeoutException if a response is not received within the configured timeout
187213
*/
188214
public byte[] primitiveCall(byte[] message)
189-
throws IOException, ShutdownSignalException {
215+
throws IOException, ShutdownSignalException, TimeoutException {
190216
return primitiveCall(null, message);
191217
}
192218

@@ -196,9 +222,10 @@ public byte[] primitiveCall(byte[] message)
196222
* @return the string response received
197223
* @throws ShutdownSignalException if the connection dies during our wait
198224
* @throws IOException if an error is encountered
225+
* @throws TimeoutException if a timeout occurs before the response is received
199226
*/
200227
public String stringCall(String message)
201-
throws IOException, ShutdownSignalException
228+
throws IOException, ShutdownSignalException, TimeoutException
202229
{
203230
return new String(primitiveCall(message.getBytes()));
204231
}
@@ -214,9 +241,10 @@ public String stringCall(String message)
214241
* @return the table received
215242
* @throws ShutdownSignalException if the connection dies during our wait
216243
* @throws IOException if an error is encountered
244+
* @throws TimeoutException if a timeout occurs before a response is received
217245
*/
218246
public Map<String, Object> mapCall(Map<String, Object> message)
219-
throws IOException, ShutdownSignalException
247+
throws IOException, ShutdownSignalException, TimeoutException
220248
{
221249
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
222250
MethodArgumentWriter writer = new MethodArgumentWriter(new DataOutputStream(buffer));
@@ -239,9 +267,10 @@ public Map<String, Object> mapCall(Map<String, Object> message)
239267
* @return the table received
240268
* @throws ShutdownSignalException if the connection dies during our wait
241269
* @throws IOException if an error is encountered
270+
* @throws TimeoutException if a timeout occurs before a response is received
242271
*/
243272
public Map<String, Object> mapCall(Object[] keyValuePairs)
244-
throws IOException, ShutdownSignalException
273+
throws IOException, ShutdownSignalException, TimeoutException
245274
{
246275
Map<String, Object> message = new HashMap<String, Object>();
247276
for (int i = 0; i < keyValuePairs.length; i += 2) {

src/com/rabbitmq/tools/jsonrpc/JsonRpcClient.java

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.lang.reflect.Proxy;
2323
import java.util.HashMap;
2424
import java.util.Map;
25+
import java.util.concurrent.TimeoutException;
2526

2627
import com.rabbitmq.client.Channel;
2728
import com.rabbitmq.client.RpcClient;
@@ -57,20 +58,27 @@
5758
*/
5859
public class JsonRpcClient extends RpcClient implements InvocationHandler {
5960
/** Holds the JSON-RPC service description for this client. */
60-
public ServiceDescription serviceDescription;
61+
private ServiceDescription serviceDescription;
6162

6263
/**
6364
* Construct a new JsonRpcClient, passing the parameters through
6465
* to RpcClient's constructor. The service description record is
6566
* retrieved from the server during construction.
67+
* @throws TimeoutException if a response is not received within the timeout specified, if any
6668
*/
67-
public JsonRpcClient(Channel channel, String exchange, String routingKey)
68-
throws IOException, JsonRpcException
69+
public JsonRpcClient(Channel channel, String exchange, String routingKey, int timeout)
70+
throws IOException, JsonRpcException, TimeoutException
6971
{
70-
super(channel, exchange, routingKey);
72+
super(channel, exchange, routingKey, timeout);
7173
retrieveServiceDescription();
7274
}
7375

76+
public JsonRpcClient(Channel channel, String exchange, String routingKey)
77+
throws IOException, JsonRpcException, TimeoutException
78+
{
79+
this(channel, exchange, routingKey, RpcClient.NO_TIMEOUT);
80+
}
81+
7482
/**
7583
* Private API - parses a JSON-RPC reply object, checking it for exceptions.
7684
* @return the result contained within the reply, if no exception is found
@@ -96,27 +104,25 @@ public static Object checkReply(Map<String, Object> reply)
96104
* waits for the response.
97105
* @return the result contained within the reply, if no exception is found
98106
* @throws JsonRpcException if the reply object contained an exception
107+
* @throws TimeoutException if a response is not received within the timeout specified, if any
99108
*/
100-
public Object call(String method, Object[] params)
101-
throws IOException, JsonRpcException
109+
public Object call(String method, Object[] params) throws IOException, JsonRpcException, TimeoutException
102110
{
103-
HashMap<String, Object> request = new HashMap<String, Object>();
104-
request.put("id", null);
105-
request.put("method", method);
106-
request.put("version", ServiceDescription.JSON_RPC_VERSION);
107-
request.put("params", (params == null) ? new Object[0] : params);
111+
HashMap<String, Object> request = new HashMap<String, Object>();
112+
request.put("id", null);
113+
request.put("method", method);
114+
request.put("version", ServiceDescription.JSON_RPC_VERSION);
115+
request.put("params", (params == null) ? new Object[0] : params);
108116
String requestStr = new JSONWriter().write(request);
109-
String replyStr;
110117
try {
111-
replyStr = this.stringCall(requestStr);
118+
String replyStr = this.stringCall(requestStr);
119+
@SuppressWarnings("unchecked")
120+
Map<String, Object> map = (Map<String, Object>) (new JSONReader().read(replyStr));
121+
return checkReply(map);
112122
} catch(ShutdownSignalException ex) {
113123
throw new IOException(ex.getMessage()); // wrap, re-throw
114124
}
115125

116-
//System.out.println(requestStr + " --->\n---> " + replyStr);
117-
@SuppressWarnings("unchecked")
118-
Map<String, Object> map = (Map<String, Object>) (new JSONReader().read(replyStr));
119-
return checkReply(map);
120126
}
121127

122128
/**
@@ -176,10 +182,11 @@ public static Object coerce(String val, String type)
176182
* @return the result contained within the reply, if no exception is found
177183
* @throws JsonRpcException if the reply object contained an exception
178184
* @throws NumberFormatException if a coercion failed
185+
* @throws TimeoutException if a response is not received within the timeout specified, if any
179186
* @see #coerce
180187
*/
181188
public Object call(String[] args)
182-
throws NumberFormatException, IOException, JsonRpcException
189+
throws NumberFormatException, IOException, JsonRpcException, TimeoutException
183190
{
184191
if (args.length == 0) {
185192
throw new IllegalArgumentException("First string argument must be method name");
@@ -210,13 +217,13 @@ public ServiceDescription getServiceDescription() {
210217
* Private API - invokes the "system.describe" method on the
211218
* server, and parses and stores the resulting service description
212219
* in this object.
220+
* TODO: Avoid calling this from the constructor.
221+
* @throws TimeoutException if a response is not received within the timeout specified, if any
213222
*/
214-
public void retrieveServiceDescription()
215-
throws IOException, JsonRpcException
223+
private void retrieveServiceDescription() throws IOException, JsonRpcException, TimeoutException
216224
{
217-
@SuppressWarnings("unchecked")
218-
Map<String, Object> rawServiceDescription = (Map<String, Object>) call("system.describe", null);
219-
//System.out.println(new JSONWriter().write(rawServiceDescription));
220-
this.serviceDescription = new ServiceDescription(rawServiceDescription);
225+
@SuppressWarnings("unchecked")
226+
Map<String, Object> rawServiceDescription = (Map<String, Object>) call("system.describe", null);
227+
serviceDescription = new ServiceDescription(rawServiceDescription);
221228
}
222229
}

test/src/com/rabbitmq/client/test/functional/AlternateExchange.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -111,16 +111,6 @@ protected void cleanup() throws IOException {
111111
}
112112
}
113113

114-
protected void publish(String key, boolean mandatory, boolean immediate)
115-
throws IOException {
116-
channel.basicPublish("x", key, mandatory, immediate, null,
117-
"ae-test".getBytes());
118-
}
119-
120-
protected void publish(String key) throws IOException {
121-
publish(key, false, false);
122-
}
123-
124114
/**
125115
* Perform an auto-acking 'basic.get' on each of the queues named
126116
* in {@link #resources} and check whether a message can be
@@ -161,7 +151,8 @@ protected void check(String key, boolean mandatory, boolean immediate,
161151
throws IOException {
162152

163153
gotReturn.set(false);
164-
publish(key, mandatory, immediate);
154+
channel.basicPublish("x", key, mandatory, immediate, null,
155+
"ae-test".getBytes());
165156
checkGet(expected);
166157
assertEquals(ret, gotReturn.get());
167158
}
@@ -212,19 +203,6 @@ public void testAe() throws IOException {
212203
check(k, false, true, unrouted, true);
213204
}
214205

215-
//tx
216-
channel.txSelect();
217-
for (String k : keys) {
218-
publish(k);
219-
checkGet(unrouted);
220-
channel.txRollback();
221-
checkGet(unrouted);
222-
publish(k);
223-
checkGet(unrouted);
224-
channel.txCommit();
225-
checkGet(expected(k));
226-
}
227-
228206
cleanup();
229207
}
230208

0 commit comments

Comments
 (0)