From e1a930b848ed18771126bdf2dfe4ebe931f5d7d1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?=
<514737+acogoluegnes@users.noreply.github.com>
Date: Tue, 20 May 2025 15:35:01 +0200
Subject: [PATCH 1/2] Make message durability configurable
Outbound messages are marked as durable by default. They are marked
non-durable only if explicitly set.
---
.../com/rabbitmq/client/amqp/Message.java | 13 ++++
.../client/amqp/impl/AmqpMessage.java | 17 +++++
.../client/amqp/impl/AmqpPublisher.java | 9 ++-
.../client/amqp/impl/AmqpMessageTest.java | 23 +++++-
.../rabbitmq/client/amqp/impl/AmqpTest.java | 73 +++++++++++++++++++
.../rabbitmq/client/amqp/impl/Assertions.java | 7 ++
6 files changed, 137 insertions(+), 5 deletions(-)
diff --git a/src/main/java/com/rabbitmq/client/amqp/Message.java b/src/main/java/com/rabbitmq/client/amqp/Message.java
index 75ca29b49..3792a4f86 100644
--- a/src/main/java/com/rabbitmq/client/amqp/Message.java
+++ b/src/main/java/com/rabbitmq/client/amqp/Message.java
@@ -591,6 +591,19 @@ public interface Message {
*/
byte[] body();
+ /**
+ * Mark the message as durable or not.
+ *
+ *
Messages are durable by default, use false to make them explicitly non-durable.
+ *
+ *
Durability depends also on the queue messages end up in (e.g. quorum queues and streams
+ * always store messages durably).
+ *
+ * @param durable true for a durable message, false for a non-durable message
+ * @return the message
+ */
+ Message durable(boolean durable);
+
/**
* Whether the message is durable.
*
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java
index 988f7cab9..17c52f78b 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java
@@ -35,6 +35,8 @@ final class AmqpMessage implements Message {
private final org.apache.qpid.protonj2.client.Message delegate;
+ private boolean durableIsSet = false;
+
AmqpMessage() {
this(org.apache.qpid.protonj2.client.Message.create(EMPTY_BODY));
}
@@ -455,6 +457,14 @@ public byte[] body() {
}
// header section
+
+ @Override
+ public Message durable(boolean durable) {
+ this.durableIsSet = true;
+ callOnDelegate(m -> m.durable(durable));
+ return this;
+ }
+
@Override
public boolean durable() {
return returnFromDelegate(org.apache.qpid.protonj2.client.Message::durable);
@@ -535,6 +545,13 @@ public MessageAddressBuilder replyToAddress() {
return new DefaultMessageAddressBuilder(this, DefaultMessageAddressBuilder.REPLY_TO_CALLBACK);
}
+ AmqpMessage enforceDurability() throws ClientException {
+ if (!this.durableIsSet) {
+ this.delegate.durable(true);
+ }
+ return this;
+ }
+
private static class DefaultMessageAddressBuilder
extends DefaultAddressBuilder implements MessageAddressBuilder {
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java
index e8c72b202..91c717819 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java
@@ -86,9 +86,7 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
this.publishCall =
msg -> {
try {
- org.apache.qpid.protonj2.client.Message> nativeMessage =
- ((AmqpMessage) msg).nativeMessage();
- return this.sender.send(nativeMessage.durable(true));
+ return this.doSend((AmqpMessage) msg);
} catch (ClientIllegalStateException e) {
// the link is closed
LOGGER.debug("Error while publishing: '{}'. Closing publisher.", e.getMessage());
@@ -154,6 +152,11 @@ private Status mapDeliveryState(DeliveryState in) {
}
}
+ private Tracker doSend(AmqpMessage msg) throws ClientException {
+ msg.enforceDurability();
+ return this.sender.send(msg.nativeMessage());
+ }
+
private static MetricsCollector.PublishDisposition mapToPublishDisposition(Status status) {
if (status == Status.ACCEPTED) {
return MetricsCollector.PublishDisposition.ACCEPTED;
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java
index b054204a6..7e18266a4 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java
@@ -25,13 +25,32 @@ public class AmqpMessageTest {
@Test
void toShouldBePathEncoded() {
- assertThat(new AmqpMessage().toAddress().exchange("foo bar").message().to())
+ assertThat(msg().toAddress().exchange("foo bar").message().to())
.isEqualTo("/exchanges/foo%20bar");
}
@Test
void replyToShouldBePathEncoded() {
- assertThat(new AmqpMessage().replyToAddress().exchange("foo bar").message().replyTo())
+ assertThat(msg().replyToAddress().exchange("foo bar").message().replyTo())
.isEqualTo("/exchanges/foo%20bar");
}
+
+ @Test
+ void shouldBeNonDurableOnlyIfExplicitlySet() throws Exception {
+ AmqpMessage msg = msg();
+ // durable by default
+ assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue();
+ // non-durable explicitly set
+ msg = msg();
+ msg.durable(false);
+ assertThat(msg.enforceDurability().nativeMessage().durable()).isFalse();
+ // durable explicitly set
+ msg = msg();
+ msg.durable(true);
+ assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue();
+ }
+
+ private static AmqpMessage msg() {
+ return new AmqpMessage();
+ }
}
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
index 6e50c324f..d51407fde 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
@@ -36,6 +36,7 @@
import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Connection;
+import com.rabbitmq.client.amqp.ConsumerBuilder;
import com.rabbitmq.client.amqp.Environment;
import com.rabbitmq.client.amqp.Management;
import com.rabbitmq.client.amqp.Message;
@@ -55,6 +56,7 @@
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
@AmqpTestInfrastructure
@@ -906,4 +908,75 @@ void messageAnnotationsSupportListMapArray() {
ints = (int[]) m.annotation("x-arrayInt");
org.assertj.core.api.Assertions.assertThat(ints).containsExactly(4, 5, 6);
}
+
+ @ParameterizedTest
+ @CsvSource({
+ "CLASSIC,true",
+ "CLASSIC,false",
+ "QUORUM,true",
+ "QUORUM,false",
+ "STREAM,true",
+ "STREAM,false"
+ })
+ void explicitDurabilityShouldBeEnforced(Management.QueueType type, boolean durable) {
+ try {
+ connection.management().queue(this.name).type(type).declare();
+ Publisher p = connection.publisherBuilder().queue(this.name).build();
+ p.publish(p.message().durable(durable), ctx -> {});
+
+ Sync consumeSync = sync();
+ AtomicReference messageRef = new AtomicReference<>();
+ ConsumerBuilder builder =
+ connection
+ .consumerBuilder()
+ .queue(this.name)
+ .messageHandler(
+ (context, message) -> {
+ messageRef.set(message);
+ context.accept();
+ consumeSync.down();
+ });
+ if (type == STREAM) {
+ builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST);
+ }
+ builder.build();
+ assertThat(consumeSync).completes();
+ Message message = messageRef.get();
+ assertThat(message).isDurable(durable);
+ } finally {
+ connection.management().queueDelete(this.name);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(Management.QueueType.class)
+ void durableByDefault(Management.QueueType type) {
+ try {
+ connection.management().queue(this.name).type(type).declare();
+ Publisher p = connection.publisherBuilder().queue(this.name).build();
+ p.publish(p.message(), ctx -> {});
+
+ Sync consumeSync = sync();
+ AtomicReference messageRef = new AtomicReference<>();
+ ConsumerBuilder builder =
+ connection
+ .consumerBuilder()
+ .queue(this.name)
+ .messageHandler(
+ (context, message) -> {
+ messageRef.set(message);
+ context.accept();
+ consumeSync.down();
+ });
+ if (type == STREAM) {
+ builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST);
+ }
+ builder.build();
+ assertThat(consumeSync).completes();
+ Message message = messageRef.get();
+ assertThat(message).isDurable(true);
+ } finally {
+ connection.management().queueDelete(this.name);
+ }
+ }
}
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
index bf9d1b94b..de7616def 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
@@ -359,6 +359,13 @@ private MessageAssert hasField(String fieldLabel, Object value, Object expected)
.isEqualTo(expected);
return this;
}
+
+ void isDurable(boolean durable) {
+ isNotNull();
+ if (actual.durable() != durable) {
+ fail("Message durable flag should be %s but is %s", durable, actual.durable());
+ }
+ }
}
static class ConnectionAssert extends AbstractObjectAssert {
From 38572f84ed6d09aa372936e9c5808e59735f3196 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?=
<514737+acogoluegnes@users.noreply.github.com>
Date: Tue, 20 May 2025 16:25:52 +0200
Subject: [PATCH 2/2] Restrict test to RabbitMQ 4.2+
---
src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java | 2 ++
.../java/com/rabbitmq/client/amqp/impl/TestConditions.java | 3 ++-
2 files changed, 4 insertions(+), 1 deletion(-)
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
index d51407fde..55a18e669 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
@@ -23,6 +23,7 @@
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_0_3;
+import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0;
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
import static com.rabbitmq.client.amqp.impl.Utils.threadFactory;
import static java.nio.charset.StandardCharsets.*;
@@ -918,6 +919,7 @@ void messageAnnotationsSupportListMapArray() {
"STREAM,true",
"STREAM,false"
})
+ @BrokerVersionAtLeast(RABBITMQ_4_2_0)
void explicitDurabilityShouldBeEnforced(Management.QueueType type, boolean durable) {
try {
connection.management().queue(this.name).type(type).declare();
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
index 017dda68d..9db064c3e 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
@@ -35,7 +35,8 @@ private TestConditions() {}
public enum BrokerVersion {
RABBITMQ_4_0_3("4.0.3"),
- RABBITMQ_4_1_0("4.1.0");
+ RABBITMQ_4_1_0("4.1.0"),
+ RABBITMQ_4_2_0("4.2.0");
final String value;