Skip to content

Commit f6847f9

Browse files
authored
Merge pull request #1638 from bollhals/AsyncConsumerOnly
use async consumer only
2 parents 5b45d25 + 00801a2 commit f6847f9

File tree

58 files changed

+325
-1518
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+325
-1518
lines changed

.ci/windows/versions.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"erlang": "26.2.5",
3-
"rabbitmq": "3.13.3"
2+
"erlang": "26.2.5.2",
3+
"rabbitmq": "3.13.6"
44
}

projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22
using System.Threading;
33
using System.Threading.Tasks;
44
using RabbitMQ.Client;
5-
using RabbitMQ.Client.Events;
65

76
namespace RabbitMQ.Benchmarks
87
{
9-
internal sealed class AsyncBasicConsumerFake : IAsyncBasicConsumer, IBasicConsumer
8+
internal sealed class AsyncBasicConsumerFake : IAsyncBasicConsumer
109
{
1110
private readonly ManualResetEventSlim _autoResetEvent;
1211
private int _current;
@@ -18,7 +17,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
1817
_autoResetEvent = autoResetEvent;
1918
}
2019

21-
public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
20+
public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
2221
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
2322
{
2423
if (Interlocked.Increment(ref _current) == Count)
@@ -29,53 +28,14 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel
2928
return Task.CompletedTask;
3029
}
3130

32-
Task IBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
33-
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
34-
{
35-
if (Interlocked.Increment(ref _current) == Count)
36-
{
37-
_current = 0;
38-
_autoResetEvent.Set();
39-
}
40-
return Task.CompletedTask;
41-
}
31+
public Task HandleBasicCancelAsync(string consumerTag) => Task.CompletedTask;
4232

43-
public Task HandleBasicCancel(string consumerTag) => Task.CompletedTask;
33+
public Task HandleBasicCancelOkAsync(string consumerTag) => Task.CompletedTask;
4434

45-
public Task HandleBasicCancelOk(string consumerTag) => Task.CompletedTask;
35+
public Task HandleBasicConsumeOkAsync(string consumerTag) => Task.CompletedTask;
4636

47-
public Task HandleBasicConsumeOk(string consumerTag) => Task.CompletedTask;
48-
49-
public Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) => Task.CompletedTask;
37+
public Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) => Task.CompletedTask;
5038

5139
public IChannel Channel { get; }
52-
53-
event EventHandler<ConsumerEventArgs> IBasicConsumer.ConsumerCancelled
54-
{
55-
add { }
56-
remove { }
57-
}
58-
59-
public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled
60-
{
61-
add { }
62-
remove { }
63-
}
64-
65-
void IBasicConsumer.HandleBasicCancelOk(string consumerTag)
66-
{
67-
}
68-
69-
void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
70-
{
71-
}
72-
73-
void IBasicConsumer.HandleChannelShutdown(object channel, ShutdownEventArgs reason)
74-
{
75-
}
76-
77-
void IBasicConsumer.HandleBasicCancel(string consumerTag)
78-
{
79-
}
8040
}
8141
}

projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -52,30 +52,7 @@ public async Task AsyncConsumerDispatcher()
5252
{
5353
for (int i = 0; i < Count; i++)
5454
{
55-
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body,
56-
CancellationToken.None);
57-
}
58-
_autoResetEvent.Wait();
59-
_autoResetEvent.Reset();
60-
}
61-
}
62-
63-
[GlobalSetup(Target = nameof(ConsumerDispatcher))]
64-
public async Task SetUpConsumer()
65-
{
66-
_consumer.Count = Count;
67-
_dispatcher = new ConsumerDispatcher(null, Concurrency);
68-
await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None);
69-
}
70-
71-
[Benchmark]
72-
public async Task ConsumerDispatcher()
73-
{
74-
using (RentedMemory body = new RentedMemory(_body))
75-
{
76-
for (int i = 0; i < Count; i++)
77-
{
78-
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body,
55+
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, default, body,
7956
CancellationToken.None);
8057
}
8158
_autoResetEvent.Wait();
Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
using System.Threading;
1+
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using BenchmarkDotNet.Attributes;
45
using RabbitMQ.Client;
5-
using RabbitMQ.Client.Events;
66

77
namespace Benchmarks.Networking
88
{
@@ -11,29 +11,45 @@ public class Networking_BasicDeliver_Commons
1111
{
1212
public static async Task Publish_Hello_World(IConnection connection, uint messageCount, byte[] body)
1313
{
14-
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
1514
using (IChannel channel = await connection.CreateChannelAsync())
1615
{
1716
QueueDeclareOk queue = await channel.QueueDeclareAsync();
18-
int consumed = 0;
19-
var consumer = new EventingBasicConsumer(channel);
20-
consumer.Received += (s, args) =>
21-
{
22-
if (Interlocked.Increment(ref consumed) == messageCount)
23-
{
24-
tcs.SetResult(true);
25-
}
26-
};
17+
var consumer = new CountingConsumer(messageCount);
2718
await channel.BasicConsumeAsync(queue.QueueName, true, consumer);
2819

2920
for (int i = 0; i < messageCount; i++)
3021
{
3122
await channel.BasicPublishAsync("", queue.QueueName, body);
3223
}
3324

34-
await tcs.Task;
25+
await consumer.CompletedTask.ConfigureAwait(false);
3526
await channel.CloseAsync();
3627
}
3728
}
3829
}
30+
31+
internal sealed class CountingConsumer : AsyncDefaultBasicConsumer
32+
{
33+
private int _remainingCount;
34+
private readonly TaskCompletionSource<bool> _tcs;
35+
36+
public Task CompletedTask => _tcs.Task;
37+
38+
public CountingConsumer(uint messageCount)
39+
{
40+
_remainingCount = (int)messageCount;
41+
_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
42+
}
43+
44+
/// <inheritdoc />
45+
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
46+
{
47+
if (Interlocked.Decrement(ref _remainingCount) == 0)
48+
{
49+
_tcs.SetResult(true);
50+
}
51+
52+
return Task.CompletedTask;
53+
}
54+
}
3955
}

0 commit comments

Comments
 (0)