Skip to content

Commit c582e1a

Browse files
garyrussellartembilan
authored andcommitted
GH-849: Pub. Confirm/Return defensive code
Resolves #849 Add defensive catch blocks; we currently catch exceptions in user `handleConfirm` calls, but not for returns, or for the code that determines which listener to call. Throwing exceptions to the client causes the channel to be killed. # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/PublisherCallbackChannelImpl.java
1 parent 1317215 commit c582e1a

File tree

1 file changed

+30
-8
lines changed

1 file changed

+30
-8
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/PublisherCallbackChannelImpl.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,15 @@ public void handleNack(long seq, boolean multiple)
873873
}
874874

875875
private synchronized void processAck(long seq, boolean ack, boolean multiple, boolean remove) {
876+
try {
877+
doProcessAck(seq, ack, multiple, remove);
878+
}
879+
catch (Exception e) {
880+
this.logger.error("Failed to process publisher confirm", e);
881+
}
882+
}
883+
884+
private void doProcessAck(long seq, boolean ack, boolean multiple, boolean remove) {
876885
if (multiple) {
877886
/*
878887
* Piggy-backed ack - extract all Listeners for this and earlier
@@ -905,12 +914,14 @@ private synchronized void processAck(long seq, boolean ack, boolean multiple, bo
905914
Listener listener = this.listenerForSeq.remove(seq);
906915
if (listener != null) {
907916
SortedMap<Long, PendingConfirm> confirmsForListener = this.pendingConfirms.get(listener);
908-
PendingConfirm pendingConfirm;
909-
if (remove) {
910-
pendingConfirm = confirmsForListener.remove(seq);
911-
}
912-
else {
913-
pendingConfirm = confirmsForListener.get(seq);
917+
PendingConfirm pendingConfirm = null;
918+
if (confirmsForListener != null) { // should never happen; defensive
919+
if (remove) {
920+
pendingConfirm = confirmsForListener.remove(seq);
921+
}
922+
else {
923+
pendingConfirm = confirmsForListener.get(seq);
924+
}
914925
}
915926
if (pendingConfirm != null) {
916927
doHandleConfirm(ack, listener, pendingConfirm);
@@ -957,14 +968,25 @@ public void handleReturn(int replyCode,
957968
AMQP.BasicProperties properties,
958969
byte[] body) throws IOException {
959970
String uuidObject = properties.getHeaders().get(RETURN_CORRELATION_KEY).toString();
960-
Listener listener = this.listeners.get(uuidObject);
971+
Listener listener = null;
972+
if (uuidObject != null) {
973+
listener = this.listeners.get(uuidObject);
974+
}
975+
else {
976+
this.logger.error("No 'spring_listener_return_correlation' header in returned message");
977+
}
961978
if (listener == null || !listener.isReturnListener()) {
962979
if (this.logger.isWarnEnabled()) {
963980
this.logger.warn("No Listener for returned message");
964981
}
965982
}
966983
else {
967-
listener.handleReturn(replyCode, replyText, exchange, routingKey, properties, body);
984+
try {
985+
listener.handleReturn(replyCode, replyText, exchange, routingKey, properties, body);
986+
}
987+
catch (Exception e) {
988+
this.logger.error("Exception delivering returned message ", e);
989+
}
968990
}
969991
}
970992

0 commit comments

Comments
 (0)