Skip to content

Commit 5326343

Browse files
committed
Acquiring a sequence number should be an async method
1 parent a6b7e38 commit 5326343

File tree

7 files changed

+44
-47
lines changed

7 files changed

+44
-47
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,9 +440,9 @@ RabbitMQ.Client.IChannel.CurrentQueue.get -> string
440440
RabbitMQ.Client.IChannel.DefaultConsumer.get -> RabbitMQ.Client.IAsyncBasicConsumer
441441
RabbitMQ.Client.IChannel.DefaultConsumer.set -> void
442442
RabbitMQ.Client.IChannel.FlowControl -> System.EventHandler<RabbitMQ.Client.Events.FlowControlEventArgs>
443+
RabbitMQ.Client.IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<ulong>
443444
RabbitMQ.Client.IChannel.IsClosed.get -> bool
444445
RabbitMQ.Client.IChannel.IsOpen.get -> bool
445-
RabbitMQ.Client.IChannel.NextPublishSeqNo.get -> ulong
446446
RabbitMQ.Client.IChannelExtensions
447447
RabbitMQ.Client.IConnection
448448
RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,6 @@ public interface IChannel : IDisposable
9595
/// </summary>
9696
bool IsOpen { get; }
9797

98-
/// <summary>
99-
/// When in confirm mode, return the sequence number of the next message to be published.
100-
/// </summary>
101-
ulong NextPublishSeqNo { get; }
102-
10398
/// <summary>
10499
/// The name of the last queue declared on this channel.
105100
/// </summary>
@@ -143,6 +138,11 @@ public interface IChannel : IDisposable
143138
/// </remarks>
144139
event EventHandler<ShutdownEventArgs> ChannelShutdown;
145140

141+
/// <summary>
142+
/// When in confirm mode, return the sequence number of the next message to be published.
143+
/// </summary>
144+
ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default);
145+
146146
/// <summary>Asynchronously acknknowledges one or more messages.</summary>
147147
/// <param name="deliveryTag">The delivery tag.</param>
148148
/// <param name="multiple">Ack all messages up to the delivery tag if set to <c>true</c>.</param>

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,6 @@ public IAsyncBasicConsumer? DefaultConsumer
144144

145145
public bool IsOpen => !_disposed && _innerChannel.IsOpen;
146146

147-
public ulong NextPublishSeqNo => InnerChannel.NextPublishSeqNo;
148-
149147
public string? CurrentQueue => InnerChannel.CurrentQueue;
150148

151149
internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
@@ -274,6 +272,8 @@ public void Dispose()
274272
_disposed = true;
275273
}
276274

275+
public ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);
276+
277277
public ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken)
278278
=> InnerChannel.BasicAckAsync(deliveryTag, multiple, cancellationToken);
279279

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -181,29 +181,6 @@ public IAsyncBasicConsumer? DefaultConsumer
181181
[MemberNotNullWhen(false, nameof(CloseReason))]
182182
public bool IsOpen => CloseReason is null;
183183

184-
public ulong NextPublishSeqNo
185-
{
186-
get
187-
{
188-
if (ConfirmsAreEnabled)
189-
{
190-
_confirmSemaphore.Wait();
191-
try
192-
{
193-
return _nextPublishSeqNo;
194-
}
195-
finally
196-
{
197-
_confirmSemaphore.Release();
198-
}
199-
}
200-
else
201-
{
202-
return _nextPublishSeqNo;
203-
}
204-
}
205-
}
206-
207184
public string? CurrentQueue { get; private set; }
208185

209186
public ISession Session { get; private set; }
@@ -805,6 +782,26 @@ protected void HandleConnectionUnblocked()
805782
Session.Connection.HandleConnectionUnblocked();
806783
}
807784

785+
public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
786+
{
787+
if (ConfirmsAreEnabled)
788+
{
789+
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
790+
try
791+
{
792+
return _nextPublishSeqNo;
793+
}
794+
finally
795+
{
796+
_confirmSemaphore.Release();
797+
}
798+
}
799+
else
800+
{
801+
return _nextPublishSeqNo;
802+
}
803+
}
804+
808805
public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
809806
CancellationToken cancellationToken);
810807

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
173173
{
174174
string msg = i.ToString();
175175
byte[] body = Encoding.UTF8.GetBytes(msg);
176-
ulong nextPublishSeqNo = channel.NextPublishSeqNo;
176+
ulong nextPublishSeqNo = await channel.GetNextPublishSequenceNumberAsync();
177177
if ((ulong)(i + 1) != nextPublishSeqNo)
178178
{
179179
Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}");

projects/Test/Integration/TestConfirmSelect.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,19 @@ ValueTask PublishAsync()
5353
}
5454

5555
await _channel.ConfirmSelectAsync();
56-
Assert.Equal(1ul, _channel.NextPublishSeqNo);
56+
Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync());
5757
await PublishAsync();
58-
Assert.Equal(2ul, _channel.NextPublishSeqNo);
58+
Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync());
5959
await PublishAsync();
60-
Assert.Equal(3ul, _channel.NextPublishSeqNo);
60+
Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync());
6161

6262
await _channel.ConfirmSelectAsync();
6363
await PublishAsync();
64-
Assert.Equal(4ul, _channel.NextPublishSeqNo);
64+
Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync());
6565
await PublishAsync();
66-
Assert.Equal(5ul, _channel.NextPublishSeqNo);
66+
Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync());
6767
await PublishAsync();
68-
Assert.Equal(6ul, _channel.NextPublishSeqNo);
68+
Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync());
6969
}
7070

7171
[Theory]
@@ -80,7 +80,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength)
8080
await _channel.ConfirmSelectAsync();
8181

8282
var properties = new BasicProperties();
83-
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
83+
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
8484
await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
8585
mandatory: false, basicProperties: properties, body: body);
8686
await _channel.WaitForConfirmsOrDieAsync();
@@ -91,7 +91,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
9191
{
9292
CorrelationId = new string('o', correlationIdLength)
9393
};
94-
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
94+
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
9595
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
9696
await _channel.WaitForConfirmsOrDieAsync();
9797
}
@@ -101,7 +101,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
101101
}
102102

103103
properties = new BasicProperties();
104-
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
104+
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
105105
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
106106
await _channel.WaitForConfirmsOrDieAsync();
107107
// _output.WriteLine("I'm done...");

projects/Test/Integration/TestConfirmSelectAsync.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,19 @@ public TestConfirmSelectAsync(ITestOutputHelper output) : base(output)
4949
public async Task TestConfirmSelectIdempotency()
5050
{
5151
await _channel.ConfirmSelectAsync();
52-
Assert.Equal(1ul, _channel.NextPublishSeqNo);
52+
Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync());
5353
await Publish();
54-
Assert.Equal(2ul, _channel.NextPublishSeqNo);
54+
Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync());
5555
await Publish();
56-
Assert.Equal(3ul, _channel.NextPublishSeqNo);
56+
Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync());
5757

5858
await _channel.ConfirmSelectAsync();
5959
await Publish();
60-
Assert.Equal(4ul, _channel.NextPublishSeqNo);
60+
Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync());
6161
await Publish();
62-
Assert.Equal(5ul, _channel.NextPublishSeqNo);
62+
Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync());
6363
await Publish();
64-
Assert.Equal(6ul, _channel.NextPublishSeqNo);
64+
Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync());
6565
}
6666

6767
private ValueTask Publish()

0 commit comments

Comments
 (0)