Skip to content

Commit 4ed28d7

Browse files
committed
Remove ChannelOptions internal class
Update `CreateChannelOptions` so that it provides the same internal behavior that `ChannelOptions` did.
1 parent af007e1 commit 4ed28d7

10 files changed

+98
-132
lines changed

projects/RabbitMQ.Client/ConnectionFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
185185
/// </summary>
186186
public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(5);
187187

188-
private TimeSpan _handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
189-
private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20);
188+
private TimeSpan _handshakeContinuationTimeout = Constants.DefaultHandshakeContinuationTimeout;
189+
private TimeSpan _continuationTimeout = Constants.DefaultContinuationTimeout;
190190

191191
// just here to hold the value that was set through the setter
192192
private string? _clientProvidedName;

projects/RabbitMQ.Client/Constants.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
33+
3234
namespace RabbitMQ.Client
3335
{
3436
public static class Constants
@@ -97,5 +99,15 @@ public static class Constants
9799
/// <c>basic.return</c> is sent via the broker.
98100
/// </summary>
99101
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";
102+
103+
/// <summary>
104+
/// The default timeout for initial AMQP handshake
105+
/// </summary>
106+
public static readonly TimeSpan DefaultHandshakeContinuationTimeout = TimeSpan.FromSeconds(10);
107+
108+
/// <summary>
109+
/// The default timeout for RPC methods
110+
/// </summary>
111+
public static readonly TimeSpan DefaultContinuationTimeout = TimeSpan.FromSeconds(20);
100112
}
101113
}

projects/RabbitMQ.Client/CreateChannelOptions.cs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
3233
using System.Threading.RateLimiting;
33-
using RabbitMQ.Client.Impl;
3434

3535
namespace RabbitMQ.Client
3636
{
@@ -39,6 +39,9 @@ namespace RabbitMQ.Client
3939
/// </summary>
4040
public sealed class CreateChannelOptions
4141
{
42+
private ushort? _connectionConfigConsumerDispatchConcurrency;
43+
private TimeSpan _connectionConfigContinuationTimeout;
44+
4245
/// <summary>
4346
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
4447
/// </summary>
@@ -70,9 +73,40 @@ public sealed class CreateChannelOptions
7073
/// </summary>
7174
public ushort? ConsumerDispatchConcurrency { get; set; } = null;
7275

73-
/// <summary>
74-
/// The default channel options.
75-
/// </summary>
76-
public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
76+
public CreateChannelOptions()
77+
{
78+
}
79+
80+
internal ushort InternalConsumerDispatchConcurrency
81+
{
82+
get
83+
{
84+
if (ConsumerDispatchConcurrency is not null)
85+
{
86+
return ConsumerDispatchConcurrency.Value;
87+
}
88+
89+
if (_connectionConfigConsumerDispatchConcurrency is not null)
90+
{
91+
return _connectionConfigConsumerDispatchConcurrency.Value;
92+
}
93+
94+
return Constants.DefaultConsumerDispatchConcurrency;
95+
}
96+
}
97+
98+
internal TimeSpan ContinuationTimeout => _connectionConfigContinuationTimeout;
99+
100+
internal CreateChannelOptions(ConnectionConfig connectionConfig)
101+
{
102+
_connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
103+
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
104+
}
105+
106+
internal void WithConnectionConfig(ConnectionConfig connectionConfig)
107+
{
108+
_connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
109+
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
110+
}
77111
}
78112
}

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ namespace RabbitMQ.Client.Impl
4242
{
4343
internal sealed class AutorecoveringChannel : IChannel, IRecoverable
4444
{
45-
private readonly ChannelOptions _channelOptions;
45+
private readonly CreateChannelOptions _createChannelOptions;
4646
private readonly List<string> _recordedConsumerTags = new List<string>();
4747

4848
private AutorecoveringConnection _connection;
@@ -73,11 +73,11 @@ public TimeSpan ContinuationTimeout
7373

7474
public AutorecoveringChannel(AutorecoveringConnection conn,
7575
RecoveryAwareChannel innerChannel,
76-
ChannelOptions channelOptions)
76+
CreateChannelOptions createChannelOptions)
7777
{
7878
_connection = conn;
7979
_innerChannel = innerChannel;
80-
_channelOptions = channelOptions;
80+
_createChannelOptions = createChannelOptions;
8181
}
8282

8383
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
@@ -162,7 +162,7 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
162162

163163
_connection = conn;
164164

165-
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_channelOptions, cancellationToken)
165+
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_createChannelOptions, cancellationToken)
166166
.ConfigureAwait(false);
167167

168168
newChannel.TakeOver(_innerChannel);

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,11 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs
185185
public IProtocol Protocol => Endpoint.Protocol;
186186

187187
public ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
188-
ChannelOptions channelOptions,
188+
CreateChannelOptions createChannelOptions,
189189
CancellationToken cancellationToken = default)
190190
{
191191
ISession session = InnerConnection.CreateSession();
192-
return RecoveryAwareChannel.CreateAndOpenAsync(session, channelOptions, cancellationToken);
192+
return RecoveryAwareChannel.CreateAndOpenAsync(session, createChannelOptions, cancellationToken);
193193
}
194194

195195
public override string ToString()
@@ -251,21 +251,24 @@ await CloseInnerConnectionAsync()
251251
}
252252
}
253253

254-
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
254+
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? createChannelOptions = default,
255255
CancellationToken cancellationToken = default)
256256
{
257257
EnsureIsOpen();
258258

259-
options ??= CreateChannelOptions.Default;
260-
261-
ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
262-
263-
var channelOptions = ChannelOptions.From(options, _config);
259+
if (createChannelOptions is null)
260+
{
261+
createChannelOptions = new CreateChannelOptions(_config);
262+
}
263+
else
264+
{
265+
createChannelOptions.WithConnectionConfig(_config);
266+
}
264267

265-
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(channelOptions, cancellationToken)
268+
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(createChannelOptions, cancellationToken)
266269
.ConfigureAwait(false);
267270

268-
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, channelOptions);
271+
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, createChannelOptions);
269272

270273
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
271274
.ConfigureAwait(false);

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ internal partial class Channel : IChannel, IRecoverable
6161

6262
internal readonly IConsumerDispatcher ConsumerDispatcher;
6363

64-
public Channel(ISession session, ChannelOptions channelOptions)
64+
public Channel(ISession session, CreateChannelOptions createChannelOptions)
6565
{
66-
ContinuationTimeout = channelOptions.ContinuationTimeout;
67-
ConsumerDispatcher = new AsyncConsumerDispatcher(this, channelOptions.ConsumerDispatchConcurrency);
66+
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
67+
ConsumerDispatcher = new AsyncConsumerDispatcher(this, createChannelOptions.InternalConsumerDispatchConcurrency);
6868
Func<Exception, string, CancellationToken, Task> onExceptionAsync = (exception, context, cancellationToken) =>
6969
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
7070
_basicAcksAsyncWrapper = new AsyncEventingWrapper<BasicAckEventArgs>("OnBasicAck", onExceptionAsync);
@@ -359,12 +359,12 @@ protected bool Enqueue(IRpcContinuation k)
359359
}
360360
}
361361

362-
internal async Task<IChannel> OpenAsync(ChannelOptions channelOptions,
362+
internal async Task<IChannel> OpenAsync(CreateChannelOptions createChannelOptions,
363363
CancellationToken cancellationToken)
364364
{
365-
ConfigurePublisherConfirmations(channelOptions.PublisherConfirmationsEnabled,
366-
channelOptions.PublisherConfirmationTrackingEnabled,
367-
channelOptions.OutstandingPublisherConfirmationsRateLimiter);
365+
ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled,
366+
createChannelOptions.PublisherConfirmationTrackingEnabled,
367+
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter);
368368

369369
bool enqueued = false;
370370
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
@@ -1493,13 +1493,11 @@ await ModelSendAsync(in method, k.CancellationToken)
14931493
}
14941494
}
14951495

1496-
internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions,
1497-
ConnectionConfig connectionConfig, ISession session,
1496+
internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions, ISession session,
14981497
CancellationToken cancellationToken)
14991498
{
1500-
ChannelOptions channelOptions = ChannelOptions.From(createChannelOptions, connectionConfig);
1501-
var channel = new Channel(session, channelOptions);
1502-
return channel.OpenAsync(channelOptions, cancellationToken);
1499+
var channel = new Channel(session, createChannelOptions);
1500+
return channel.OpenAsync(createChannelOptions, cancellationToken);
15031501
}
15041502

15051503
/// <summary>

projects/RabbitMQ.Client/Impl/ChannelOptions.cs

Lines changed: 0 additions & 90 deletions
This file was deleted.

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
7878

7979
_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
8080
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
81-
_channel0 = new Channel(_session0, ChannelOptions.From(config));
81+
_channel0 = new Channel(_session0, new CreateChannelOptions(config));
8282

8383
ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
8484
{
@@ -268,10 +268,17 @@ public Task<IChannel> CreateChannelAsync(CreateChannelOptions? createChannelOpti
268268
{
269269
EnsureIsOpen();
270270

271-
createChannelOptions ??= CreateChannelOptions.Default;
272-
ISession session = CreateSession();
271+
if (createChannelOptions is null)
272+
{
273+
createChannelOptions = new CreateChannelOptions(_config);
274+
}
275+
else
276+
{
277+
createChannelOptions.WithConnectionConfig(_config);
278+
}
273279

274-
return Channel.CreateAndOpenAsync(createChannelOptions, _config, session, cancellationToken);
280+
ISession session = CreateSession();
281+
return Channel.CreateAndOpenAsync(createChannelOptions, session, cancellationToken);
275282
}
276283

277284
internal ISession CreateSession()

projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ namespace RabbitMQ.Client.Impl
3636
{
3737
internal sealed class RecoveryAwareChannel : Channel
3838
{
39-
public RecoveryAwareChannel(ISession session, ChannelOptions channelOptions)
40-
: base(session, channelOptions)
39+
public RecoveryAwareChannel(ISession session, CreateChannelOptions createChannelOptions)
40+
: base(session, createChannelOptions)
4141
{
4242
ActiveDeliveryTagOffset = 0;
4343
MaxSeenDeliveryTag = 0;
@@ -104,11 +104,11 @@ public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
104104
}
105105
}
106106

107-
internal static async ValueTask<RecoveryAwareChannel> CreateAndOpenAsync(ISession session, ChannelOptions channelOptions,
107+
internal static async ValueTask<RecoveryAwareChannel> CreateAndOpenAsync(ISession session, CreateChannelOptions createChannelOptions,
108108
CancellationToken cancellationToken)
109109
{
110-
var result = new RecoveryAwareChannel(session, channelOptions);
111-
return (RecoveryAwareChannel)await result.OpenAsync(channelOptions, cancellationToken)
110+
var result = new RecoveryAwareChannel(session, createChannelOptions);
111+
return (RecoveryAwareChannel)await result.OpenAsync(createChannelOptions, cancellationToken)
112112
.ConfigureAwait(false);
113113
}
114114
}

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,3 +919,5 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
919919
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
920920
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
921921
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
922+
static readonly RabbitMQ.Client.Constants.DefaultContinuationTimeout -> System.TimeSpan
923+
static readonly RabbitMQ.Client.Constants.DefaultHandshakeContinuationTimeout -> System.TimeSpan

0 commit comments

Comments
 (0)