Skip to content

Commit 366fb9b

Browse files
stebetlukebakken
authored andcommitted
Fixing stuff.
1 parent c17c33a commit 366fb9b

File tree

3 files changed

+23
-5
lines changed

3 files changed

+23
-5
lines changed

projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.Threading.Tasks;
34

45
namespace RabbitMQ.Client.Impl
@@ -46,7 +47,9 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
4647
IBasicProperties basicProperties,
4748
ReadOnlyMemory<byte> body)
4849
{
49-
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body));
50+
IMemoryOwner<byte> bodyCopy = MemoryPool<byte>.Shared.Rent(body.Length);
51+
body.CopyTo(bodyCopy.Memory);
52+
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy, body.Length));
5053
}
5154

5255
public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)

projects/RabbitMQ.Client/client/impl/BasicDeliver.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.Collections.Generic;
34
using System.Threading.Tasks;
45

@@ -14,7 +15,8 @@ sealed class BasicDeliver : Work
1415
readonly string _exchange;
1516
readonly string _routingKey;
1617
readonly IBasicProperties _basicProperties;
17-
readonly ReadOnlyMemory<byte> _body;
18+
readonly IMemoryOwner<byte> _body;
19+
readonly int _bodyLength;
1820

1921
public BasicDeliver(IBasicConsumer consumer,
2022
string consumerTag,
@@ -23,7 +25,8 @@ public BasicDeliver(IBasicConsumer consumer,
2325
string exchange,
2426
string routingKey,
2527
IBasicProperties basicProperties,
26-
ReadOnlyMemory<byte> body) : base(consumer)
28+
IMemoryOwner<byte> body,
29+
int bodyLength) : base(consumer)
2730
{
2831
_consumerTag = consumerTag;
2932
_deliveryTag = deliveryTag;
@@ -32,6 +35,7 @@ public BasicDeliver(IBasicConsumer consumer,
3235
_routingKey = routingKey;
3336
_basicProperties = basicProperties;
3437
_body = body;
38+
_bodyLength = bodyLength;
3539
}
3640

3741
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
@@ -44,7 +48,7 @@ await consumer.HandleBasicDeliver(_consumerTag,
4448
_exchange,
4549
_routingKey,
4650
_basicProperties,
47-
_body).ConfigureAwait(false);
51+
_body.Memory.Slice(0, _bodyLength)).ConfigureAwait(false);
4852
}
4953
catch (Exception e)
5054
{
@@ -55,6 +59,10 @@ await consumer.HandleBasicDeliver(_consumerTag,
5559
};
5660
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
5761
}
62+
finally
63+
{
64+
_body.Dispose();
65+
}
5866
}
5967
}
6068
}

projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.Collections.Generic;
34
using System.Threading.Tasks;
45
using RabbitMQ.Client.Events;
@@ -63,6 +64,8 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
6364
IBasicProperties basicProperties,
6465
ReadOnlyMemory<byte> body)
6566
{
67+
IMemoryOwner<byte> memoryCopy = MemoryPool<byte>.Shared.Rent(body.Length);
68+
body.CopyTo(memoryCopy.Memory);
6669
UnlessShuttingDown(() =>
6770
{
6871
try
@@ -73,7 +76,7 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
7376
exchange,
7477
routingKey,
7578
basicProperties,
76-
body);
79+
memoryCopy.Memory.Slice(0, body.Length));
7780
}
7881
catch (Exception e)
7982
{
@@ -84,6 +87,10 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
8487
};
8588
_model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
8689
}
90+
finally
91+
{
92+
memoryCopy.Dispose();
93+
}
8794
});
8895
}
8996

0 commit comments

Comments
 (0)