Skip to content

Commit 4b6e7d5

Browse files
committed
Follow-up to #1669 - per-channel dispatch concurrency
PR #1669 by @danielmarbach adds the ability to configure consumer dispatch on a per-channel basis. * Test that consumer dispatch concurrency is set on the dispatcher.
1 parent 624cf2e commit 4b6e7d5

20 files changed

+110
-57
lines changed

projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ namespace Benchmarks.Networking
99
[MemoryDiagnoser]
1010
public class Networking_BasicDeliver_Commons
1111
{
12-
public static async Task Publish_Hello_World(IConnection connection, uint messageCount, byte[] body)
12+
public static async Task Publish_Hello_World(IConnection connection,
13+
uint messageCount, byte[] body, ushort consumerDispatchConcurrency = 1)
1314
{
14-
using (IChannel channel = await connection.CreateChannelAsync())
15+
using (IChannel channel = await connection.CreateChannelAsync(consumerDispatchConcurrency))
1516
{
1617
QueueDeclareOk queue = await channel.QueueDeclareAsync();
1718
var consumer = new CountingConsumer(channel, messageCount);

projects/Benchmarks/Networking/Networking_BasicDeliver_ConnectionChurn.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@ public void GlobalCleanup()
2929
[Benchmark(Baseline = true)]
3030
public async Task Publish_Hello_World()
3131
{
32-
var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
32+
var cf = new ConnectionFactory();
3333
using (IConnection connection = await cf.CreateConnectionAsync())
3434
{
35-
await Publish_Hello_World(connection);
35+
await Publish_Hello_World(connection, 2);
3636
}
3737
}
3838

39-
public static async Task Publish_Hello_World(IConnection connection)
39+
public static async Task Publish_Hello_World(IConnection connection, ushort consumerDispatchConcurrency)
4040
{
41-
await Networking_BasicDeliver_Commons.Publish_Hello_World(connection, messageCount, _body);
41+
await Networking_BasicDeliver_Commons.Publish_Hello_World(connection, messageCount, _body,
42+
consumerDispatchConcurrency);
4243
}
4344
}
4445
}

projects/Benchmarks/Networking/Networking_BasicDeliver_LongLivedConnection.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ public class Networking_BasicDeliver_LongLivedConnection
1919
public void GlobalSetup()
2020
{
2121
_container = RabbitMQBroker.Start();
22-
23-
var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
22+
var cf = new ConnectionFactory();
2423
// NOTE: https://github.com/dotnet/BenchmarkDotNet/issues/1738
2524
_connection = EnsureCompleted(cf.CreateConnectionAsync());
2625
}
@@ -35,7 +34,8 @@ public void GlobalCleanup()
3534
[Benchmark(Baseline = true)]
3635
public Task Publish_Hello_World()
3736
{
38-
return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body);
37+
return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body,
38+
consumerDispatchConcurrency: 2);
3939
}
4040

4141
private static T EnsureCompleted<T>(Task<T> task) => task.GetAwaiter().GetResult();

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,6 @@ RabbitMQ.Client.ConnectionFactory.ClientProperties.set -> void
189189
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.get -> string
190190
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.set -> void
191191
RabbitMQ.Client.ConnectionFactory.ConnectionFactory() -> void
192-
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
193-
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void
194192
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
195193
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void
196194
RabbitMQ.Client.ConnectionFactory.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint
@@ -472,8 +470,6 @@ RabbitMQ.Client.IConnectionFactory.ClientProperties.get -> System.Collections.Ge
472470
RabbitMQ.Client.IConnectionFactory.ClientProperties.set -> void
473471
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.get -> string
474472
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.set -> void
475-
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
476-
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.set -> void
477473
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
478474
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.set -> void
479475
RabbitMQ.Client.IConnectionFactory.HandshakeContinuationTimeout.get -> System.TimeSpan
@@ -892,7 +888,4 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
892888
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
893889
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
894890
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
895-
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
896-
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
897-
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
898-
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
891+
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
2+
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
3+
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void
4+
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
5+
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.set -> void
6+
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
150150
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
151151
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
152152
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
153-
ushort consumerDispatchConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
153+
ushort consumerDispatchConcurrency,
154+
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
154155
{
155156
VirtualHost = virtualHost;
156157
UserName = userName;

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ namespace RabbitMQ.Client
9292
///hosts with an empty name are not addressable. </para></remarks>
9393
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
9494
{
95-
/// <summary>
96-
/// Default value for consumer dispatch concurrency.
97-
/// </summary>
98-
public const ushort DefaultConsumerDispatchConcurrency = 1;
99-
10095
/// <summary>
10196
/// Default value for the desired maximum channel number. Default: 2047.
10297
/// </summary>
@@ -180,7 +175,8 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
180175
/// </summary>
181176
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
182177
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
183-
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
178+
public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency;
179+
184180

185181
/// <summary>The host to connect to.</summary>
186182
public string HostName { get; set; } = "localhost";

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
239239
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
240240
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
241241
///
242-
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
242+
/// The default value is 1.
243243
///
244244
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
245245
/// In addition to that consumers need to be thread/concurrency safe.
246246
/// </param>
247247
/// <param name="cancellationToken">Cancellation token</param>
248-
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
248+
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
249+
CancellationToken cancellationToken = default);
249250
}
250251
}

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,6 @@ namespace RabbitMQ.Client
77
{
88
public static class IConnectionExtensions
99
{
10-
/// <summary>
11-
/// Asynchronously create and return a fresh channel, session, and channel.
12-
/// </summary>
13-
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
14-
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);
15-
1610
/// <summary>
1711
/// Asynchronously close this connection and all its channels.
1812
/// </summary>

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal class Channel : ChannelBase
4040
{
41-
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
41+
public Channel(ConnectionConfig config, ISession session,
42+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
4243
: base(config, session, consumerDispatchConcurrency)
4344
{
4445
}

0 commit comments

Comments
 (0)