Skip to content

Commit bbc33f1

Browse files
authored
Revert "Follow-up to #1669 - per-channel dispatch concurrency"
1 parent 735bbca commit bbc33f1

15 files changed

+44
-87
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,7 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
892892
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
893893
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
894894
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
895-
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
895+
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
896+
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
897+
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
896898
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
897-
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ namespace RabbitMQ.Client
9292
///hosts with an empty name are not addressable. </para></remarks>
9393
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
9494
{
95+
/// <summary>
96+
/// Default value for consumer dispatch concurrency.
97+
/// </summary>
98+
public const ushort DefaultConsumerDispatchConcurrency = 1;
99+
95100
/// <summary>
96101
/// Default value for the desired maximum channel number. Default: 2047.
97102
/// </summary>
@@ -175,7 +180,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
175180
/// </summary>
176181
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
177182
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
178-
public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency;
183+
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
179184

180185
/// <summary>The host to connect to.</summary>
181186
public string HostName { get; set; } = "localhost";

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,12 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
239239
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
240240
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
241241
///
242-
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
242+
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
243243
///
244244
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
245245
/// In addition to that consumers need to be thread/concurrency safe.
246246
/// </param>
247247
/// <param name="cancellationToken">Cancellation token</param>
248-
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
249-
CancellationToken cancellationToken = default);
248+
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
250249
}
251250
}

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ namespace RabbitMQ.Client
77
{
88
public static class IConnectionExtensions
99
{
10+
/// <summary>
11+
/// Asynchronously create and return a fresh channel, session, and channel.
12+
/// </summary>
13+
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
14+
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);
15+
1016
/// <summary>
1117
/// Asynchronously close this connection and all its channels.
1218
/// </summary>

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal class Channel : ChannelBase
4040
{
41-
public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
41+
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
4242
: base(config, session, consumerDispatchConcurrency)
4343
{
4444
}

projects/RabbitMQ.Client/client/framing/Constants.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,5 @@ public static class Constants
8383
public const int NotImplemented = 540;
8484
///<summary>(= 541)</summary>
8585
public const int InternalError = 541;
86-
87-
/// <summary>
88-
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
89-
/// to set this value for every channel created on a connection,
90-
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
91-
/// for setting this value for a particular channel.
92-
/// </summary>
93-
public const ushort DefaultConsumerDispatchConcurrency = 1;
9486
}
9587
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,12 @@ await CloseInnerConnectionAsync()
240240
}
241241
}
242242

243-
public async Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
244-
CancellationToken cancellationToken = default)
243+
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
245244
{
246245
EnsureIsOpen();
247-
ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
248-
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
246+
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
249247
.ConfigureAwait(false);
250-
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
248+
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
251249
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
252250
.ConfigureAwait(false);
253251
return channel;

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,10 @@ internal abstract class ChannelBase : IChannel, IRecoverable
7373

7474
internal readonly IConsumerDispatcher ConsumerDispatcher;
7575

76-
protected ChannelBase(ConnectionConfig config, ISession session,
77-
ushort? perChannelConsumerDispatchConcurrency = null)
76+
protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
7877
{
7978
ContinuationTimeout = config.ContinuationTimeout;
80-
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
81-
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
79+
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
8280
Action<Exception, string> onException = (exception, context) =>
8381
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
8482
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
7272

7373
_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
7474
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
75-
_channel0 = new Channel(_config, _session0);
75+
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
7676

7777
ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
7878
{
@@ -253,8 +253,7 @@ await CloseAsync(ea, true,
253253
}
254254
}
255255

256-
public Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
257-
CancellationToken cancellationToken = default)
256+
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
258257
{
259258
EnsureIsOpen();
260259
ISession session = CreateSession();

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,42 +14,44 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
1414
protected readonly ChannelReader<WorkStruct> _reader;
1515
private readonly ChannelWriter<WorkStruct> _writer;
1616
private readonly Task _worker;
17-
private readonly ushort _concurrency;
1817
private bool _quiesce = false;
1918
private bool _disposed;
2019

2120
internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
2221
{
2322
_channel = channel;
24-
_concurrency = concurrency;
2523
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
2624
{
27-
SingleReader = _concurrency == 1,
25+
SingleReader = concurrency == 1,
2826
SingleWriter = false,
2927
AllowSynchronousContinuations = false
3028
});
3129
_reader = workChannel.Reader;
3230
_writer = workChannel.Writer;
3331

3432
Func<Task> loopStart = ProcessChannelAsync;
35-
if (_concurrency == 1)
33+
if (concurrency == 1)
3634
{
3735
_worker = Task.Run(loopStart);
3836
}
3937
else
4038
{
41-
var tasks = new Task[_concurrency];
42-
for (int i = 0; i < _concurrency; i++)
39+
var tasks = new Task[concurrency];
40+
for (int i = 0; i < concurrency; i++)
4341
{
4442
tasks[i] = Task.Run(loopStart);
4543
}
4644
_worker = Task.WhenAll(tasks);
4745
}
4846
}
4947

50-
public bool IsShutdown => _quiesce;
51-
52-
public ushort Concurrency => _concurrency;
48+
public bool IsShutdown
49+
{
50+
get
51+
{
52+
return _quiesce;
53+
}
54+
}
5355

5456
public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
5557
{

0 commit comments

Comments
 (0)