Skip to content

Commit c10fcf2

Browse files
authored
Merge pull request #549 from jehrenzweig-captionhealth/je/strategy_2_publishing_messages_in_batches_20250205
Refactored PublisherConfirms.cs
2 parents dc2fe75 + f8095d8 commit c10fcf2

File tree

1 file changed

+18
-29
lines changed

1 file changed

+18
-29
lines changed

dotnet/PublisherConfirms/PublisherConfirms.cs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
using System.Buffers.Binary;
1+
using RabbitMQ.Client;
2+
using System.Buffers.Binary;
23
using System.Diagnostics;
34
using System.Text;
4-
using RabbitMQ.Client;
55

66
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
77

@@ -83,38 +83,31 @@ async Task PublishMessagesInBatchAsync()
8383
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
8484
string queueName = queueDeclareResult.QueueName;
8585

86-
int batchSize = MAX_OUTSTANDING_CONFIRMS / 2;
87-
int outstandingMessageCount = 0;
86+
int batchSize = Math.Max(1, MAX_OUTSTANDING_CONFIRMS / 2);
8887

89-
var sw = new Stopwatch();
90-
sw.Start();
88+
var sw = Stopwatch.StartNew();
9189

9290
var publishTasks = new List<ValueTask>();
9391
for (int i = 0; i < MESSAGE_COUNT; i++)
9492
{
9593
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
96-
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props));
97-
outstandingMessageCount++;
94+
ValueTask publishTask = channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props);
95+
publishTasks.Add(publishTask);
9896

99-
if (outstandingMessageCount == batchSize)
100-
{
101-
foreach (ValueTask pt in publishTasks)
102-
{
103-
try
104-
{
105-
await pt;
106-
}
107-
catch (Exception ex)
108-
{
109-
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
110-
}
111-
}
112-
publishTasks.Clear();
113-
outstandingMessageCount = 0;
114-
}
97+
await MaybeAwaitPublishes(publishTasks, batchSize);
11598
}
11699

117-
if (publishTasks.Count > 0)
100+
// Await any remaining tasks in case message count was not
101+
// evenly divisible by batch size.
102+
await MaybeAwaitPublishes(publishTasks, 0);
103+
104+
sw.Stop();
105+
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
106+
}
107+
108+
static async Task MaybeAwaitPublishes(List<ValueTask> publishTasks, int batchSize)
109+
{
110+
if (publishTasks.Count >= batchSize)
118111
{
119112
foreach (ValueTask pt in publishTasks)
120113
{
@@ -128,11 +121,7 @@ async Task PublishMessagesInBatchAsync()
128121
}
129122
}
130123
publishTasks.Clear();
131-
outstandingMessageCount = 0;
132124
}
133-
134-
sw.Stop();
135-
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
136125
}
137126

138127
async Task HandlePublishConfirmsAsynchronously()

0 commit comments

Comments
 (0)