Skip to content

Commit 8476560

Browse files
committed
basicCancel and basicConsume honor rpc timeout.
Also make sure to remove other e2e bindings for autodeleted exchanges
1 parent 971dbab commit 8476560

File tree

3 files changed

+57
-26
lines changed

3 files changed

+57
-26
lines changed

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
3939

4040
private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);
4141

42-
private static final int NO_RPC_TIMEOUT = 0;
42+
protected static final int NO_RPC_TIMEOUT = 0;
4343

4444
/**
4545
* Protected; used instead of synchronizing on the channel itself,
@@ -64,7 +64,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
6464
public volatile boolean _blockContent = false;
6565

6666
/** Timeout for RPC calls */
67-
private final int _rpcTimeout;
67+
protected final int _rpcTimeout;
6868

6969
/**
7070
* Construct a channel on the given connection, with the given channel number.
@@ -243,24 +243,38 @@ private AMQCommand privateRpc(Method m)
243243
try {
244244
return k.getReply(_rpcTimeout);
245245
} catch (TimeoutException e) {
246-
try {
247-
// clean RPC channel state
248-
nextOutstandingRpc();
249-
markRpcFinished();
250-
} catch(Exception ex) {
251-
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
252-
}
253-
throw new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
246+
throw wrapTimeoutException(m, e);
254247
}
255248
}
256249
}
250+
251+
private void cleanRpcChannelState() {
252+
try {
253+
// clean RPC channel state
254+
nextOutstandingRpc();
255+
markRpcFinished();
256+
} catch (Exception ex) {
257+
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
258+
}
259+
}
260+
261+
/** Cleans RPC channel state after a timeout and wraps the TimeoutException in a ChannelContinuationTimeoutException */
262+
protected ChannelContinuationTimeoutException wrapTimeoutException(final Method m, final TimeoutException e) {
263+
cleanRpcChannelState();
264+
return new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
265+
}
257266

258267
private AMQCommand privateRpc(Method m, int timeout)
259268
throws IOException, ShutdownSignalException, TimeoutException {
260269
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
261270
rpc(m, k);
262271

263-
return k.getReply(timeout);
272+
try {
273+
return k.getReply(timeout);
274+
} catch (TimeoutException e) {
275+
cleanRpcChannelState();
276+
throw e;
277+
}
264278
}
265279

266280
public void rpc(Method m, RpcContinuation k)

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,18 +1239,26 @@ public String transformReply(AMQCommand replyCommand) {
12391239
}
12401240
};
12411241

1242-
rpc(new Basic.Consume.Builder()
1243-
.queue(queue)
1244-
.consumerTag(consumerTag)
1245-
.noLocal(noLocal)
1246-
.noAck(autoAck)
1247-
.exclusive(exclusive)
1248-
.arguments(arguments)
1249-
.build(),
1250-
k);
1242+
final Method m = new Basic.Consume.Builder()
1243+
.queue(queue)
1244+
.consumerTag(consumerTag)
1245+
.noLocal(noLocal)
1246+
.noAck(autoAck)
1247+
.exclusive(exclusive)
1248+
.arguments(arguments)
1249+
.build();
1250+
rpc(m, k);
12511251

12521252
try {
1253-
return k.getReply();
1253+
if(_rpcTimeout == NO_RPC_TIMEOUT) {
1254+
return k.getReply();
1255+
} else {
1256+
try {
1257+
return k.getReply(_rpcTimeout);
1258+
} catch (TimeoutException e) {
1259+
throw wrapTimeoutException(m, e);
1260+
}
1261+
}
12541262
} catch(ShutdownSignalException ex) {
12551263
throw wrap(ex);
12561264
}
@@ -1267,17 +1275,26 @@ public void basicCancel(final String consumerTag)
12671275
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() {
12681276
@Override
12691277
public Consumer transformReply(AMQCommand replyCommand) {
1270-
replyCommand.getMethod();
1278+
((Basic.CancelOk) replyCommand.getMethod()).getConsumerTag(); // just to make sure its the method expected
12711279
_consumers.remove(consumerTag); //may already have been removed
12721280
dispatcher.handleCancelOk(originalConsumer, consumerTag);
12731281
return originalConsumer;
12741282
}
12751283
};
12761284

1277-
rpc(new Basic.Cancel(consumerTag, false), k);
1278-
1285+
final Method m = new Basic.Cancel(consumerTag, false);
1286+
rpc(m, k);
1287+
12791288
try {
1280-
k.getReply(); // discard result
1289+
if(_rpcTimeout == NO_RPC_TIMEOUT) {
1290+
k.getReply(); // discard result
1291+
} else {
1292+
try {
1293+
k.getReply(_rpcTimeout);
1294+
} catch (TimeoutException e) {
1295+
throw wrapTimeoutException(m, e);
1296+
}
1297+
}
12811298
} catch(ShutdownSignalException ex) {
12821299
throw wrap(ex);
12831300
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -805,7 +805,7 @@ void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
805805
// last binding where this exchange is the source is gone, remove recorded exchange
806806
// if it is auto-deleted. See bug 26364.
807807
if((x != null) && x.isAutoDelete()) {
808-
this.recordedExchanges.remove(exchange);
808+
deleteRecordedExchange(exchange);
809809
}
810810
}
811811
}

0 commit comments

Comments
 (0)