Skip to content

Commit 9404246

Browse files
Merge pull request #804 from rabbitmq/rabbitmq-dotnet-client-802
Fix issue #802
2 parents 337f96c + aeb1dc9 commit 9404246

File tree

6 files changed

+120
-38
lines changed

6 files changed

+120
-38
lines changed

projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.Threading.Tasks;
34

45
namespace RabbitMQ.Client.Impl
@@ -46,7 +47,9 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
4647
IBasicProperties basicProperties,
4748
ReadOnlyMemory<byte> body)
4849
{
49-
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body));
50+
IMemoryOwner<byte> bodyCopy = MemoryPool<byte>.Shared.Rent(body.Length);
51+
body.CopyTo(bodyCopy.Memory);
52+
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy, body.Length));
5053
}
5154

5255
public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)

projects/RabbitMQ.Client/client/impl/BasicDeliver.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.Collections.Generic;
34
using System.Threading.Tasks;
45

@@ -14,7 +15,8 @@ sealed class BasicDeliver : Work
1415
readonly string _exchange;
1516
readonly string _routingKey;
1617
readonly IBasicProperties _basicProperties;
17-
readonly ReadOnlyMemory<byte> _body;
18+
readonly IMemoryOwner<byte> _body;
19+
readonly int _bodyLength;
1820

1921
public BasicDeliver(IBasicConsumer consumer,
2022
string consumerTag,
@@ -23,7 +25,8 @@ public BasicDeliver(IBasicConsumer consumer,
2325
string exchange,
2426
string routingKey,
2527
IBasicProperties basicProperties,
26-
ReadOnlyMemory<byte> body) : base(consumer)
28+
IMemoryOwner<byte> body,
29+
int bodyLength) : base(consumer)
2730
{
2831
_consumerTag = consumerTag;
2932
_deliveryTag = deliveryTag;
@@ -32,6 +35,7 @@ public BasicDeliver(IBasicConsumer consumer,
3235
_routingKey = routingKey;
3336
_basicProperties = basicProperties;
3437
_body = body;
38+
_bodyLength = bodyLength;
3539
}
3640

3741
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
@@ -44,7 +48,7 @@ await consumer.HandleBasicDeliver(_consumerTag,
4448
_exchange,
4549
_routingKey,
4650
_basicProperties,
47-
_body).ConfigureAwait(false);
51+
_body.Memory.Slice(0, _bodyLength)).ConfigureAwait(false);
4852
}
4953
catch (Exception e)
5054
{
@@ -55,6 +59,10 @@ await consumer.HandleBasicDeliver(_consumerTag,
5559
};
5660
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
5761
}
62+
finally
63+
{
64+
_body.Dispose();
65+
}
5866
}
5967
}
6068
}

projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.Collections.Generic;
34
using System.Threading.Tasks;
45
using RabbitMQ.Client.Events;
@@ -63,6 +64,8 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
6364
IBasicProperties basicProperties,
6465
ReadOnlyMemory<byte> body)
6566
{
67+
IMemoryOwner<byte> memoryCopy = MemoryPool<byte>.Shared.Rent(body.Length);
68+
body.CopyTo(memoryCopy.Memory);
6669
UnlessShuttingDown(() =>
6770
{
6871
try
@@ -73,7 +76,7 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
7376
exchange,
7477
routingKey,
7578
basicProperties,
76-
body);
79+
memoryCopy.Memory.Slice(0, body.Length));
7780
}
7881
catch (Exception e)
7982
{
@@ -84,6 +87,10 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
8487
};
8588
_model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
8689
}
90+
finally
91+
{
92+
memoryCopy.Dispose();
93+
}
8794
});
8895
}
8996

projects/Unit/TestBasicPublish.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,12 @@ public void TestBasicRoundtripArray()
3030
};
3131
string tag = m.BasicConsume(q.QueueName, true, consumer);
3232

33-
3433
m.BasicPublish("", q.QueueName, bp, sendBody);
3534
bool waitResFalse = are.WaitOne(2000);
3635
m.BasicCancel(tag);
3736

38-
3937
Assert.IsTrue(waitResFalse);
40-
Assert.AreEqual(sendBody, consumeBody);
38+
CollectionAssert.AreEqual(sendBody, consumeBody);
4139
}
4240
}
4341

@@ -62,14 +60,12 @@ public void TestBasicRoundtripReadOnlyMemory()
6260
};
6361
string tag = m.BasicConsume(q.QueueName, true, consumer);
6462

65-
6663
m.BasicPublish("", q.QueueName, bp, new ReadOnlyMemory<byte>(sendBody));
6764
bool waitResFalse = are.WaitOne(2000);
6865
m.BasicCancel(tag);
6966

70-
7167
Assert.IsTrue(waitResFalse);
72-
Assert.AreEqual(sendBody, consumeBody);
68+
CollectionAssert.AreEqual(sendBody, consumeBody);
7369
}
7470
}
7571
}

projects/Unit/TestFloodPublishing.cs

Lines changed: 94 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,52 +38,120 @@
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41-
using System;
42-
4341
using NUnit.Framework;
44-
//using System.Timers;
42+
using RabbitMQ.Client.Events;
43+
using System;
44+
using System.Text;
45+
using System.Threading;
46+
using System.Threading.Tasks;
4547

4648
namespace RabbitMQ.Client.Unit
4749
{
4850
[TestFixture]
49-
public class TestFloodPublishing : IntegrationFixture
51+
public class TestFloodPublishing
5052
{
51-
[SetUp]
52-
public override void Init()
53+
private readonly byte[] _body = new byte[2048];
54+
private readonly TimeSpan _tenSeconds = TimeSpan.FromSeconds(10);
55+
56+
[Test]
57+
public void TestUnthrottledFloodPublishing()
5358
{
5459
var connFactory = new ConnectionFactory()
5560
{
5661
RequestedHeartbeat = TimeSpan.FromSeconds(60),
5762
AutomaticRecoveryEnabled = false
5863
};
59-
Conn = connFactory.CreateConnection();
60-
Model = Conn.CreateModel();
61-
}
6264

63-
[Test, Category("LongRunning")]
64-
public void TestUnthrottledFloodPublishing()
65-
{
66-
Conn.ConnectionShutdown += (_, args) =>
65+
using (var conn = connFactory.CreateConnection())
6766
{
68-
if (args.Initiator != ShutdownInitiator.Application)
67+
using (var model = conn.CreateModel())
6968
{
70-
Assert.Fail("Unexpected connection shutdown!");
69+
conn.ConnectionShutdown += (_, args) =>
70+
{
71+
if (args.Initiator != ShutdownInitiator.Application)
72+
{
73+
Assert.Fail("Unexpected connection shutdown!");
74+
}
75+
};
76+
77+
bool shouldStop = false;
78+
DateTime now = DateTime.Now;
79+
DateTime stopTime = DateTime.Now.Add(_tenSeconds);
80+
for (int i = 0; i < 65535*64; i++)
81+
{
82+
if (i % 65536 == 0)
83+
{
84+
now = DateTime.Now;
85+
shouldStop = DateTime.Now > stopTime;
86+
if (shouldStop)
87+
{
88+
break;
89+
}
90+
}
91+
model.BasicPublish("", "", null, _body);
92+
}
93+
Assert.IsTrue(conn.IsOpen);
7194
}
95+
}
96+
}
97+
98+
[Test]
99+
public void TestMultithreadFloodPublishing()
100+
{
101+
string testName = TestContext.CurrentContext.Test.FullName;
102+
string message = string.Format("Hello from test {0}", testName);
103+
byte[] sendBody = Encoding.UTF8.GetBytes(message);
104+
int publishCount = 4096;
105+
int receivedCount = 0;
106+
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
107+
108+
var cf = new ConnectionFactory()
109+
{
110+
RequestedHeartbeat = TimeSpan.FromSeconds(60),
111+
AutomaticRecoveryEnabled = false
72112
};
73113

74-
bool elapsed = false;
75-
var t = new System.Threading.Timer((_obj) => { elapsed = true; }, null, 1000 * 185, -1);
76-
/*
77-
t.Elapsed += (_sender, _args) => { elapsed = true; };
78-
t.AutoReset = false;
79-
t.Start();
80-
*/
81-
while (!elapsed)
114+
using (IConnection c = cf.CreateConnection())
82115
{
83-
Model.BasicPublish("", "", null, new byte[2048]);
116+
string queueName = null;
117+
using (IModel m = c.CreateModel())
118+
{
119+
QueueDeclareOk q = m.QueueDeclare();
120+
queueName = q.QueueName;
121+
}
122+
123+
Task pub = Task.Run(() =>
124+
{
125+
using (IModel m = c.CreateModel())
126+
{
127+
IBasicProperties bp = m.CreateBasicProperties();
128+
for (int i = 0; i < publishCount; i++)
129+
{
130+
m.BasicPublish(string.Empty, queueName, bp, sendBody);
131+
}
132+
}
133+
});
134+
135+
using (IModel consumerModel = c.CreateModel())
136+
{
137+
var consumer = new EventingBasicConsumer(consumerModel);
138+
consumer.Received += (o, a) =>
139+
{
140+
string receivedMessage = Encoding.UTF8.GetString(a.Body.ToArray());
141+
Assert.AreEqual(message, receivedMessage);
142+
Interlocked.Increment(ref receivedCount);
143+
if (receivedCount == publishCount)
144+
{
145+
autoResetEvent.Set();
146+
}
147+
};
148+
consumerModel.BasicConsume(queueName, true, consumer);
149+
Assert.IsTrue(pub.Wait(_tenSeconds));
150+
Assert.IsTrue(autoResetEvent.WaitOne(_tenSeconds));
151+
}
152+
153+
Assert.AreEqual(publishCount, receivedCount);
84154
}
85-
Assert.IsTrue(Conn.IsOpen);
86-
t.Dispose();
87155
}
88156
}
89157
}

projects/Unit/TestRecoverAfterCancel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void TestRecoverAfterCancel_()
9696
Channel.BasicConsume(Queue, false, Consumer2);
9797
BasicDeliverEventArgs Event2 = EventQueue2.Dequeue();
9898

99-
Assert.AreEqual(Event.Body, Event2.Body);
99+
CollectionAssert.AreEqual(Event.Body.ToArray(), Event2.Body.ToArray());
100100
Assert.IsFalse(Event.Redelivered);
101101
Assert.IsTrue(Event2.Redelivered);
102102
}

0 commit comments

Comments
 (0)