Skip to content

Commit 8d02a94

Browse files
committed
* Standardize on ConsumerDispatchConcurrency name
1 parent 1504215 commit 8d02a94

File tree

11 files changed

+28
-21
lines changed

11 files changed

+28
-21
lines changed

projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
3535
public int Count { get; set; }
3636

3737
[Params(1, 2)]
38-
public int Concurrency { get; set; }
38+
public ushort Concurrency { get; set; }
3939

4040
[GlobalSetup(Target = nameof(AsyncConsumerDispatcher))]
4141
public async Task SetUpAsyncConsumer()

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,6 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G
700700
readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary<string, object>
701701
readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string
702702
readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan
703-
readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumerConcurrency -> int
704703
readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan
705704
readonly RabbitMQ.Client.ConnectionConfig.HeartbeatInterval -> System.TimeSpan
706705
readonly RabbitMQ.Client.ConnectionConfig.MaxChannelCount -> ushort
@@ -896,3 +895,4 @@ RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, Syst
896895
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
897896
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
898897
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
898+
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public sealed class ConnectionConfig
139139
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
140140
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
141141
/// </summary>
142-
public readonly int DispatchConsumerConcurrency;
142+
public readonly ushort ConsumerDispatchConcurrency;
143143

144144
internal readonly Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> FrameHandlerFactoryAsync;
145145

@@ -150,7 +150,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
150150
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
151151
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
152152
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
153-
ushort dispatchConsumerConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
153+
ushort consumerDispatchConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
154154
{
155155
VirtualHost = virtualHost;
156156
UserName = userName;
@@ -170,7 +170,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
170170
ContinuationTimeout = continuationTimeout;
171171
HandshakeContinuationTimeout = handshakeContinuationTimeout;
172172
RequestedConnectionTimeout = requestedConnectionTimeout;
173-
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
173+
ConsumerDispatchConcurrency = consumerDispatchConcurrency;
174174
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
175175
}
176176
}

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal class Channel : ChannelBase
4040
{
41-
public Channel(ConnectionConfig config, ISession session, int? dispatchConsumerConcurrency = null) : base(config, session, dispatchConsumerConcurrency)
41+
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
42+
: base(config, session, consumerDispatchConcurrency)
4243
{
4344
}
4445

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
5252
private bool _usesPublisherConfirms;
5353
private bool _tracksPublisherConfirmations;
5454
private bool _usesTransactions;
55+
private ushort _consumerDispatchConcurrency;
5556

5657
internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher;
5758

@@ -70,10 +71,12 @@ public TimeSpan ContinuationTimeout
7071
set => InnerChannel.ContinuationTimeout = value;
7172
}
7273

73-
public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel)
74+
public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
75+
ushort consumerDispatchConcurrency)
7476
{
7577
_connection = conn;
7678
_innerChannel = innerChannel;
79+
_consumerDispatchConcurrency = consumerDispatchConcurrency;
7780
}
7881

7982
public event EventHandler<BasicAckEventArgs> BasicAcks
@@ -160,7 +163,8 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
160163

161164
_connection = conn;
162165

163-
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken: cancellationToken)
166+
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_consumerDispatchConcurrency,
167+
cancellationToken: cancellationToken)
164168
.ConfigureAwait(false);
165169
newChannel.TakeOver(_innerChannel);
166170

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,11 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
173173

174174
public IProtocol Protocol => Endpoint.Protocol;
175175

176-
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(int? dispatchConsumerConcurrency = null, CancellationToken cancellationToken = default)
176+
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(ushort consumerDispatchConcurrency,
177+
CancellationToken cancellationToken = default)
177178
{
178179
ISession session = InnerConnection.CreateSession();
179-
var result = new RecoveryAwareChannel(_config, session, dispatchConsumerConcurrency);
180+
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
180181
return (RecoveryAwareChannel)await result.OpenAsync(cancellationToken).ConfigureAwait(false);
181182
}
182183

@@ -239,12 +240,12 @@ await CloseInnerConnectionAsync()
239240
}
240241
}
241242

242-
public async Task<IChannel> CreateChannelAsync(ushort dispatchConsumerConcurrency, CancellationToken cancellationToken = default)
243+
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
243244
{
244245
EnsureIsOpen();
245-
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(dispatchConsumerConcurrency, cancellationToken)
246+
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
246247
.ConfigureAwait(false);
247-
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel);
248+
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
248249
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
249250
.ConfigureAwait(false);
250251
return channel;

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ internal abstract class ChannelBase : IChannel, IRecoverable
7373

7474
internal readonly IConsumerDispatcher ConsumerDispatcher;
7575

76-
protected ChannelBase(ConnectionConfig config, ISession session, int? dispatchConsumerConcurrency = null)
76+
protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
7777
{
7878
ContinuationTimeout = config.ContinuationTimeout;
79-
ConsumerDispatcher = new AsyncConsumerDispatcher(this, dispatchConsumerConcurrency.GetValueOrDefault(config.DispatchConsumerConcurrency));
79+
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
8080
Action<Exception, string> onException = (exception, context) =>
8181
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
8282
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
7272

7373
_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
7474
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
75-
_channel0 = new Channel(_config, _session0); ;
75+
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
7676

7777
ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
7878
{
@@ -253,11 +253,11 @@ await CloseAsync(ea, true,
253253
}
254254
}
255255

256-
public Task<IChannel> CreateChannelAsync(ushort dispatchConsumerConcurrency, CancellationToken cancellationToken = default)
256+
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
257257
{
258258
EnsureIsOpen();
259259
ISession session = CreateSession();
260-
var channel = new Channel(_config, session, dispatchConsumerConcurrency);
260+
var channel = new Channel(_config, session, consumerDispatchConcurrency);
261261
return channel.OpenAsync(cancellationToken);
262262
}
263263

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
88
{
99
internal sealed class AsyncConsumerDispatcher : ConsumerDispatcherChannelBase
1010
{
11-
internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)
11+
internal AsyncConsumerDispatcher(ChannelBase channel, ushort concurrency)
1212
: base(channel, concurrency)
1313
{
1414
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
1717
private bool _quiesce = false;
1818
private bool _disposed;
1919

20-
internal ConsumerDispatcherChannelBase(ChannelBase channel, int concurrency)
20+
internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
2121
{
2222
_channel = channel;
2323
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions

0 commit comments

Comments
 (0)