Skip to content

Commit 1504215

Browse files
committed
* Change consumer dispatch concurrency value to a ushort
* Create `CreateChannelAsync` extension to use `DefaultConsumerDispatchConcurrency`
1 parent e3ced40 commit 1504215

File tree

8 files changed

+23
-10
lines changed

8 files changed

+23
-10
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ RabbitMQ.Client.ConnectionFactory.ClientProperties.set -> void
189189
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.get -> string
190190
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.set -> void
191191
RabbitMQ.Client.ConnectionFactory.ConnectionFactory() -> void
192-
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> int
192+
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
193193
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void
194194
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
195195
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void
@@ -472,7 +472,7 @@ RabbitMQ.Client.IConnectionFactory.ClientProperties.get -> System.Collections.Ge
472472
RabbitMQ.Client.IConnectionFactory.ClientProperties.set -> void
473473
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.get -> string
474474
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.set -> void
475-
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> int
475+
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
476476
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.set -> void
477477
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
478478
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.set -> void
@@ -826,7 +826,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
826826
~RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<bool>
827827
~RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
828828
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
829-
~RabbitMQ.Client.IConnection.CreateChannelAsync(int? dispatchConsumerConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
830829
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
831830
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
832831
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint> endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
@@ -894,3 +893,6 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
894893
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
895894
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
896895
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
896+
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
897+
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
898+
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
150150
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
151151
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
152152
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
153-
int dispatchConsumerConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
153+
ushort dispatchConsumerConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
154154
{
155155
VirtualHost = virtualHost;
156156
UserName = userName;

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ namespace RabbitMQ.Client
9292
///hosts with an empty name are not addressable. </para></remarks>
9393
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
9494
{
95+
/// <summary>
96+
/// Default value for consumer dispatch concurrency.
97+
/// </summary>
98+
public const ushort DefaultConsumerDispatchConcurrency = 1;
99+
95100
/// <summary>
96101
/// Default value for the desired maximum channel number. Default: 2047.
97102
/// </summary>
@@ -175,7 +180,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
175180
/// </summary>
176181
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
177182
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
178-
public int ConsumerDispatchConcurrency { get; set; } = 1;
183+
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
179184

180185
/// <summary>The host to connect to.</summary>
181186
public string HostName { get; set; } = "localhost";

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
234234
/// <summary>
235235
/// Asynchronously create and return a fresh channel, session, and channel.
236236
/// </summary>
237-
/// <param name="dispatchConsumerConcurrency">
237+
/// <param name="consumerDispatchConcurrency">
238238
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
239239
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
240240
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
@@ -245,6 +245,6 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
245245
/// In addition to that consumers need to be thread/concurrency safe.
246246
/// </param>
247247
/// <param name="cancellationToken">Cancellation token</param>
248-
Task<IChannel> CreateChannelAsync(int? dispatchConsumerConcurrency = null, CancellationToken cancellationToken = default);
248+
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
249249
}
250250
}

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ namespace RabbitMQ.Client
77
{
88
public static class IConnectionExtensions
99
{
10+
/// <summary>
11+
/// Asynchronously create and return a fresh channel, session, and channel.
12+
/// </summary>
13+
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
14+
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);
15+
1016
/// <summary>
1117
/// Asynchronously close this connection and all its channels.
1218
/// </summary>

projects/RabbitMQ.Client/client/api/IConnectionFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,6 @@ Task<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints,
194194
/// </summary>
195195
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
196196
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
197-
int ConsumerDispatchConcurrency { get; set; }
197+
ushort ConsumerDispatchConcurrency { get; set; }
198198
}
199199
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ await CloseInnerConnectionAsync()
239239
}
240240
}
241241

242-
public async Task<IChannel> CreateChannelAsync(int? dispatchConsumerConcurrency = null, CancellationToken cancellationToken = default)
242+
public async Task<IChannel> CreateChannelAsync(ushort dispatchConsumerConcurrency, CancellationToken cancellationToken = default)
243243
{
244244
EnsureIsOpen();
245245
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(dispatchConsumerConcurrency, cancellationToken)

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ await CloseAsync(ea, true,
253253
}
254254
}
255255

256-
public Task<IChannel> CreateChannelAsync(int? dispatchConsumerConcurrency = null, CancellationToken cancellationToken = default)
256+
public Task<IChannel> CreateChannelAsync(ushort dispatchConsumerConcurrency, CancellationToken cancellationToken = default)
257257
{
258258
EnsureIsOpen();
259259
ISession session = CreateSession();

0 commit comments

Comments
 (0)