Skip to content

Commit 9689eaa

Browse files
committed
1 parent 67454f5 commit 9689eaa

File tree

1 file changed

+122
-99
lines changed

1 file changed

+122
-99
lines changed
Lines changed: 122 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,51 @@
1-
using RabbitMQ.Client;
2-
using RabbitMQ.Client.Exceptions;
1+
using System.Buffers.Binary;
32
using System.Diagnostics;
43
using System.Text;
4+
using RabbitMQ.Client;
55

6-
const int MessageCount = 50_000;
7-
const int MaxOutstandingConfirms = 128;
6+
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
87

9-
var rateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms);
10-
var channelOptions = new CreateChannelOptions(
8+
const int MESSAGE_COUNT = 50_000;
9+
bool debug = false;
10+
11+
var channelOpts = new CreateChannelOptions(
1112
publisherConfirmationsEnabled: true,
1213
publisherConfirmationTrackingEnabled: true,
13-
outstandingPublisherConfirmationsRateLimiter: rateLimiter
14+
outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
1415
);
1516

1617
var props = new BasicProperties
1718
{
1819
Persistent = true
1920
};
2021

22+
string hostname = "localhost";
23+
if (args.Length > 0)
24+
{
25+
if (false == string.IsNullOrWhiteSpace(args[0]))
26+
{
27+
hostname = args[0];
28+
}
29+
}
30+
31+
#pragma warning disable CS8321 // Local function is declared but never used
32+
2133
await PublishMessagesIndividuallyAsync();
2234
await PublishMessagesInBatchAsync();
2335
await HandlePublishConfirmsAsynchronously();
2436

25-
static Task<IConnection> CreateConnectionAsync()
37+
Task<IConnection> CreateConnectionAsync()
2638
{
27-
var factory = new ConnectionFactory { HostName = "localhost" };
39+
var factory = new ConnectionFactory { HostName = hostname };
2840
return factory.CreateConnectionAsync();
2941
}
3042

3143
async Task PublishMessagesIndividuallyAsync()
3244
{
33-
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MessageCount:N0} messages individually " +
34-
"and handling confirms individually (i.e., the slowest way)");
45+
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");
3546

36-
using IConnection connection = await CreateConnectionAsync();
37-
using IChannel channel = await connection.CreateChannelAsync(channelOptions);
47+
await using IConnection connection = await CreateConnectionAsync();
48+
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
3849

3950
// declare a server-named queue
4051
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
@@ -43,127 +54,112 @@ async Task PublishMessagesIndividuallyAsync()
4354
var sw = new Stopwatch();
4455
sw.Start();
4556

46-
for (int i = 0; i < MessageCount; i++)
57+
for (int i = 0; i < MESSAGE_COUNT; i++)
4758
{
4859
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
4960
try
5061
{
51-
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body,
52-
mandatory: true, basicProperties: props);
53-
}
54-
catch (PublishException pubEx)
55-
{
56-
Console.Error.WriteLine("{0} [ERROR] publish exception: {1}", DateTime.Now, pubEx);
62+
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, basicProperties: props, mandatory: true);
5763
}
5864
catch (Exception ex)
5965
{
60-
Console.Error.WriteLine("{0} [ERROR] other exception: {1}", DateTime.Now, ex);
66+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: {ex}");
6167
}
6268
}
6369

6470
sw.Stop();
6571

66-
Console.WriteLine($"{DateTime.Now} [INFO] published {MessageCount:N0} messages individually " +
67-
$"in {sw.ElapsedMilliseconds:N0} ms");
72+
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
6873
}
6974

7075
async Task PublishMessagesInBatchAsync()
7176
{
72-
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MessageCount:N0} messages and handling " +
73-
$"confirms in batches");
77+
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");
7478

75-
using IConnection connection = await CreateConnectionAsync();
76-
using IChannel channel = await connection.CreateChannelAsync(channelOptions);
79+
await using IConnection connection = await CreateConnectionAsync();
80+
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
7781

7882
// declare a server-named queue
7983
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
8084
string queueName = queueDeclareResult.QueueName;
8185

82-
/*
83-
* Note: since throttling happens when 50% of the outstanding confirms are reached,
84-
* each batch size should not be greater than this value
85-
*/
86-
int batchSize = MaxOutstandingConfirms / 2;
86+
int batchSize = MAX_OUTSTANDING_CONFIRMS / 2;
8787
int outstandingMessageCount = 0;
8888

8989
var sw = new Stopwatch();
9090
sw.Start();
9191

92-
static async Task AwaitPublishTasks(IEnumerable<ValueTask> publishTasks)
93-
{
94-
foreach (ValueTask pt in publishTasks)
95-
{
96-
try
97-
{
98-
await pt;
99-
}
100-
catch (PublishException pubEx)
101-
{
102-
Console.Error.WriteLine("{0} [ERROR] publish exception: {1}", DateTime.Now, pubEx);
103-
}
104-
catch (Exception ex)
105-
{
106-
Console.Error.WriteLine("{0} [ERROR] other exception: {1}", DateTime.Now, ex);
107-
}
108-
}
109-
}
110-
11192
var publishTasks = new List<ValueTask>();
112-
for (int i = 0; i < MessageCount; i++)
93+
for (int i = 0; i < MESSAGE_COUNT; i++)
11394
{
11495
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
115-
116-
var pt0 = channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body,
117-
mandatory: true, basicProperties: props);
118-
publishTasks.Add(pt0);
119-
96+
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props));
12097
outstandingMessageCount++;
12198

12299
if (outstandingMessageCount == batchSize)
123100
{
124-
await AwaitPublishTasks(publishTasks);
101+
foreach (ValueTask pt in publishTasks)
102+
{
103+
try
104+
{
105+
await pt;
106+
}
107+
catch (Exception ex)
108+
{
109+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
110+
}
111+
}
125112
publishTasks.Clear();
126113
outstandingMessageCount = 0;
127114
}
128115
}
129116

130117
if (publishTasks.Count > 0)
131118
{
132-
await AwaitPublishTasks(publishTasks);
119+
foreach (ValueTask pt in publishTasks)
120+
{
121+
try
122+
{
123+
await pt;
124+
}
125+
catch (Exception ex)
126+
{
127+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
128+
}
129+
}
130+
publishTasks.Clear();
131+
outstandingMessageCount = 0;
133132
}
134133

135134
sw.Stop();
136-
Console.WriteLine($"{DateTime.Now} [INFO] published {MessageCount:N0} messages in batch in " +
137-
$"{sw.ElapsedMilliseconds:N0} ms");
135+
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
138136
}
139137

140138
async Task HandlePublishConfirmsAsynchronously()
141139
{
142-
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MessageCount:N0} messages and " +
143-
$"handling confirms asynchronously");
140+
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");
144141

145-
// NOTE: setting publisherConfirmationTrackingEnabled
146-
// to false because this method is tracking them itself.
147-
channelOptions = new CreateChannelOptions(
148-
publisherConfirmationsEnabled: true,
149-
publisherConfirmationTrackingEnabled: false,
150-
outstandingPublisherConfirmationsRateLimiter: null
151-
);
142+
await using IConnection connection = await CreateConnectionAsync();
152143

153-
using IConnection connection = await CreateConnectionAsync();
154-
using IChannel channel = await connection.CreateChannelAsync(channelOptions);
144+
channelOpts = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);
145+
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
155146

156147
// declare a server-named queue
157148
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
158149
string queueName = queueDeclareResult.QueueName;
159150

160-
bool publishingCompleted = false;
161-
var allMessagesConfirmedTcs =
162-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
151+
var allMessagesConfirmedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
163152
var outstandingConfirms = new LinkedList<ulong>();
164153
var semaphore = new SemaphoreSlim(1, 1);
154+
int confirmedCount = 0;
165155
async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
166156
{
157+
if (debug)
158+
{
159+
Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})",
160+
DateTime.Now, deliveryTag, multiple);
161+
}
162+
167163
await semaphore.WaitAsync();
168164
try
169165
{
@@ -184,10 +180,13 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
184180
{
185181
break;
186182
}
183+
184+
confirmedCount++;
187185
} while (true);
188186
}
189187
else
190188
{
189+
confirmedCount++;
191190
outstandingConfirms.Remove(deliveryTag);
192191
}
193192
}
@@ -196,35 +195,49 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
196195
semaphore.Release();
197196
}
198197

199-
if (publishingCompleted && outstandingConfirms.Count == 0)
198+
if (outstandingConfirms.Count == 0 || confirmedCount == MESSAGE_COUNT)
200199
{
201200
allMessagesConfirmedTcs.SetResult(true);
202201
}
203202
}
204203

205-
channel.BasicAcksAsync += (sender, ea) =>
206-
CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
204+
channel.BasicReturnAsync += (sender, ea) =>
205+
{
206+
ulong sequenceNumber = 0;
207+
208+
IReadOnlyBasicProperties props = ea.BasicProperties;
209+
if (props.Headers is not null)
210+
{
211+
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
212+
if (maybeSeqNum is not null)
213+
{
214+
sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
215+
}
216+
}
207217

218+
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
219+
return CleanOutstandingConfirms(sequenceNumber, false);
220+
};
221+
222+
channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
208223
channel.BasicNacksAsync += (sender, ea) =>
209224
{
210-
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} " +
211-
$"has been nacked (multiple: {ea.Multiple})");
225+
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})");
212226
return CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
213227
};
214228

215229
var sw = new Stopwatch();
216230
sw.Start();
217231

218-
var publishTasks = new List<ValueTask>();
219-
for (int i = 0; i < MessageCount; i++)
232+
var publishTasks = new List<ValueTuple<ulong, ValueTask>>();
233+
for (int i = 0; i < MESSAGE_COUNT; i++)
220234
{
221235
string msg = i.ToString();
222236
byte[] body = Encoding.UTF8.GetBytes(msg);
223237
ulong nextPublishSeqNo = await channel.GetNextPublishSequenceNumberAsync();
224238
if ((ulong)(i + 1) != nextPublishSeqNo)
225239
{
226-
Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence " +
227-
$"number: {nextPublishSeqNo}");
240+
Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}");
228241
}
229242
await semaphore.WaitAsync();
230243
try
@@ -235,35 +248,45 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
235248
{
236249
semaphore.Release();
237250
}
238-
var pt = channel.BasicPublishAsync(exchange: string.Empty,
239-
routingKey: queueName, body: body);
240-
publishTasks.Add(pt);
251+
252+
string rk = queueName;
253+
if (i % 1000 == 0)
254+
{
255+
// This will cause a basic.return, for fun
256+
rk = Guid.NewGuid().ToString();
257+
}
258+
(ulong, ValueTask) data =
259+
(nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props));
260+
publishTasks.Add(data);
241261
}
242262

243263
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
244-
245-
try
264+
// await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
265+
foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks)
246266
{
247-
foreach (ValueTask pt in publishTasks)
267+
try
268+
{
269+
await datum.PublishTask;
270+
}
271+
catch (Exception ex)
248272
{
249-
await pt;
250-
cts.Token.ThrowIfCancellationRequested();
273+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack, seqNo: '{datum.SeqNo}', ex: '{ex}'");
251274
}
252-
publishingCompleted = true;
275+
}
276+
277+
try
278+
{
253279
await allMessagesConfirmedTcs.Task.WaitAsync(cts.Token);
254280
}
255281
catch (OperationCanceledException)
256282
{
257-
Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed " +
258-
"within 10 seconds", DateTime.Now);
283+
Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now);
259284
}
260285
catch (TimeoutException)
261286
{
262-
Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed " +
263-
"within 10 seconds", DateTime.Now);
287+
Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now);
264288
}
265289

266290
sw.Stop();
267-
Console.WriteLine($"{DateTime.Now} [INFO] published {MessageCount:N0} messages and handled " +
268-
$"confirm asynchronously {sw.ElapsedMilliseconds:N0} ms");
291+
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {sw.ElapsedMilliseconds:N0} ms");
269292
}

0 commit comments

Comments
 (0)