Skip to content

Commit 024565b

Browse files
otemnovpdudnikov
andauthored
Fix RabbitMQ consuming body reusing (#1727)
* Fix rabbitMQ consuming body reusing * ArrayPool for memory management * Rollback to simply array copy --------- Co-authored-by: Pavel Dudnikov <[email protected]>
1 parent 5538ccc commit 024565b

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del
4545
string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
4646
CancellationToken cancellationToken = default)
4747
{
48+
var safeBody = _usingTaskRun ? body.ToArray() : body;
49+
4850
if (_usingTaskRun)
4951
{
5052
await _semaphore.WaitAsync(cancellationToken);
@@ -73,15 +75,16 @@ Task Consume()
7375

7476
if (_customHeadersBuilder != null)
7577
{
76-
var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
78+
var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey,
79+
properties, safeBody);
7780
var customHeaders = _customHeadersBuilder(e, _serviceProvider);
7881
foreach (var customHeader in customHeaders)
7982
{
8083
headers[customHeader.Key] = customHeader.Value;
8184
}
8285
}
8386

84-
var message = new TransportMessage(headers, body);
87+
var message = new TransportMessage(headers, safeBody);
8588

8689
return _msgCallback(message, deliveryTag);
8790
}

0 commit comments

Comments
 (0)