diff --git a/RabbitMQ.AMQP.Client/IMessage.cs b/RabbitMQ.AMQP.Client/IMessage.cs index 1a459e1..ec4ac3f 100644 --- a/RabbitMQ.AMQP.Client/IMessage.cs +++ b/RabbitMQ.AMQP.Client/IMessage.cs @@ -102,6 +102,10 @@ public interface IMessage public IMessage Body(object body); + public IMessage Durable(bool durable); + + public bool Durable(); + IMessageAddressBuilder ToAddress(); } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index 630ac3a..6f01ca1 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -19,19 +19,16 @@ public class AmqpMessage : IMessage { public Message NativeMessage { get; } - public AmqpMessage() - { - NativeMessage = new Message(); - } - /// /// Create a message with a body of type byte[] and BodySection of type Data. + /// Durable is set to true by default. /// /// public AmqpMessage(byte[] body) { NativeMessage = new Message(); NativeMessage.BodySection = new Data { Binary = body }; + Durable(true); } /// @@ -42,6 +39,7 @@ public AmqpMessage(string body) { NativeMessage = new Message(); NativeMessage.BodySection = new Data() { Binary = System.Text.Encoding.UTF8.GetBytes(body) }; + Durable(true); } public AmqpMessage(Message nativeMessage) @@ -275,7 +273,6 @@ public string BodyAsString() { throw new InvalidOperationException("Body is not an Application Data"); } - } public IMessage Body(object body) @@ -298,6 +295,18 @@ public IMessage Body(object body) return this; } + public IMessage Durable(bool durable) + { + EnsureHeader(); + NativeMessage.Header.Durable = durable; + return this; + } + + public bool Durable() + { + return NativeMessage.Header.Durable; + } + public IMessageAddressBuilder ToAddress() { return new MessageAddressBuilder(this); @@ -329,6 +338,11 @@ private void EnsureAnnotations() NativeMessage.MessageAnnotations ??= new MessageAnnotations(); } + private void EnsureHeader() + { + NativeMessage.Header ??= new Header(); + } + private void ThrowIfApplicationPropertiesNotSet() { if (NativeMessage.ApplicationProperties == null) diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 5f110a5..79912fa 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -256,6 +256,8 @@ RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> object! RabbitMQ.AMQP.Client.IMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.CreationTime() -> System.DateTime RabbitMQ.AMQP.Client.IMessage.CreationTime(System.DateTime creationTime) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IMessage.Durable() -> bool +RabbitMQ.AMQP.Client.IMessage.Durable(bool durable) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.GroupId() -> string! RabbitMQ.AMQP.Client.IMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.GroupSequence() -> uint @@ -390,7 +392,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue(string! name) -> RabbitMQ.AMQP.Cl RabbitMQ.AMQP.Client.Impl.AmqpMessage RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime() -> System.DateTime RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage! -RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage() -> void RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -> void RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(byte[]! body) -> void RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(string! body) -> void @@ -407,6 +408,8 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> object! RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.CreationTime() -> System.DateTime RabbitMQ.AMQP.Client.Impl.AmqpMessage.CreationTime(System.DateTime creationTime) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.Durable() -> bool +RabbitMQ.AMQP.Client.Impl.AmqpMessage.Durable(bool durable) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId() -> string! RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupSequence() -> uint diff --git a/Tests/Publisher/PublisherTests.cs b/Tests/Publisher/PublisherTests.cs index af9b141..a91b05b 100644 --- a/Tests/Publisher/PublisherTests.cs +++ b/Tests/Publisher/PublisherTests.cs @@ -3,6 +3,7 @@ // Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. using System; +using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -22,8 +23,8 @@ public async Task ValidateBuilderRaiseExceptionIfQueueOrExchangeAreNotSetCorrect Assert.NotNull(_management); await Assert.ThrowsAsync(() => - _connection.PublisherBuilder().Queue("queue_and_exchange_cant_set_together"). - Exchange("queue_and_exchange_cant_set_together").BuildAsync()); + _connection.PublisherBuilder().Queue("queue_and_exchange_cant_set_together") + .Exchange("queue_and_exchange_cant_set_together").BuildAsync()); await _connection.CloseAsync(); Assert.Empty(_connection.Publishers); @@ -192,6 +193,7 @@ public async Task PublisherSendingShouldThrowWhenExchangeHasBeenDeleted() publishOutcome = nextPublishResult.Outcome; break; } + await Task.Delay(TimeSpan.FromMilliseconds(100)); } @@ -243,6 +245,7 @@ public async Task PublisherSendingShouldThrowWhenQueueHasBeenDeleted() publishOutcome = nextPublishResult.Outcome; break; } + await Task.Delay(TimeSpan.FromMilliseconds(100)); } @@ -256,4 +259,56 @@ public async Task PublisherSendingShouldThrowWhenQueueHasBeenDeleted() await publisher.CloseAsync(); publisher.Dispose(); } + + [Theory] + [InlineData(QueueType.QUORUM)] + [InlineData(QueueType.CLASSIC)] + public async Task MessageShouldBeDurableByDefault(QueueType queueType) + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + + IQueueSpecification queueSpec = _management.Queue(_queueName).Type(queueType); + await queueSpec.DeclareAsync(); + + IPublisher publisher = await _connection.PublisherBuilder().Queue(queueSpec).BuildAsync(); + List messages = new(); + TaskCompletionSource> tcs = new(); + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(queueSpec) + .MessageHandler((context, message) => + { + messages.Add(message); + context.Accept(); + if (messages.Count == 2) + { + tcs.SetResult(messages); + } + + return Task.CompletedTask; + }).BuildAndStartAsync(); + + // the first message should be durable by default + AmqpMessage durable = new("Hello wold!"); + PublishResult pr = await publisher.PublishAsync(durable); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + Assert.True(durable.Durable()); + + // the second message should be not durable set by the user + + AmqpMessage notDurable = new("Hello wold!"); + notDurable.Durable(false); + PublishResult pr2 = await publisher.PublishAsync(notDurable); + Assert.Equal(OutcomeState.Accepted, pr2.Outcome.State); + Assert.False(notDurable.Durable()); + var r = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); + Assert.True(r[0].Durable()); + Assert.False(r[1].Durable()); + + await consumer.CloseAsync(); + await publisher.CloseAsync(); + await queueSpec.DeleteAsync(); + + Assert.Empty(_connection.Publishers); + } }