From 0d4d632c0bbd9b4cecd3228adc11ce046b388152 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 8 May 2025 05:48:30 -0700 Subject: [PATCH 1/6] Add dedicated exception for `basic.return` messages. Fixes #1831 This PR adds the `PublishReturnException` class that includes the originating exchange and routing key for a `basic.return` message. It should be backwards-compatible in the API. --- .../Exceptions/PublishException.cs | 51 +++++++++++++++++++ .../Impl/Channel.PublisherConfirms.cs | 12 +++-- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 4 ++ 3 files changed, 63 insertions(+), 4 deletions(-) diff --git a/projects/RabbitMQ.Client/Exceptions/PublishException.cs b/projects/RabbitMQ.Client/Exceptions/PublishException.cs index 42939ed23c..c15e54470b 100644 --- a/projects/RabbitMQ.Client/Exceptions/PublishException.cs +++ b/projects/RabbitMQ.Client/Exceptions/PublishException.cs @@ -63,4 +63,55 @@ public PublishException(ulong publishSequenceNumber, bool isReturn) : base() /// public ulong PublishSequenceNumber => _publishSequenceNumber; } + + /// + /// Class for exceptions related to publisher confirmations + /// or the mandatory flag, when basic.return is + /// sent from the broker. + /// + public class PublishReturnException : PublishException + { + private readonly string _exchange; + private readonly string _routingKey; + + public PublishReturnException(ulong publishSequenceNumber, string exchange, string routingKey) + : base(publishSequenceNumber, true) + { + _exchange = exchange; + _routingKey = routingKey; + } + + /// + /// Get the Exchange associated with this basic.return + /// + public string Exchange => _exchange; + + /// + /// Get the RoutingKey associated with this basic.return + /// + public string RoutingKey => _routingKey; + } + + internal static class PublishExceptionFactory + { + internal static PublishException Create(bool isReturn, + ulong deliveryTag, string? exchange = null, string? routingKey = null) + { + if (isReturn) + { + if (exchange is not null && routingKey is not null) + { + return new PublishReturnException(deliveryTag, exchange, routingKey); + } + else + { + return new PublishException(deliveryTag, isReturn); + } + } + else + { + return new PublishException(deliveryTag, isReturn); + } + } + } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index d2ca45247c..31d1406b35 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -200,7 +200,8 @@ private void HandleAck(ulong deliveryTag, bool multiple) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) + private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn, + string? exchange = null, string? routingKey = null) { if (ShouldHandleAckOrNack(deliveryTag)) { @@ -210,7 +211,8 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { if (pair.Key <= deliveryTag) { - pair.Value.SetException(new PublishException(pair.Key, isReturn)); + PublishException ex = PublishExceptionFactory.Create(isReturn, pair.Key, exchange, routingKey); + pair.Value.SetException(ex); _confirmsTaskCompletionSources.Remove(pair.Key, out _); } } @@ -219,7 +221,8 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) { - tcs.SetException(new PublishException(deliveryTag, isReturn)); + PublishException ex = PublishExceptionFactory.Create(isReturn, deliveryTag); + tcs.SetException(ex); } } } @@ -249,7 +252,8 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent) } } - HandleNack(publishSequenceNumber, multiple: false, isReturn: true); + HandleNack(publishSequenceNumber, multiple: false, isReturn: true, + exchange: basicReturnEvent.Exchange, routingKey: basicReturnEvent.RoutingKey); } } diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e69de29bb2..14a88f5dfc 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -0,0 +1,4 @@ +RabbitMQ.Client.Exceptions.PublishReturnException +RabbitMQ.Client.Exceptions.PublishReturnException.Exchange.get -> string! +RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong publishSequenceNumber, string! exchange, string! routingKey) -> void +RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string! \ No newline at end of file From 0c564b1c8b638f01d531f91af5bb7626a72c5e1a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 9 May 2025 07:48:10 -0700 Subject: [PATCH 2/6] Add reply code and reply text to the new `PublishBasicException` exception type. --- .../Exceptions/PublishException.cs | 38 ++++++++++++------- .../Impl/Channel.PublisherConfirms.cs | 9 +++-- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 4 +- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/projects/RabbitMQ.Client/Exceptions/PublishException.cs b/projects/RabbitMQ.Client/Exceptions/PublishException.cs index c15e54470b..e209e4ab4c 100644 --- a/projects/RabbitMQ.Client/Exceptions/PublishException.cs +++ b/projects/RabbitMQ.Client/Exceptions/PublishException.cs @@ -73,40 +73,50 @@ public class PublishReturnException : PublishException { private readonly string _exchange; private readonly string _routingKey; + private readonly ushort _replyCode; + private readonly string _replyText; - public PublishReturnException(ulong publishSequenceNumber, string exchange, string routingKey) + public PublishReturnException(ulong publishSequenceNumber, + string? exchange = null, string? routingKey = null, + ushort? replyCode = null, string? replyText = null) : base(publishSequenceNumber, true) { - _exchange = exchange; - _routingKey = routingKey; + _exchange = exchange ?? string.Empty; + _routingKey = routingKey ?? string.Empty; + _replyCode = replyCode ?? 0; + _replyText = replyText ?? string.Empty; } /// - /// Get the Exchange associated with this basic.return + /// Get the exchange associated with this basic.return /// public string Exchange => _exchange; /// - /// Get the RoutingKey associated with this basic.return + /// Get the routing key associated with this basic.return /// public string RoutingKey => _routingKey; + + /// + /// Get the reply code associated with this basic.return + /// + public ushort ReplyCode => _replyCode; + + /// + /// Get the reply text associated with this basic.return + /// + public string ReplyText => _replyText; } internal static class PublishExceptionFactory { internal static PublishException Create(bool isReturn, - ulong deliveryTag, string? exchange = null, string? routingKey = null) + ulong deliveryTag, string? exchange = null, string? routingKey = null, + ushort? replyCode = null, string? replyText = null) { if (isReturn) { - if (exchange is not null && routingKey is not null) - { - return new PublishReturnException(deliveryTag, exchange, routingKey); - } - else - { - return new PublishException(deliveryTag, isReturn); - } + return new PublishReturnException(deliveryTag, exchange, routingKey, replyCode, replyText); } else { diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 31d1406b35..e99cc56068 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -201,7 +201,8 @@ private void HandleAck(ulong deliveryTag, bool multiple) [MethodImpl(MethodImplOptions.AggressiveInlining)] private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn, - string? exchange = null, string? routingKey = null) + string? exchange = null, string? routingKey = null, + ushort? replyCode = null, string? replyText = null) { if (ShouldHandleAckOrNack(deliveryTag)) { @@ -211,7 +212,8 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn, { if (pair.Key <= deliveryTag) { - PublishException ex = PublishExceptionFactory.Create(isReturn, pair.Key, exchange, routingKey); + PublishException ex = PublishExceptionFactory.Create(isReturn, pair.Key, + exchange, routingKey, replyCode, replyText); pair.Value.SetException(ex); _confirmsTaskCompletionSources.Remove(pair.Key, out _); } @@ -253,7 +255,8 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent) } HandleNack(publishSequenceNumber, multiple: false, isReturn: true, - exchange: basicReturnEvent.Exchange, routingKey: basicReturnEvent.RoutingKey); + exchange: basicReturnEvent.Exchange, routingKey: basicReturnEvent.RoutingKey, + replyCode: basicReturnEvent.ReplyCode, replyText: basicReturnEvent.ReplyText); } } diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 14a88f5dfc..08f4ec3838 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -1,4 +1,6 @@ RabbitMQ.Client.Exceptions.PublishReturnException RabbitMQ.Client.Exceptions.PublishReturnException.Exchange.get -> string! -RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong publishSequenceNumber, string! exchange, string! routingKey) -> void +RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong publishSequenceNumber, string? exchange = null, string? routingKey = null, ushort? replyCode = null, string? replyText = null) -> void +RabbitMQ.Client.Exceptions.PublishReturnException.ReplyCode.get -> ushort +RabbitMQ.Client.Exceptions.PublishReturnException.ReplyText.get -> string! RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string! \ No newline at end of file From 446c4b0c8041d5a6509667f8135c6ddf1589b9f6 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 9 May 2025 09:12:28 -0700 Subject: [PATCH 3/6] Fix bug in the code that did not set certain `PublishReturnException` data --- .../Impl/Channel.PublisherConfirms.cs | 3 ++- .../Test/Integration/TestBasicPublishAsync.cs | 21 ++++++++++++++++++- .../TestConcurrentAccessWithSharedChannel.cs | 7 ++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index e99cc56068..c694d29412 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -223,7 +223,8 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn, { if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) { - PublishException ex = PublishExceptionFactory.Create(isReturn, deliveryTag); + PublishException ex = PublishExceptionFactory.Create(isReturn, deliveryTag, + exchange, routingKey, replyCode, replyText); tcs.SetException(ex); } } diff --git a/projects/Test/Integration/TestBasicPublishAsync.cs b/projects/Test/Integration/TestBasicPublishAsync.cs index 25de62eb70..4ce2dfc2cc 100644 --- a/projects/Test/Integration/TestBasicPublishAsync.cs +++ b/projects/Test/Integration/TestBasicPublishAsync.cs @@ -29,8 +29,10 @@ // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. //--------------------------------------------------------------------------- +using System; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; using Xunit; using Xunit.Abstractions; @@ -49,7 +51,6 @@ public async Task TestQueuePurgeAsync() var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true); var publishTask = Task.Run(async () => @@ -65,5 +66,23 @@ public async Task TestQueuePurgeAsync() Assert.True(await publishSyncSource.Task); Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q)); } + + [Fact] + public async Task TestBasicReturnAsync() + { + try + { + await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: Guid.NewGuid().ToString(), + mandatory: true, body: GetRandomBody()); + } + catch (PublishReturnException prex) + { + Assert.True(prex.IsReturn); + Assert.NotNull(prex.Exchange); + Assert.NotNull(prex.RoutingKey); + Assert.NotEqual(0, prex.ReplyCode); + Assert.NotNull(prex.ReplyText); + } + } } } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs index b033839a15..36b66346bc 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs @@ -145,8 +145,13 @@ await TestConcurrentOperationsAsync(async () => } catch (PublishException ex) { - if (ex.IsReturn) + if (ex is PublishReturnException prex) { + Assert.True(prex.IsReturn); + Assert.NotNull(prex.Exchange); + Assert.NotNull(prex.RoutingKey); + Assert.NotEqual(0, prex.ReplyCode); + Assert.NotNull(prex.ReplyText); Interlocked.Increment(ref totalReturnCount); } else From 5b62dff5b05944efe6ec8b571d20dc9ebe5eb92a Mon Sep 17 00:00:00 2001 From: Brandon Ording Date: Fri, 9 May 2025 14:18:07 -0400 Subject: [PATCH 4/6] Add exception messages --- .../RabbitMQ.Client/Exceptions/PublishException.cs | 13 +++++++++---- projects/RabbitMQ.Client/PublicAPI.Unshipped.txt | 3 ++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/projects/RabbitMQ.Client/Exceptions/PublishException.cs b/projects/RabbitMQ.Client/Exceptions/PublishException.cs index e209e4ab4c..b03366c06b 100644 --- a/projects/RabbitMQ.Client/Exceptions/PublishException.cs +++ b/projects/RabbitMQ.Client/Exceptions/PublishException.cs @@ -42,7 +42,11 @@ public class PublishException : RabbitMQClientException private bool _isReturn = false; private ulong _publishSequenceNumber = ulong.MinValue; - public PublishException(ulong publishSequenceNumber, bool isReturn) : base() + public PublishException(ulong publishSequenceNumber, bool isReturn) : this(publishSequenceNumber, isReturn, "Message rejected by broker.") + { + } + + public PublishException(ulong publishSequenceNumber, bool isReturn, string message) : base(message) { if (publishSequenceNumber == ulong.MinValue) { @@ -76,10 +80,10 @@ public class PublishReturnException : PublishException private readonly ushort _replyCode; private readonly string _replyText; - public PublishReturnException(ulong publishSequenceNumber, + public PublishReturnException(ulong publishSequenceNumber, string message, string? exchange = null, string? routingKey = null, ushort? replyCode = null, string? replyText = null) - : base(publishSequenceNumber, true) + : base(publishSequenceNumber, true, message) { _exchange = exchange ?? string.Empty; _routingKey = routingKey ?? string.Empty; @@ -116,7 +120,8 @@ internal static PublishException Create(bool isReturn, { if (isReturn) { - return new PublishReturnException(deliveryTag, exchange, routingKey, replyCode, replyText); + string message = $"{replyCode} {replyText} Exchange: {exchange} Routing Key: {routingKey}"; + return new PublishReturnException(deliveryTag, message, exchange, routingKey, replyCode, replyText); } else { diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 08f4ec3838..9a46150097 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -1,6 +1,7 @@ +RabbitMQ.Client.Exceptions.PublishException.PublishException(ulong publishSequenceNumber, bool isReturn, string! message) -> void RabbitMQ.Client.Exceptions.PublishReturnException RabbitMQ.Client.Exceptions.PublishReturnException.Exchange.get -> string! -RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong publishSequenceNumber, string? exchange = null, string? routingKey = null, ushort? replyCode = null, string? replyText = null) -> void +RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong publishSequenceNumber, string! message, string? exchange = null, string? routingKey = null, ushort? replyCode = null, string? replyText = null) -> void RabbitMQ.Client.Exceptions.PublishReturnException.ReplyCode.get -> ushort RabbitMQ.Client.Exceptions.PublishReturnException.ReplyText.get -> string! RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string! \ No newline at end of file From ab717b906e3b9f37b4a363f3798b0412e918232e Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 12 May 2025 14:11:38 +0200 Subject: [PATCH 5/6] my 2cents. Add more conditions to the test Signed-off-by: Gabriele Santomaggio --- projects/Test/Integration/TestBasicPublishAsync.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/projects/Test/Integration/TestBasicPublishAsync.cs b/projects/Test/Integration/TestBasicPublishAsync.cs index 4ce2dfc2cc..f93ac6e7f5 100644 --- a/projects/Test/Integration/TestBasicPublishAsync.cs +++ b/projects/Test/Integration/TestBasicPublishAsync.cs @@ -70,18 +70,23 @@ public async Task TestQueuePurgeAsync() [Fact] public async Task TestBasicReturnAsync() { + string routingKey = Guid.NewGuid().ToString(); try { - await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: Guid.NewGuid().ToString(), + await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: routingKey, mandatory: true, body: GetRandomBody()); } catch (PublishReturnException prex) { Assert.True(prex.IsReturn); Assert.NotNull(prex.Exchange); + Assert.Equal(string.Empty, prex.Exchange); Assert.NotNull(prex.RoutingKey); + Assert.Equal(routingKey, prex.RoutingKey); Assert.NotEqual(0, prex.ReplyCode); Assert.NotNull(prex.ReplyText); + Assert.Equal("NO_ROUTE", prex.ReplyText); + } } } From 68f7b377193f248ec550f1593510f5ccc87d9932 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 12 May 2025 14:15:51 +0200 Subject: [PATCH 6/6] formatting Signed-off-by: Gabriele Santomaggio --- projects/Test/Integration/TestBasicPublishAsync.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/Test/Integration/TestBasicPublishAsync.cs b/projects/Test/Integration/TestBasicPublishAsync.cs index f93ac6e7f5..0d0e776b49 100644 --- a/projects/Test/Integration/TestBasicPublishAsync.cs +++ b/projects/Test/Integration/TestBasicPublishAsync.cs @@ -86,7 +86,7 @@ await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: routingKey, Assert.NotEqual(0, prex.ReplyCode); Assert.NotNull(prex.ReplyText); Assert.Equal("NO_ROUTE", prex.ReplyText); - + } } }