Skip to content

Commit 3f27cf6

Browse files
committed
Fix NextPublishSeqNo when retrieved concurrently
Discovered while updating `rabbitmq/rabbitmq-tutorials` to version `7.0.0-rc.8` of this library.
1 parent f63c9c8 commit 3f27cf6

File tree

5 files changed

+260
-8
lines changed

5 files changed

+260
-8
lines changed

Build.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<ProjectReference Include="projects/Test/Common/Common.csproj" />
1515
<ProjectReference Include="projects/Test/Applications/CreateChannel/CreateChannel.csproj" />
1616
<ProjectReference Include="projects/Test/Applications/MassPublish/MassPublish.csproj" />
17+
<ProjectReference Include="projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj" />
1718
<ProjectReference Include="projects/Test/Integration/Integration.csproj" />
1819
<ProjectReference Include="projects/Test/SequentialIntegration/SequentialIntegration.csproj" />
1920
<ProjectReference Include="projects/Test/Unit/Unit.csproj" />

RabbitMQDotNetClient.sln

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "project
4242
EndProject
4343
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
4444
EndProject
45-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
45+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
46+
EndProject
47+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Test\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}"
4648
EndProject
4749
Global
4850
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -102,6 +104,10 @@ Global
102104
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.Build.0 = Debug|Any CPU
103105
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.ActiveCfg = Release|Any CPU
104106
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.Build.0 = Release|Any CPU
107+
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
108+
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU
109+
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU
110+
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU
105111
EndGlobalSection
106112
GlobalSection(SolutionProperties) = preSolution
107113
HideSolutionNode = FALSE
@@ -117,6 +123,7 @@ Global
117123
{C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
118124
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
119125
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
126+
{13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
120127
EndGlobalSection
121128
GlobalSection(ExtensibilityGlobals) = postSolution
122129
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
6060
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
6161
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);
6262

63+
private ulong _nextPublishSeqNo;
6364
private SemaphoreSlim? _confirmSemaphore;
6465
private readonly LinkedList<ulong> _pendingDeliveryTags = new LinkedList<ulong>();
6566

@@ -176,7 +177,28 @@ public IAsyncBasicConsumer? DefaultConsumer
176177
[MemberNotNullWhen(false, nameof(CloseReason))]
177178
public bool IsOpen => CloseReason is null;
178179

179-
public ulong NextPublishSeqNo { get; private set; }
180+
public ulong NextPublishSeqNo
181+
{
182+
get
183+
{
184+
if (ConfirmsAreEnabled)
185+
{
186+
_confirmSemaphore.Wait();
187+
try
188+
{
189+
return _nextPublishSeqNo;
190+
}
191+
finally
192+
{
193+
_confirmSemaphore.Release();
194+
}
195+
}
196+
else
197+
{
198+
return _nextPublishSeqNo;
199+
}
200+
}
201+
}
180202

181203
public string? CurrentQueue { get; private set; }
182204

@@ -962,7 +984,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
962984
.ConfigureAwait(false);
963985
try
964986
{
965-
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
987+
_pendingDeliveryTags.AddLast(_nextPublishSeqNo++);
966988
}
967989
finally
968990
{
@@ -1005,7 +1027,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
10051027
.ConfigureAwait(false);
10061028
try
10071029
{
1008-
NextPublishSeqNo--;
1030+
_nextPublishSeqNo--;
10091031
_pendingDeliveryTags.RemoveLast();
10101032
}
10111033
finally
@@ -1029,7 +1051,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
10291051
.ConfigureAwait(false);
10301052
try
10311053
{
1032-
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
1054+
_pendingDeliveryTags.AddLast(_nextPublishSeqNo++);
10331055
}
10341056
finally
10351057
{
@@ -1072,7 +1094,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
10721094
.ConfigureAwait(false);
10731095
try
10741096
{
1075-
NextPublishSeqNo--;
1097+
_nextPublishSeqNo--;
10761098
_pendingDeliveryTags.RemoveLast();
10771099
}
10781100
finally
@@ -1166,10 +1188,10 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
11661188
.ConfigureAwait(false);
11671189
try
11681190
{
1169-
if (NextPublishSeqNo == 0UL)
1191+
if (_nextPublishSeqNo == 0UL)
11701192
{
11711193
_confirmsTaskCompletionSources = new List<TaskCompletionSource<bool>>();
1172-
NextPublishSeqNo = 1;
1194+
_nextPublishSeqNo = 1;
11731195
}
11741196

11751197
enqueued = Enqueue(k);
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using RabbitMQ.Client;
8+
9+
const int MESSAGE_COUNT = 50_000;
10+
bool debug = false;
11+
12+
await PublishMessagesIndividuallyAsync();
13+
await PublishMessagesInBatchAsync();
14+
await HandlePublishConfirmsAsynchronously();
15+
16+
static Task<IConnection> CreateConnectionAsync()
17+
{
18+
var factory = new ConnectionFactory { HostName = "localhost" };
19+
return factory.CreateConnectionAsync();
20+
}
21+
22+
static async Task PublishMessagesIndividuallyAsync()
23+
{
24+
using IConnection connection = await CreateConnectionAsync();
25+
using IChannel channel = await connection.CreateChannelAsync();
26+
27+
// declare a server-named queue
28+
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
29+
string queueName = queueDeclareResult.QueueName;
30+
await channel.ConfirmSelectAsync();
31+
32+
var sw = new Stopwatch();
33+
sw.Start();
34+
35+
for (int i = 0; i < MESSAGE_COUNT; i++)
36+
{
37+
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
38+
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);
39+
}
40+
41+
await channel.WaitForConfirmsOrDieAsync();
42+
43+
sw.Stop();
44+
45+
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
46+
}
47+
48+
static async Task PublishMessagesInBatchAsync()
49+
{
50+
using IConnection connection = await CreateConnectionAsync();
51+
using IChannel channel = await connection.CreateChannelAsync();
52+
53+
// declare a server-named queue
54+
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
55+
string queueName = queueDeclareResult.QueueName;
56+
await channel.ConfirmSelectAsync();
57+
58+
int batchSize = 100;
59+
int outstandingMessageCount = 0;
60+
61+
var sw = new Stopwatch();
62+
sw.Start();
63+
64+
var publishTasks = new List<Task>();
65+
for (int i = 0; i < MESSAGE_COUNT; i++)
66+
{
67+
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
68+
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask());
69+
outstandingMessageCount++;
70+
71+
if (outstandingMessageCount == batchSize)
72+
{
73+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
74+
await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
75+
publishTasks.Clear();
76+
77+
await channel.WaitForConfirmsOrDieAsync(cts.Token);
78+
outstandingMessageCount = 0;
79+
}
80+
}
81+
82+
if (outstandingMessageCount > 0)
83+
{
84+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
85+
await channel.WaitForConfirmsOrDieAsync(cts.Token);
86+
}
87+
88+
sw.Stop();
89+
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
90+
}
91+
92+
async Task HandlePublishConfirmsAsynchronously()
93+
{
94+
using IConnection connection = await CreateConnectionAsync();
95+
using IChannel channel = await connection.CreateChannelAsync();
96+
97+
// declare a server-named queue
98+
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
99+
string queueName = queueDeclareResult.QueueName;
100+
await channel.ConfirmSelectAsync();
101+
102+
bool publishingCompleted = false;
103+
var allMessagesConfirmedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
104+
var outstandingConfirms = new LinkedList<ulong>();
105+
var semaphore = new SemaphoreSlim(1, 1);
106+
void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
107+
{
108+
if (debug)
109+
{
110+
Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})",
111+
DateTime.Now, deliveryTag, multiple);
112+
}
113+
114+
semaphore.Wait();
115+
try
116+
{
117+
if (multiple)
118+
{
119+
do
120+
{
121+
LinkedListNode<ulong>? node = outstandingConfirms.First;
122+
if (node is null)
123+
{
124+
break;
125+
}
126+
if (node.Value <= deliveryTag)
127+
{
128+
outstandingConfirms.RemoveFirst();
129+
}
130+
else
131+
{
132+
break;
133+
}
134+
} while (true);
135+
}
136+
else
137+
{
138+
outstandingConfirms.Remove(deliveryTag);
139+
}
140+
}
141+
finally
142+
{
143+
semaphore.Release();
144+
}
145+
146+
if (publishingCompleted && outstandingConfirms.Count == 0)
147+
{
148+
allMessagesConfirmedTcs.SetResult(true);
149+
}
150+
}
151+
152+
channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
153+
channel.BasicNacks += (sender, ea) =>
154+
{
155+
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})");
156+
CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
157+
};
158+
159+
var sw = new Stopwatch();
160+
sw.Start();
161+
162+
var publishTasks = new List<Task>();
163+
for (int i = 0; i < MESSAGE_COUNT; i++)
164+
{
165+
string msg = i.ToString();
166+
byte[] body = Encoding.UTF8.GetBytes(msg);
167+
ulong nextPublishSeqNo = channel.NextPublishSeqNo;
168+
if ((ulong)(i + 1) != nextPublishSeqNo)
169+
{
170+
Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}");
171+
}
172+
await semaphore.WaitAsync();
173+
try
174+
{
175+
outstandingConfirms.AddLast(nextPublishSeqNo);
176+
}
177+
finally
178+
{
179+
semaphore.Release();
180+
}
181+
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask());
182+
}
183+
184+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
185+
await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
186+
publishingCompleted = true;
187+
188+
try
189+
{
190+
await allMessagesConfirmedTcs.Task.WaitAsync(cts.Token);
191+
}
192+
catch (OperationCanceledException)
193+
{
194+
Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now);
195+
}
196+
catch (TimeoutException)
197+
{
198+
Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now);
199+
}
200+
201+
sw.Stop();
202+
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {sw.ElapsedMilliseconds:N0} ms");
203+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net6.0</TargetFramework>
5+
<NoWarn>$(NoWarn);CA2007</NoWarn>
6+
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
7+
</PropertyGroup>
8+
9+
<PropertyGroup>
10+
<OutputType>Exe</OutputType>
11+
<Nullable>enable</Nullable>
12+
<LangVersion>9.0</LangVersion>
13+
</PropertyGroup>
14+
15+
<ItemGroup>
16+
<ProjectReference Include="../../../RabbitMQ.Client\RabbitMQ.Client.csproj" />
17+
</ItemGroup>
18+
19+
</Project>

0 commit comments

Comments
 (0)