Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 35d3e9d

Browse files
committed
Emit publish nack when losing connection
Only for messages that are not to be retried. Fixes #155
1 parent 3500300 commit 35d3e9d

File tree

4 files changed

+79
-3
lines changed

4 files changed

+79
-3
lines changed

src/main/java/reactor/rabbitmq/ExceptionHandlers.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.MissedHeartbeatException;
2020
import com.rabbitmq.client.ShutdownSignalException;
2121

22+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
2223
import java.time.Duration;
2324
import java.util.Map;
2425
import java.util.concurrent.Callable;
@@ -38,7 +39,7 @@ public static class ConnectionRecoveryTriggeringPredicate implements Predicate<T
3839
public boolean test(Throwable throwable) {
3940
if (throwable instanceof ShutdownSignalException) {
4041
ShutdownSignalException sse = (ShutdownSignalException) throwable;
41-
return !sse.isInitiatedByApplication() || (sse.getCause() instanceof MissedHeartbeatException);
42+
return AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(sse);
4243
}
4344
return false;
4445
}

src/main/java/reactor/rabbitmq/OutboundMessage.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public class OutboundMessage {
3434

3535
private final byte[] body;
3636

37+
private volatile boolean published = false;
38+
3739
/**
3840
* Constructs a new message which is described by the body, the target exchange and the routing key which
3941
* can be used for smart routing after the message is published to the exchange.
@@ -109,4 +111,12 @@ public String toString() {
109111
", body=" + Arrays.toString(body) +
110112
'}';
111113
}
114+
115+
void published() {
116+
this.published = true;
117+
}
118+
119+
boolean isPublished() {
120+
return published;
121+
}
112122
}

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.rabbitmq.client.*;
2020
import com.rabbitmq.client.impl.AMQImpl;
21+
import java.util.Map.Entry;
2122
import org.reactivestreams.Publisher;
2223
import org.reactivestreams.Subscriber;
2324
import org.reactivestreams.Subscription;
@@ -969,8 +970,22 @@ private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) {
969970
};
970971
channel.addConfirmListener(confirmListener);
971972

972-
973973
this.shutdownListener = sse -> {
974+
// nack outstanding messages to warn downstream
975+
Iterator<Entry<Long, OMSG>> iterator = this.unconfirmed.entrySet()
976+
.iterator();
977+
while (iterator.hasNext()) {
978+
OMSG message = iterator.next().getValue();
979+
if (!message.isPublished()) {
980+
// we deal only with messages that won't be retried here
981+
try {
982+
subscriber.onNext(new OutboundMessageResult<>(message, false, false));
983+
iterator.remove();
984+
} catch (Exception e) {
985+
LOGGER.info("Error while nacking messages after channel failure");
986+
}
987+
}
988+
}
974989
// the server is closing the channel because of some error (e.g. exchange does not exist).
975990
// sending a signal downstream
976991
if (!sse.isHardError() && !sse.isInitiatedByApplication()) {
@@ -990,7 +1005,6 @@ public void onNext(OMSG message) {
9901005
if (checkComplete(message)) {
9911006
return;
9921007
}
993-
9941008
long nextPublishSeqNo = channel.getNextPublishSeqNo();
9951009
try {
9961010
unconfirmed.putIfAbsent(nextPublishSeqNo, message);
@@ -1001,6 +1015,7 @@ public void onNext(OMSG message) {
10011015
this.propertiesProcessor.apply(message.getProperties(), nextPublishSeqNo),
10021016
message.getBody()
10031017
);
1018+
message.published();
10041019
} catch (Exception e) {
10051020
unconfirmed.remove(nextPublishSeqNo);
10061021
try {

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,56 @@ void publishConfirmsMaxInFlight() throws InterruptedException {
823823
assertThat(maxInflight.get()).isLessThanOrEqualTo(maxConcurrency);
824824
}
825825

826+
@Test
827+
public void publishConfirmsEmitNackForUnconfirmedMessagesOnConnectionFailure() throws Exception {
828+
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
829+
Connection mockConnection = mock(Connection.class);
830+
Channel mockChannel = mock(Channel.class);
831+
when(mockConnectionFactory.newConnection()).thenReturn(mockConnection);
832+
when(mockConnection.createChannel()).thenReturn(mockChannel);
833+
when(mockConnection.isOpen()).thenReturn(true);
834+
when(mockChannel.getConnection()).thenReturn(mockConnection);
835+
836+
AtomicLong publishSequence = new AtomicLong();
837+
when(mockChannel.getNextPublishSeqNo()).thenAnswer(invocation -> publishSequence.incrementAndGet());
838+
when(mockChannel.isOpen()).thenReturn(true);
839+
840+
CountDownLatch publishLatch = new CountDownLatch(1);
841+
doAnswer(answer -> {
842+
publishLatch.countDown();
843+
try {
844+
Thread.sleep(1000L);
845+
} catch (Exception e) {
846+
}
847+
return null;
848+
})
849+
.when(mockChannel).basicPublish(anyString(), anyString(), eq(false), nullable(AMQP.BasicProperties.class), any(byte[].class));
850+
851+
Flux<OutboundMessage> msgFlux = Flux.just(new OutboundMessage("", queue, "".getBytes()));
852+
CountDownLatch nackLatch = new CountDownLatch(1);
853+
sender = createSender(new SenderOptions().connectionFactory(mockConnectionFactory));
854+
sender
855+
.sendWithPublishConfirms(msgFlux)
856+
.subscribe(
857+
outboundMessageResult -> {
858+
if (!outboundMessageResult.isAck()) {
859+
nackLatch.countDown();
860+
}
861+
},
862+
error -> {
863+
});
864+
865+
assertThat(publishLatch.await(5, TimeUnit.SECONDS)).isTrue();
866+
867+
ArgumentCaptor<ShutdownListener> shutdownListenerArgumentCaptor = ArgumentCaptor.forClass(ShutdownListener.class);
868+
verify(mockChannel).addShutdownListener(shutdownListenerArgumentCaptor.capture());
869+
870+
ShutdownListener shutdownListener = shutdownListenerArgumentCaptor.getValue();
871+
shutdownListener.shutdownCompleted(new ShutdownSignalException(true, false, null, null));
872+
873+
assertThat(nackLatch.await(5, TimeUnit.SECONDS)).isTrue();
874+
}
875+
826876
@Test
827877
public void publishConfirmsErrorWhilePublishing() throws Exception {
828878
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);

0 commit comments

Comments
 (0)