Skip to content

Commit 5a43710

Browse files
authored
Fix RabbitMQ consumer concurrency by ensuring safe body handling in asynchronous processing (#1767)
1 parent 6b61df5 commit 5a43710

File tree

1 file changed

+28
-28
lines changed

1 file changed

+28
-28
lines changed

src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -48,46 +48,46 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del
4848
if (_usingTaskRun)
4949
{
5050
await _semaphore.WaitAsync(cancellationToken);
51-
52-
var safeBody = body.ToArray();
53-
54-
_ = Task.Run(Consume, cancellationToken).ConfigureAwait(false);
51+
// Copy of the body safe to use outside the RabbitMQ thread context
52+
ReadOnlyMemory<byte> safeBody = body.ToArray();
53+
_ = Task.Run(() => Consume(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, safeBody), cancellationToken).ConfigureAwait(false);
5554
}
5655
else
5756
{
58-
await Consume().ConfigureAwait(false);
57+
await Consume(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false);
5958
}
59+
}
6060

61-
Task Consume()
62-
{
63-
var headers = new Dictionary<string, string?>();
61+
private Task Consume(string consumerTag, ulong deliveryTag, bool redelivered, string exchange,
62+
string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
63+
{
64+
var headers = new Dictionary<string, string?>();
6465

65-
if (properties.Headers != null)
66-
foreach (var header in properties.Headers)
67-
{
68-
if (header.Value is byte[] val)
69-
headers.Add(header.Key, Encoding.UTF8.GetString(val));
70-
else
71-
headers.Add(header.Key, header.Value?.ToString());
72-
}
66+
if (properties.Headers != null)
67+
foreach (var header in properties.Headers)
68+
{
69+
if (header.Value is byte[] val)
70+
headers.Add(header.Key, Encoding.UTF8.GetString(val));
71+
else
72+
headers.Add(header.Key, header.Value?.ToString());
73+
}
7374

74-
headers[Messages.Headers.Group] = _groupName;
75+
headers[Messages.Headers.Group] = _groupName;
7576

76-
if (_customHeadersBuilder != null)
77+
if (_customHeadersBuilder != null)
78+
{
79+
var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey,
80+
properties, body);
81+
var customHeaders = _customHeadersBuilder(e, _serviceProvider);
82+
foreach (var customHeader in customHeaders)
7783
{
78-
var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey,
79-
properties, body);
80-
var customHeaders = _customHeadersBuilder(e, _serviceProvider);
81-
foreach (var customHeader in customHeaders)
82-
{
83-
headers[customHeader.Key] = customHeader.Value;
84-
}
84+
headers[customHeader.Key] = customHeader.Value;
8585
}
86+
}
8687

87-
var message = new TransportMessage(headers, body);
88+
var message = new TransportMessage(headers, body);
8889

89-
return _msgCallback(message, deliveryTag);
90-
}
90+
return _msgCallback(message, deliveryTag);
9191
}
9292

9393
public async Task BasicAck(ulong deliveryTag)

0 commit comments

Comments
 (0)