Skip to content

Commit 7f42b03

Browse files
committed
* Add PublishException class.
* Test that non-routable messages result in `PublishException` with `IsReturn = true`
1 parent bc1204b commit 7f42b03

File tree

5 files changed

+135
-21
lines changed

5 files changed

+135
-21
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
34+
namespace RabbitMQ.Client.Exceptions
35+
{
36+
/// <summary>
37+
/// Class for exceptions related to publisher confirmations
38+
/// or the <c>mandatory</c> flag.
39+
/// </summary>
40+
public class PublishException : RabbitMQClientException
41+
{
42+
private bool _isReturn = false;
43+
private ulong _publishSequenceNumber = ulong.MinValue;
44+
45+
public PublishException(ulong publishSequenceNumber, bool isReturn) : base()
46+
{
47+
if (publishSequenceNumber == ulong.MinValue)
48+
{
49+
throw new ArgumentException($"{nameof(publishSequenceNumber)} must not be 0");
50+
}
51+
52+
_isReturn = isReturn;
53+
_publishSequenceNumber = publishSequenceNumber;
54+
}
55+
56+
/// <summary>
57+
/// <c>true</c> if this exception is due to a <c>basic.return</c>
58+
/// </summary>
59+
public bool IsReturn => _isReturn;
60+
61+
/// <summary>
62+
/// Retrieve the publish sequence number.
63+
/// </summary>
64+
public ulong PublishSequenceNumber => _publishSequenceNumber;
65+
}
66+
}

projects/RabbitMQ.Client/Impl/ChannelBase.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args)
632632
.ConfigureAwait(false);
633633
}
634634

635-
await HandleNack(nack._deliveryTag, nack._multiple, cancellationToken)
635+
await HandleNack(nack._deliveryTag, nack._multiple, false, cancellationToken)
636636
.ConfigureAwait(false);
637637

638638
return true;
@@ -662,7 +662,7 @@ await _basicReturnAsyncWrapper.InvokeAsync(this, e)
662662
publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
663663
}
664664

665-
await HandleNack(publishSequenceNumber, false, cancellationToken)
665+
await HandleNack(publishSequenceNumber, multiple: false, isReturn: true, cancellationToken)
666666
.ConfigureAwait(false);
667667
}
668668

@@ -1785,7 +1785,8 @@ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cance
17851785
return Task.CompletedTask;
17861786
}
17871787

1788-
private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default)
1788+
private Task HandleNack(ulong deliveryTag, bool multiple, bool isReturn,
1789+
CancellationToken cancellationToken = default)
17891790
{
17901791
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
17911792
{
@@ -1795,7 +1796,7 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
17951796
{
17961797
if (pair.Key <= deliveryTag)
17971798
{
1798-
pair.Value.SetException(new Exception("TODO"));
1799+
pair.Value.SetException(new PublishException(pair.Key, isReturn));
17991800
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
18001801
}
18011802
}
@@ -1804,7 +1805,7 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
18041805
{
18051806
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
18061807
{
1807-
tcs.SetException(new Exception("TODO"));
1808+
tcs.SetException(new PublishException(deliveryTag, isReturn));
18081809
}
18091810
}
18101811
}

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.set ->
1010
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void
1111
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason, string! prefix) -> void
1212
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void
13+
RabbitMQ.Client.Exceptions.PublishException
14+
RabbitMQ.Client.Exceptions.PublishException.IsReturn.get -> bool
15+
RabbitMQ.Client.Exceptions.PublishException.PublishException(ulong publishSequenceNumber, bool isReturn) -> void
16+
RabbitMQ.Client.Exceptions.PublishException.PublishSequenceNumber.get -> ulong
1317
RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void
1418
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
1519
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask

projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
using System.Threading.Tasks;
3838
using RabbitMQ.Client;
3939
using RabbitMQ.Client.Events;
40+
using RabbitMQ.Client.Exceptions;
4041
using Xunit;
4142
using Xunit.Abstractions;
4243

@@ -54,24 +55,21 @@ public TestConcurrentAccessWithSharedChannel(ITestOutputHelper output)
5455
[Fact]
5556
public async Task ConcurrentPublishSingleChannel()
5657
{
57-
int publishAckCount = 0;
58+
int expectedTotalMessageCount = 0;
59+
int expectedTotalReturnCount = 0;
60+
int totalNackCount = 0;
61+
int totalReturnCount = 0;
62+
int totalReceivedCount = 0;
5863

5964
_channel.BasicAcksAsync += (object sender, BasicAckEventArgs e) =>
6065
{
61-
Interlocked.Increment(ref publishAckCount);
6266
return Task.CompletedTask;
6367
};
6468

65-
_channel.BasicNacksAsync += (object sender, BasicNackEventArgs e) =>
66-
{
67-
_output.WriteLine($"channel #{_channel.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
68-
return Task.CompletedTask;
69-
};
70-
71-
7269
await TestConcurrentOperationsAsync(async () =>
7370
{
74-
long receivedCount = 0;
71+
long thisBatchReceivedCount = 0;
72+
long thisBatchReturnedCount = 0;
7573
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
7674

7775
var msgTracker = new ConcurrentDictionary<ushort, bool>();
@@ -83,6 +81,7 @@ await TestConcurrentOperationsAsync(async () =>
8381

8482
consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs ea) =>
8583
{
84+
Interlocked.Increment(ref totalReceivedCount);
8685
try
8786
{
8887
System.Diagnostics.Debug.Assert(object.ReferenceEquals(sender, consumer));
@@ -94,7 +93,9 @@ await TestConcurrentOperationsAsync(async () =>
9493
IChannel ch = cons.Channel;
9594
await ch.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
9695

97-
if (Interlocked.Increment(ref receivedCount) == _messageCount)
96+
long receivedCountSoFar = Interlocked.Increment(ref thisBatchReceivedCount);
97+
98+
if ((receivedCountSoFar + thisBatchReturnedCount) == _messageCount)
9899
{
99100
if (msgTracker.Values.Any(v => v == false))
100101
{
@@ -117,18 +118,59 @@ await TestConcurrentOperationsAsync(async () =>
117118
var publishTasks = new List<ValueTask>();
118119
for (ushort i = 0; i < _messageCount; i++)
119120
{
120-
msgTracker[i] = false;
121+
Interlocked.Increment(ref expectedTotalMessageCount);
122+
121123
byte[] body = _encoding.GetBytes(i.ToString());
122-
publishTasks.Add(_channel.BasicPublishAsync("", q.QueueName, mandatory: true, body: body));
124+
125+
string routingKey = q.QueueName;
126+
if (i % 5 == 0)
127+
{
128+
routingKey = Guid.NewGuid().ToString();
129+
Interlocked.Increment(ref thisBatchReturnedCount);
130+
Interlocked.Increment(ref expectedTotalReturnCount);
131+
}
132+
else
133+
{
134+
msgTracker[i] = false;
135+
}
136+
137+
publishTasks.Add(_channel.BasicPublishAsync("", routingKey, mandatory: true, body: body));
123138
}
124139

125140
foreach (ValueTask pt in publishTasks)
126141
{
127-
await pt;
142+
try
143+
{
144+
await pt;
145+
}
146+
catch (PublishException ex)
147+
{
148+
if (ex.IsReturn)
149+
{
150+
Interlocked.Increment(ref totalReturnCount);
151+
}
152+
else
153+
{
154+
Interlocked.Increment(ref totalNackCount);
155+
}
156+
}
128157
}
129158

130159
Assert.True(await tcs.Task);
131160
}, Iterations);
161+
162+
if (IsVerbose)
163+
{
164+
_output.WriteLine("expectedTotalMessageCount: {0}", expectedTotalMessageCount);
165+
_output.WriteLine("expectedTotalReturnCount: {0}", expectedTotalReturnCount);
166+
_output.WriteLine("totalReceivedCount: {0}", totalReceivedCount);
167+
_output.WriteLine("totalReturnCount: {0}", totalReturnCount);
168+
_output.WriteLine("totalNackCount: {0}", totalNackCount);
169+
}
170+
171+
Assert.Equal(expectedTotalReturnCount, totalReturnCount);
172+
Assert.Equal(expectedTotalMessageCount, (totalReceivedCount + totalReturnCount));
173+
Assert.Equal(0, totalNackCount);
132174
}
133175
}
134176
}

projects/Test/Integration/TestConnectionTopologyRecovery.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,9 @@ public async Task TestTopologyRecoveryBindingFilter()
228228

229229
Assert.True(ch.IsOpen);
230230
Assert.True(await SendAndConsumeMessageAsync(_conn, queueWithRecoveredBinding, exchange, bindingToRecover));
231-
// TODO use real exception being thrown
232-
await Assert.ThrowsAnyAsync<Exception>(() => SendAndConsumeMessageAsync(_conn, queueWithIgnoredBinding, exchange, bindingToIgnore));
231+
PublishException ex = await Assert.ThrowsAnyAsync<PublishException>(() =>
232+
SendAndConsumeMessageAsync(_conn, queueWithIgnoredBinding, exchange, bindingToIgnore));
233+
Assert.Equal((ulong)1, ex.PublishSequenceNumber);
233234
}
234235
finally
235236
{

0 commit comments

Comments
 (0)