diff --git a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs index 7fc75ffa0..1871c02c0 100644 --- a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs @@ -35,7 +35,7 @@ public class BasicDeliverConsumerDispatching : ConsumerDispatcherBase public int Count { get; set; } [Params(1, 2)] - public int Concurrency { get; set; } + public ushort Concurrency { get; set; } [GlobalSetup(Target = nameof(AsyncConsumerDispatcher))] public async Task SetUpAsyncConsumer() diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index ee07572f5..6af99e6f9 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -189,7 +189,7 @@ RabbitMQ.Client.ConnectionFactory.ClientProperties.set -> void RabbitMQ.Client.ConnectionFactory.ClientProvidedName.get -> string RabbitMQ.Client.ConnectionFactory.ClientProvidedName.set -> void RabbitMQ.Client.ConnectionFactory.ConnectionFactory() -> void -RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> int +RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> ushort RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void @@ -472,7 +472,7 @@ RabbitMQ.Client.IConnectionFactory.ClientProperties.get -> System.Collections.Ge RabbitMQ.Client.IConnectionFactory.ClientProperties.set -> void RabbitMQ.Client.IConnectionFactory.ClientProvidedName.get -> string RabbitMQ.Client.IConnectionFactory.ClientProvidedName.set -> void -RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> int +RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> ushort RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.set -> void RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.get -> System.TimeSpan RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.set -> void @@ -700,7 +700,6 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan -readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumerConcurrency -> int readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan readonly RabbitMQ.Client.ConnectionConfig.HeartbeatInterval -> System.TimeSpan readonly RabbitMQ.Client.ConnectionConfig.MaxChannelCount -> ushort @@ -826,7 +825,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~RabbitMQ.Client.IConnection.CreateChannelAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task @@ -894,3 +892,7 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort +RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort diff --git a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs index ab0a0d766..6bce2d44b 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs @@ -139,7 +139,7 @@ public sealed class ConnectionConfig /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. /// - public readonly int DispatchConsumerConcurrency; + public readonly ushort ConsumerDispatchConcurrency; internal readonly Func> FrameHandlerFactoryAsync; @@ -150,7 +150,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password, ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled, TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler, TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout, - int dispatchConsumerConcurrency, Func> frameHandlerFactoryAsync) + ushort consumerDispatchConcurrency, Func> frameHandlerFactoryAsync) { VirtualHost = virtualHost; UserName = userName; @@ -170,7 +170,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password, ContinuationTimeout = continuationTimeout; HandshakeContinuationTimeout = handshakeContinuationTimeout; RequestedConnectionTimeout = requestedConnectionTimeout; - DispatchConsumerConcurrency = dispatchConsumerConcurrency; + ConsumerDispatchConcurrency = consumerDispatchConcurrency; FrameHandlerFactoryAsync = frameHandlerFactoryAsync; } } diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index cbcc5760d..5bb86ccb4 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -92,6 +92,11 @@ namespace RabbitMQ.Client ///hosts with an empty name are not addressable. public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory { + /// + /// Default value for consumer dispatch concurrency. + /// + public const ushort DefaultConsumerDispatchConcurrency = 1; + /// /// Default value for the desired maximum channel number. Default: 2047. /// @@ -175,7 +180,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor /// /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. /// In addition to that consumers need to be thread/concurrency safe. - public int ConsumerDispatchConcurrency { get; set; } = 1; + public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency; /// The host to connect to. public string HostName { get; set; } = "localhost"; diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index ba2632dda..aaad772df 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -234,8 +234,17 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo /// /// Asynchronously create and return a fresh channel, session, and channel. /// + /// + /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one + /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. + /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. + /// + /// Defaults to . + /// + /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. + /// In addition to that consumers need to be thread/concurrency safe. + /// /// Cancellation token - Task CreateChannelAsync(CancellationToken cancellationToken = default); - + Task CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default); } } diff --git a/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs index 33b9bc64c..ead665a03 100644 --- a/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs @@ -7,6 +7,12 @@ namespace RabbitMQ.Client { public static class IConnectionExtensions { + /// + /// Asynchronously create and return a fresh channel, session, and channel. + /// + public static Task CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) => + connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken); + /// /// Asynchronously close this connection and all its channels. /// diff --git a/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs index c6b37ec3e..f66d8c522 100644 --- a/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs @@ -194,6 +194,6 @@ Task CreateConnectionAsync(IEnumerable endpoints, /// /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. /// In addition to that consumers need to be thread/concurrency safe. - int ConsumerDispatchConcurrency { get; set; } + ushort ConsumerDispatchConcurrency { get; set; } } } diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index a54422ae2..67aca8f69 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl { internal class Channel : ChannelBase { - public Channel(ConnectionConfig config, ISession session) : base(config, session) + public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency) + : base(config, session, consumerDispatchConcurrency) { } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 8d996c082..b786a288b 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -52,6 +52,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private bool _usesPublisherConfirms; private bool _tracksPublisherConfirmations; private bool _usesTransactions; + private ushort _consumerDispatchConcurrency; internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher; @@ -70,10 +71,12 @@ public TimeSpan ContinuationTimeout set => InnerChannel.ContinuationTimeout = value; } - public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel) + public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel, + ushort consumerDispatchConcurrency) { _connection = conn; _innerChannel = innerChannel; + _consumerDispatchConcurrency = consumerDispatchConcurrency; } public event EventHandler BasicAcks @@ -160,7 +163,8 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con _connection = conn; - RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken) + RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_consumerDispatchConcurrency, + cancellationToken: cancellationToken) .ConfigureAwait(false); newChannel.TakeOver(_innerChannel); diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index 3dcf20d56..5d504ebf4 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -295,7 +295,7 @@ private async ValueTask RecoverExchangesAsync(IConnection connection, { try { - using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false)) + using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false)) { await recordedExchange.RecoverAsync(ch, cancellationToken) .ConfigureAwait(false); @@ -347,7 +347,7 @@ private async Task RecoverQueuesAsync(IConnection connection, try { string newName = string.Empty; - using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false)) + using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false)) { newName = await recordedQueue.RecoverAsync(ch, cancellationToken) .ConfigureAwait(false); @@ -458,7 +458,7 @@ private async ValueTask RecoverBindingsAsync(IConnection connection, { try { - using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false)) + using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false)) { await binding.RecoverAsync(ch, cancellationToken) .ConfigureAwait(false); diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index f4b89fc6c..1d398b9e8 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -173,10 +173,11 @@ public event EventHandler RecoveringConsumer public IProtocol Protocol => Endpoint.Protocol; - public async ValueTask CreateNonRecoveringChannelAsync(CancellationToken cancellationToken) + public async ValueTask CreateNonRecoveringChannelAsync(ushort consumerDispatchConcurrency, + CancellationToken cancellationToken = default) { ISession session = InnerConnection.CreateSession(); - var result = new RecoveryAwareChannel(_config, session); + var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency); return (RecoveryAwareChannel)await result.OpenAsync(cancellationToken).ConfigureAwait(false); } @@ -239,12 +240,12 @@ await CloseInnerConnectionAsync() } } - public async Task CreateChannelAsync(CancellationToken cancellationToken = default) + public async Task CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default) { EnsureIsOpen(); - RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cancellationToken) + RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken) .ConfigureAwait(false); - AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel); + AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency); await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); return channel; diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index caaba9b79..28d606534 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -73,10 +73,10 @@ internal abstract class ChannelBase : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; - protected ChannelBase(ConnectionConfig config, ISession session) + protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency) { ContinuationTimeout = config.ContinuationTimeout; - ConsumerDispatcher = new AsyncConsumerDispatcher(this, config.DispatchConsumerConcurrency); + ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency); Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); _basicAcksWrapper = new EventingWrapper("OnBasicAck", onException); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index ad0fce16a..09369bb07 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize); _session0 = new MainSession(this, config.MaxInboundMessageBodySize); - _channel0 = new Channel(_config, _session0); ; + _channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ; ClientProperties = new Dictionary(_config.ClientProperties) { @@ -253,11 +253,11 @@ await CloseAsync(ea, true, } } - public Task CreateChannelAsync(CancellationToken cancellationToken = default) + public Task CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default) { EnsureIsOpen(); ISession session = CreateSession(); - var channel = new Channel(_config, session); + var channel = new Channel(_config, session, consumerDispatchConcurrency); return channel.OpenAsync(cancellationToken); } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs index 890b548a7..1949a5733 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -8,7 +8,7 @@ namespace RabbitMQ.Client.ConsumerDispatching { internal sealed class AsyncConsumerDispatcher : ConsumerDispatcherChannelBase { - internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency) + internal AsyncConsumerDispatcher(ChannelBase channel, ushort concurrency) : base(channel, concurrency) { } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 217f80d82..0afe13151 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -17,7 +17,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, private bool _quiesce = false; private bool _disposed; - internal ConsumerDispatcherChannelBase(ChannelBase channel, int concurrency) + internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency) { _channel = channel; var workChannel = Channel.CreateUnbounded(new UnboundedChannelOptions diff --git a/projects/RabbitMQ.Client/client/impl/RecoveryAwareChannel.cs b/projects/RabbitMQ.Client/client/impl/RecoveryAwareChannel.cs index 6eb456ffb..d4acbccec 100644 --- a/projects/RabbitMQ.Client/client/impl/RecoveryAwareChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/RecoveryAwareChannel.cs @@ -37,7 +37,8 @@ namespace RabbitMQ.Client.Impl { internal sealed class RecoveryAwareChannel : Channel { - public RecoveryAwareChannel(ConnectionConfig config, ISession session) : base(config, session) + public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency) + : base(config, session, consumerDispatchConcurrency) { ActiveDeliveryTagOffset = 0; MaxSeenDeliveryTag = 0;