2727import java .util .HashMap ;
2828import java .util .Map ;
2929import java .util .Map .Entry ;
30+ import java .util .concurrent .TimeoutException ;
3031
3132import com .rabbitmq .client .impl .MethodArgumentReader ;
3233import com .rabbitmq .client .impl .MethodArgumentWriter ;
@@ -45,6 +46,10 @@ public class RpcClient {
4546 private final String _exchange ;
4647 /** Routing key to use for requests */
4748 private final String _routingKey ;
49+ /** timeout to use on call responses */
50+ private final int _timeout ;
51+ /** NO_TIMEOUT value must match convention on {@link BlockingCell#uninterruptibleGet(int)} */
52+ protected final static int NO_TIMEOUT = -1 ;
4853
4954 /** Map from request correlation ID to continuation BlockingCell */
5055 private final Map <String , BlockingCell <Object >> _continuationMap = new HashMap <String , BlockingCell <Object >>();
@@ -59,24 +64,44 @@ public class RpcClient {
5964 /**
6065 * Construct a new RpcClient that will communicate on the given channel, sending
6166 * requests to the given exchange with the given routing key.
62- * <p>
67+ * <p/ >
6368 * Causes the creation of a temporary private autodelete queue.
6469 * @param channel the channel to use for communication
6570 * @param exchange the exchange to connect to
6671 * @param routingKey the routing key
72+ * @param timeout milliseconds before timing out on wait for response
6773 * @throws IOException if an error is encountered
6874 * @see #setupReplyQueue
6975 */
70- public RpcClient (Channel channel , String exchange , String routingKey ) throws IOException {
76+ public RpcClient (Channel channel , String exchange , String routingKey , int timeout ) throws IOException {
7177 _channel = channel ;
7278 _exchange = exchange ;
7379 _routingKey = routingKey ;
80+ if (timeout < NO_TIMEOUT ) throw new IllegalArgumentException ("Timeout arguument must be NO_TIMEOUT(-1) or non-negative." );
81+ _timeout = timeout ;
7482 _correlationId = 0 ;
7583
7684 _replyQueue = setupReplyQueue ();
7785 _consumer = setupConsumer ();
7886 }
7987
88+ /**
89+ * Construct a new RpcClient that will communicate on the given channel, sending
90+ * requests to the given exchange with the given routing key.
91+ * <p/>
92+ * Causes the creation of a temporary private autodelete queue.
93+ * <p/>
94+ * Waits forever for responses (that is, no timeout).
95+ * @param channel the channel to use for communication
96+ * @param exchange the exchange to connect to
97+ * @param routingKey the routing key
98+ * @throws IOException if an error is encountered
99+ * @see #setupReplyQueue
100+ */
101+ public RpcClient (Channel channel , String exchange , String routingKey ) throws IOException {
102+ this (channel , exchange , routingKey , NO_TIMEOUT );
103+ }
104+
80105 /**
81106 * Private API - ensures the RpcClient is correctly open.
82107 * @throws IOException if an error is encountered
@@ -151,7 +176,7 @@ public void publish(AMQP.BasicProperties props, byte[] message)
151176 }
152177
153178 public byte [] primitiveCall (AMQP .BasicProperties props , byte [] message )
154- throws IOException , ShutdownSignalException
179+ throws IOException , ShutdownSignalException , TimeoutException
155180 {
156181 checkConsumer ();
157182 BlockingCell <Object > k = new BlockingCell <Object >();
@@ -163,7 +188,7 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
163188 _continuationMap .put (replyId , k );
164189 }
165190 publish (props , message );
166- Object reply = k .uninterruptibleGet ();
191+ Object reply = k .uninterruptibleGet (_timeout );
167192 if (reply instanceof ShutdownSignalException ) {
168193 ShutdownSignalException sig = (ShutdownSignalException ) reply ;
169194 ShutdownSignalException wrapper =
@@ -184,9 +209,10 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
184209 * @return the byte array response received
185210 * @throws ShutdownSignalException if the connection dies during our wait
186211 * @throws IOException if an error is encountered
212+ * @throws TimeoutException if a response is not received within the configured timeout
187213 */
188214 public byte [] primitiveCall (byte [] message )
189- throws IOException , ShutdownSignalException {
215+ throws IOException , ShutdownSignalException , TimeoutException {
190216 return primitiveCall (null , message );
191217 }
192218
@@ -196,9 +222,10 @@ public byte[] primitiveCall(byte[] message)
196222 * @return the string response received
197223 * @throws ShutdownSignalException if the connection dies during our wait
198224 * @throws IOException if an error is encountered
225+ * @throws TimeoutException if a timeout occurs before the response is received
199226 */
200227 public String stringCall (String message )
201- throws IOException , ShutdownSignalException
228+ throws IOException , ShutdownSignalException , TimeoutException
202229 {
203230 return new String (primitiveCall (message .getBytes ()));
204231 }
@@ -214,9 +241,10 @@ public String stringCall(String message)
214241 * @return the table received
215242 * @throws ShutdownSignalException if the connection dies during our wait
216243 * @throws IOException if an error is encountered
244+ * @throws TimeoutException if a timeout occurs before a response is received
217245 */
218246 public Map <String , Object > mapCall (Map <String , Object > message )
219- throws IOException , ShutdownSignalException
247+ throws IOException , ShutdownSignalException , TimeoutException
220248 {
221249 ByteArrayOutputStream buffer = new ByteArrayOutputStream ();
222250 MethodArgumentWriter writer = new MethodArgumentWriter (new DataOutputStream (buffer ));
@@ -239,9 +267,10 @@ public Map<String, Object> mapCall(Map<String, Object> message)
239267 * @return the table received
240268 * @throws ShutdownSignalException if the connection dies during our wait
241269 * @throws IOException if an error is encountered
270+ * @throws TimeoutException if a timeout occurs before a response is received
242271 */
243272 public Map <String , Object > mapCall (Object [] keyValuePairs )
244- throws IOException , ShutdownSignalException
273+ throws IOException , ShutdownSignalException , TimeoutException
245274 {
246275 Map <String , Object > message = new HashMap <String , Object >();
247276 for (int i = 0 ; i < keyValuePairs .length ; i += 2 ) {
0 commit comments