Skip to content

Commit 60440d6

Browse files
authored
RabbitMQ pipeline improvements (#824)
## Motivation and Context (Why the change? What's the scenario?) Allow to configure the RabbitMQ pipeline threads usage and the delay between failures. Add "KernelMemory" application name to RabbitMQ logs. ## High level description (Approach, Design) New configuration settings: * ConcurrentThreads: how many threads to use for each queue to process messages. * PrefetchCount: how many messages to prefetch from each queue. The value is recommended to be greater than ConcurrentThreads. * DelayBeforeRetryingMsecs: how long to wait before retrying to process a message that failed. The delay is applied in the client code, not using delayed exchanges.
1 parent 8f9a165 commit 60440d6

File tree

5 files changed

+91
-14
lines changed

5 files changed

+91
-14
lines changed

extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ public static async Task Main()
4545

4646
ListenToDeadLetterQueue(rabbitMQConfig);
4747

48-
await pipeline.EnqueueAsync($"test {DateTimeOffset.Now:T}");
48+
// Change ConcurrentThreads and PrefetchCount to 1 to see
49+
// how they affect total execution time
50+
for (int i = 1; i <= 3; i++)
51+
{
52+
await pipeline.EnqueueAsync($"test #{i} {DateTimeOffset.Now:T}");
53+
}
4954

5055
while (true)
5156
{

extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,14 @@
1414
"VirtualHost": "/",
1515
"MessageTTLSecs": 3600,
1616
"SslEnabled": false,
17+
// How many messages to process asynchronously at a time, in each queue
18+
"ConcurrentThreads": 3,
19+
// How many messages to fetch at a time from each queue
20+
// The value should be higher than ConcurrentThreads
21+
"PrefetchCount": 6,
1722
// How many times to dequeue a messages and process before moving it to a poison queue
1823
"MaxRetriesBeforePoisonQueue": 5,
24+
"DelayBeforeRetryingMsecs": 750,
1925
// Suffix used for the poison queues.
2026
"PoisonQueueSuffix": "-poison"
2127
}

extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

33
using System.Text;
4+
using Microsoft.Extensions.Logging;
45

56
#pragma warning disable IDE0130 // reduce number of "using" statements
67
// ReSharper disable once CheckNamespace - reduce number of "using" statements
@@ -16,7 +17,7 @@ public class RabbitMQConfig
1617
/// <summary>
1718
/// TCP port for the connection, e.g. 5672
1819
/// </summary>
19-
public int Port { get; set; } = 0;
20+
public int Port { get; set; } = 5672;
2021

2122
/// <summary>
2223
/// Authentication username
@@ -46,6 +47,22 @@ public class RabbitMQConfig
4647
/// </summary>
4748
public bool SslEnabled { get; set; } = false;
4849

50+
/// <summary>
51+
/// How many messages to process asynchronously at a time, in each queue.
52+
/// Note that this applies to each queue, and each queue is used
53+
/// for a specific pipeline step.
54+
/// </summary>
55+
public ushort ConcurrentThreads { get; set; } = 2;
56+
57+
/// <summary>
58+
/// How many messages to fetch at a time from each queue.
59+
/// The value should be higher than ConcurrentThreads to make sure each
60+
/// thread has some work to do.
61+
/// Note that this applies to each queue, and each queue is used
62+
/// for a specific pipeline step.
63+
/// </summary>
64+
public ushort PrefetchCount { get; set; } = 3;
65+
4966
/// <summary>
5067
/// How many times to retry processing a message before moving it to a poison queue.
5168
/// Example: a value of 20 means that a message will be processed up to 21 times.
@@ -54,6 +71,14 @@ public class RabbitMQConfig
5471
/// </summary>
5572
public int MaxRetriesBeforePoisonQueue { get; set; } = 20;
5673

74+
/// <summary>
75+
/// How long to wait before putting a message back to the queue in case of failure.
76+
/// Note: currently a basic strategy not based on RabbitMQ exchanges, potentially
77+
/// affecting the pipeline concurrency performance: consumers hold
78+
/// messages for N msecs, slowing down the delivery of other messages.
79+
/// </summary>
80+
public int DelayBeforeRetryingMsecs { get; set; } = 500;
81+
5782
/// <summary>
5883
/// Suffix used for the poison queues.
5984
/// </summary>
@@ -62,7 +87,7 @@ public class RabbitMQConfig
6287
/// <summary>
6388
/// Verify that the current state is valid.
6489
/// </summary>
65-
public void Validate()
90+
public void Validate(ILogger? log = null)
6691
{
6792
const int MinTTLSecs = 5;
6893

@@ -81,6 +106,11 @@ public void Validate()
81106
throw new ConfigurationException($"RabbitMQ: {nameof(this.MessageTTLSecs)} value {this.MessageTTLSecs} is too low, cannot be less than {MinTTLSecs}");
82107
}
83108

109+
if (this.ConcurrentThreads < 1)
110+
{
111+
throw new ConfigurationException($"RabbitMQ: {nameof(this.ConcurrentThreads)} value cannot be less than 1");
112+
}
113+
84114
if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix) || this.PoisonQueueSuffix != $"{this.PoisonQueueSuffix}".Trim())
85115
{
86116
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} cannot be empty or have leading or trailing spaces");
@@ -102,5 +132,12 @@ public void Validate()
102132
{
103133
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} can be up to 60 characters length");
104134
}
135+
136+
#pragma warning disable CA2254
137+
if (this.PrefetchCount < this.ConcurrentThreads)
138+
{
139+
log?.LogWarning($"The value of {nameof(this.PrefetchCount)} ({this.PrefetchCount}) should not be lower than the value of {nameof(this.ConcurrentThreads)} ({this.ConcurrentThreads})");
140+
}
141+
#pragma warning restore CA2254
105142
}
106143
}

extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public sealed class RabbitMQPipeline : IQueue
2424
private readonly AsyncEventingBasicConsumer _consumer;
2525
private readonly RabbitMQConfig _config;
2626
private readonly int _messageTTLMsecs;
27+
private readonly int _delayBeforeRetryingMsecs;
28+
private readonly int _maxAttempts;
2729
private string _queueName = string.Empty;
2830
private string _poisonQueueName = string.Empty;
2931

@@ -32,20 +34,22 @@ public sealed class RabbitMQPipeline : IQueue
3234
/// </summary>
3335
public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = null)
3436
{
35-
this._config = config;
36-
this._config.Validate();
37-
3837
this._log = (loggerFactory ?? DefaultLogger.Factory).CreateLogger<RabbitMQPipeline>();
3938

39+
this._config = config;
40+
this._config.Validate(this._log);
41+
4042
// see https://www.rabbitmq.com/dotnet-api-guide.html#consuming-async
4143
var factory = new ConnectionFactory
4244
{
45+
ClientProvidedName = "KernelMemory",
4346
HostName = config.Host,
4447
Port = config.Port,
4548
UserName = config.Username,
4649
Password = config.Password,
4750
VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/",
4851
DispatchConsumersAsync = true,
52+
ConsumerDispatchConcurrency = config.ConcurrentThreads,
4953
Ssl = new SslOption
5054
{
5155
Enabled = config.SslEnabled,
@@ -56,8 +60,11 @@ public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = n
5660
this._messageTTLMsecs = config.MessageTTLSecs * 1000;
5761
this._connection = factory.CreateConnection();
5862
this._channel = this._connection.CreateModel();
59-
this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
63+
this._channel.BasicQos(prefetchSize: 0, prefetchCount: config.PrefetchCount, global: false);
6064
this._consumer = new AsyncEventingBasicConsumer(this._channel);
65+
66+
this._delayBeforeRetryingMsecs = Math.Max(0, this._config.DelayBeforeRetryingMsecs);
67+
this._maxAttempts = Math.Max(0, this._config.MaxRetriesBeforePoisonQueue) + 1;
6168
}
6269

6370
/// <inheritdoc />
@@ -171,7 +178,6 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
171178
this._consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
172179
{
173180
// Just for logging, extract the attempt number from the message headers
174-
var maxAttempts = this._config.MaxRetriesBeforePoisonQueue + 1;
175181
var attemptNumber = 1;
176182
if (args.BasicProperties?.Headers != null && args.BasicProperties.Headers.TryGetValue("x-delivery-count", out object? value))
177183
{
@@ -181,7 +187,7 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
181187
try
182188
{
183189
this._log.LogDebug("Message '{0}' received, expires after {1}ms, attempt {2} of {3}",
184-
args.BasicProperties?.MessageId, args.BasicProperties?.Expiration, attemptNumber, maxAttempts);
190+
args.BasicProperties?.MessageId, args.BasicProperties?.Expiration, attemptNumber, this._maxAttempts);
185191

186192
byte[] body = args.Body.ToArray();
187193
string message = Encoding.UTF8.GetString(body);
@@ -194,15 +200,19 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
194200
}
195201
else
196202
{
197-
if (attemptNumber < maxAttempts)
203+
if (attemptNumber < this._maxAttempts)
198204
{
199205
this._log.LogWarning("Message '{0}' failed to process (attempt {1} of {2}), putting message back in the queue",
200-
args.BasicProperties?.MessageId, attemptNumber, maxAttempts);
206+
args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
207+
if (this._delayBeforeRetryingMsecs > 0)
208+
{
209+
await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false);
210+
}
201211
}
202212
else
203213
{
204214
this._log.LogError("Message '{0}' failed to process (attempt {1} of {2}), moving message to dead letter queue",
205-
args.BasicProperties?.MessageId, attemptNumber, maxAttempts);
215+
args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
206216
}
207217

208218
// Note: if "requeue == false" the message would be moved to the dead letter exchange
@@ -217,8 +227,20 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
217227
// - failed to delete message from queue
218228
// - failed to unlock message in the queue
219229

220-
this._log.LogWarning(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue",
221-
args.BasicProperties?.MessageId, attemptNumber, maxAttempts);
230+
if (attemptNumber < this._maxAttempts)
231+
{
232+
this._log.LogWarning(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue",
233+
args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
234+
if (this._delayBeforeRetryingMsecs > 0)
235+
{
236+
await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false);
237+
}
238+
}
239+
else
240+
{
241+
this._log.LogError(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue",
242+
args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
243+
}
222244

223245
// TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages.
224246
// Note: if "requeue == false" the message would be moved to the dead letter exchange

service/Service/appsettings.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,10 +539,17 @@
539539
"VirtualHost": "/",
540540
"MessageTTLSecs": 3600,
541541
"SslEnabled": false,
542+
// How many messages to process asynchronously at a time, in each queue
543+
"ConcurrentThreads": 4,
544+
// How many messages to fetch at a time from each queue
545+
// The value should be higher than ConcurrentThreads
546+
"PrefetchCount": 8,
542547
// How many times to dequeue a messages and process before moving it to a poison queue
543548
// Note: this value cannot be changed after queues have been created. In such case
544549
// you might need to drain all queues, delete them, and restart the ingestion service(s).
545550
"MaxRetriesBeforePoisonQueue": 20,
551+
// How long to wait before putting a message back to the queue in case of failure
552+
"DelayBeforeRetryingMsecs": 500,
546553
// Suffix used for the poison queues.
547554
"PoisonQueueSuffix": "-poison"
548555
},

0 commit comments

Comments
 (0)