Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit b0cea0d

Browse files
committed
Emit publish nack when losing connection
Only for messages that are not to be retried. Fixes #155 (cherry picked from commit 35d3e9d)
1 parent 2c22f22 commit b0cea0d

File tree

4 files changed

+79
-3
lines changed

4 files changed

+79
-3
lines changed

src/main/java/reactor/rabbitmq/ExceptionHandlers.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.MissedHeartbeatException;
2020
import com.rabbitmq.client.ShutdownSignalException;
2121

22+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
2223
import java.time.Duration;
2324
import java.util.Map;
2425
import java.util.concurrent.Callable;
@@ -38,7 +39,7 @@ public static class ConnectionRecoveryTriggeringPredicate implements Predicate<T
3839
public boolean test(Throwable throwable) {
3940
if (throwable instanceof ShutdownSignalException) {
4041
ShutdownSignalException sse = (ShutdownSignalException) throwable;
41-
return !sse.isInitiatedByApplication() || (sse.getCause() instanceof MissedHeartbeatException);
42+
return AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(sse);
4243
}
4344
return false;
4445
}

src/main/java/reactor/rabbitmq/OutboundMessage.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public class OutboundMessage {
3434

3535
private final byte[] body;
3636

37+
private volatile boolean published = false;
38+
3739
/**
3840
* Constructs a new message which is described by the body, the target exchange and the routing key which
3941
* can be used for smart routing after the message is published to the exchange.
@@ -109,4 +111,12 @@ public String toString() {
109111
", body=" + Arrays.toString(body) +
110112
'}';
111113
}
114+
115+
void published() {
116+
this.published = true;
117+
}
118+
119+
boolean isPublished() {
120+
return published;
121+
}
112122
}

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.rabbitmq.client.*;
2020
import com.rabbitmq.client.impl.AMQImpl;
21+
import java.util.Map.Entry;
2122
import org.reactivestreams.Publisher;
2223
import org.reactivestreams.Subscriber;
2324
import org.reactivestreams.Subscription;
@@ -965,8 +966,22 @@ private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) {
965966
};
966967
channel.addConfirmListener(confirmListener);
967968

968-
969969
this.shutdownListener = sse -> {
970+
// nack outstanding messages to warn downstream
971+
Iterator<Entry<Long, OMSG>> iterator = this.unconfirmed.entrySet()
972+
.iterator();
973+
while (iterator.hasNext()) {
974+
OMSG message = iterator.next().getValue();
975+
if (!message.isPublished()) {
976+
// we deal only with messages that won't be retried here
977+
try {
978+
subscriber.onNext(new OutboundMessageResult<>(message, false, false));
979+
iterator.remove();
980+
} catch (Exception e) {
981+
LOGGER.info("Error while nacking messages after channel failure");
982+
}
983+
}
984+
}
970985
// the server is closing the channel because of some error (e.g. exchange does not exist).
971986
// sending a signal downstream
972987
if (!sse.isHardError() && !sse.isInitiatedByApplication()) {
@@ -986,7 +1001,6 @@ public void onNext(OMSG message) {
9861001
if (checkComplete(message)) {
9871002
return;
9881003
}
989-
9901004
long nextPublishSeqNo = channel.getNextPublishSeqNo();
9911005
try {
9921006
unconfirmed.putIfAbsent(nextPublishSeqNo, message);
@@ -997,6 +1011,7 @@ public void onNext(OMSG message) {
9971011
this.propertiesProcessor.apply(message.getProperties(), nextPublishSeqNo),
9981012
message.getBody()
9991013
);
1014+
message.published();
10001015
} catch (Exception e) {
10011016
unconfirmed.remove(nextPublishSeqNo);
10021017
try {

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,56 @@ void publishConfirmsMaxInFlight() throws InterruptedException {
823823
assertThat(maxInflight.get()).isLessThanOrEqualTo(maxConcurrency);
824824
}
825825

826+
@Test
827+
public void publishConfirmsEmitNackForUnconfirmedMessagesOnConnectionFailure() throws Exception {
828+
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
829+
Connection mockConnection = mock(Connection.class);
830+
Channel mockChannel = mock(Channel.class);
831+
when(mockConnectionFactory.newConnection()).thenReturn(mockConnection);
832+
when(mockConnection.createChannel()).thenReturn(mockChannel);
833+
when(mockConnection.isOpen()).thenReturn(true);
834+
when(mockChannel.getConnection()).thenReturn(mockConnection);
835+
836+
AtomicLong publishSequence = new AtomicLong();
837+
when(mockChannel.getNextPublishSeqNo()).thenAnswer(invocation -> publishSequence.incrementAndGet());
838+
when(mockChannel.isOpen()).thenReturn(true);
839+
840+
CountDownLatch publishLatch = new CountDownLatch(1);
841+
doAnswer(answer -> {
842+
publishLatch.countDown();
843+
try {
844+
Thread.sleep(1000L);
845+
} catch (Exception e) {
846+
}
847+
return null;
848+
})
849+
.when(mockChannel).basicPublish(anyString(), anyString(), eq(false), nullable(AMQP.BasicProperties.class), any(byte[].class));
850+
851+
Flux<OutboundMessage> msgFlux = Flux.just(new OutboundMessage("", queue, "".getBytes()));
852+
CountDownLatch nackLatch = new CountDownLatch(1);
853+
sender = createSender(new SenderOptions().connectionFactory(mockConnectionFactory));
854+
sender
855+
.sendWithPublishConfirms(msgFlux)
856+
.subscribe(
857+
outboundMessageResult -> {
858+
if (!outboundMessageResult.isAck()) {
859+
nackLatch.countDown();
860+
}
861+
},
862+
error -> {
863+
});
864+
865+
assertThat(publishLatch.await(5, TimeUnit.SECONDS)).isTrue();
866+
867+
ArgumentCaptor<ShutdownListener> shutdownListenerArgumentCaptor = ArgumentCaptor.forClass(ShutdownListener.class);
868+
verify(mockChannel).addShutdownListener(shutdownListenerArgumentCaptor.capture());
869+
870+
ShutdownListener shutdownListener = shutdownListenerArgumentCaptor.getValue();
871+
shutdownListener.shutdownCompleted(new ShutdownSignalException(true, false, null, null));
872+
873+
assertThat(nackLatch.await(5, TimeUnit.SECONDS)).isTrue();
874+
}
875+
826876
@Test
827877
public void publishConfirmsErrorWhilePublishing() throws Exception {
828878
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);

0 commit comments

Comments
 (0)