Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions projects/Applications/GH-1647/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@
Password = "guest"
};

var channelOptions = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true
};
var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);

var props = new BasicProperties();
byte[] msg = Encoding.UTF8.GetBytes("test");
Expand Down
4 changes: 3 additions & 1 deletion projects/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer

publishTasks.Add(Task.Run(async () =>
{
using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
using IChannel publishChannel = await publishConnection.CreateChannelAsync(createChannelOptions);
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;

for (int i = 0; i < ItemsPerBatch; i++)
Expand Down
13 changes: 6 additions & 7 deletions projects/Applications/PublisherConfirms/PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@
const int MESSAGE_COUNT = 50_000;
bool debug = false;

var channelOpts = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
};
var channelOpts = new CreateChannelOptions(
publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
);

var props = new BasicProperties
{
Expand Down Expand Up @@ -177,7 +176,7 @@ async Task HandlePublishConfirmsAsynchronously()

await using IConnection connection = await CreateConnectionAsync();

channelOpts.PublisherConfirmationTrackingEnabled = false;
channelOpts = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);

// declare a server-named queue
Expand Down
27 changes: 17 additions & 10 deletions projects/RabbitMQ.Client/CreateChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public sealed class CreateChannelOptions
/// <see cref="IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken)"/> to allow correlation
/// of the response with the correct message.
/// </summary>
public bool PublisherConfirmationsEnabled { get; set; } = false;
public readonly bool PublisherConfirmationsEnabled = false;

/// <summary>
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
Expand All @@ -63,7 +63,7 @@ public sealed class CreateChannelOptions
/// If the broker then sends a <c>basic.return</c> response for the message, this library can
/// then correctly handle the message.
/// </summary>
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
public readonly bool PublisherConfirmationTrackingEnabled = false;

/// <summary>
/// If the publisher confirmation tracking is enabled, this represents the rate limiter used to
Expand All @@ -72,7 +72,7 @@ public sealed class CreateChannelOptions
/// Defaults to a <see cref="ThrottlingRateLimiter"/> with a limit of 128 and a throttling percentage of 50% with a delay during throttling.
/// </summary>
/// <remarks>Setting the rate limiter to <c>null</c> disables the rate limiting entirely.</remarks>
public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128);
public readonly RateLimiter? OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(128);

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
Expand All @@ -84,10 +84,17 @@ public sealed class CreateChannelOptions
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// </summary>
public ushort? ConsumerDispatchConcurrency { get; set; } = null;
public readonly ushort? ConsumerDispatchConcurrency = null;

public CreateChannelOptions()
public CreateChannelOptions(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
ushort? consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
{
PublisherConfirmationsEnabled = publisherConfirmationsEnabled;
PublisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
OutstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
ConsumerDispatchConcurrency = consumerDispatchConcurrency;
}

internal ushort InternalConsumerDispatchConcurrency
Expand Down Expand Up @@ -116,22 +123,22 @@ internal CreateChannelOptions(ConnectionConfig connectionConfig)
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
}

private void WithConnectionConfig(ConnectionConfig connectionConfig)
private CreateChannelOptions WithConnectionConfig(ConnectionConfig connectionConfig)
{
_connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
return this;
}

internal static CreateChannelOptions CreateOrUpdate(CreateChannelOptions? createChannelOptions, ConnectionConfig config)
{
if (createChannelOptions is not null)
if (createChannelOptions is null)
{
createChannelOptions.WithConnectionConfig(config);
return createChannelOptions;
return new CreateChannelOptions(config);
}
else
{
return new CreateChannelOptions(config);
return createChannelOptions.WithConnectionConfig(config);
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -891,15 +891,15 @@
override RabbitMQ.Client.ThrottlingRateLimiter.GetStatistics() -> System.Threading.RateLimiting.RateLimiterStatistics?
override RabbitMQ.Client.ThrottlingRateLimiter.IdleDuration.get -> System.TimeSpan?
RabbitMQ.Client.CreateChannelOptions
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?

Check warning on line 894 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 894 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void

Check warning on line 895 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 895 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void

Check warning on line 896 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 896 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.get -> System.Threading.RateLimiting.RateLimiter?

Check warning on line 897 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.get -> System.Threading.RateLimiting.RateLimiter?' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 897 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.get -> System.Threading.RateLimiting.RateLimiter?' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.set -> void

Check warning on line 898 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.set -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 898 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter.set -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool

Check warning on line 899 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 899 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void

Check warning on line 900 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 900 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool

Check warning on line 901 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 901 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.set -> void

Check warning on line 902 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.set -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 902 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.set -> void' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason, string! prefix) -> void
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void
Expand All @@ -913,7 +913,7 @@
RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
RabbitMQ.Client.ThrottlingRateLimiter
RabbitMQ.Client.ThrottlingRateLimiter.ThrottlingRateLimiter(int maxConcurrentCalls, int? throttlingPercentage = 50) -> void
static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions!

Check warning on line 916 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-uaa

Symbol 'static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions!' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 916 in projects/RabbitMQ.Client/PublicAPI.Shipped.txt

View workflow job for this annotation

GitHub Actions / oauth2-keycloak

Symbol 'static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions!' is part of the declared API, but is either not public or could not be found (https://github.com/dotnet/roslyn-analyzers/blob/main/src/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
Expand All @@ -921,3 +921,8 @@
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static readonly RabbitMQ.Client.Constants.DefaultContinuationTimeout -> System.TimeSpan
static readonly RabbitMQ.Client.Constants.DefaultHandshakeContinuationTimeout -> System.TimeSpan
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled, System.Threading.RateLimiting.RateLimiter? outstandingPublisherConfirmationsRateLimiter = null, ushort? consumerDispatchConcurrency = 1) -> void
readonly RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency -> ushort?
readonly RabbitMQ.Client.CreateChannelOptions.OutstandingPublisherConfirmationsRateLimiter -> System.Threading.RateLimiting.RateLimiter?
readonly RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled -> bool
readonly RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled -> bool
5 changes: 4 additions & 1 deletion projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public abstract class IntegrationFixture : IAsyncLifetime
protected IConnection _conn;
protected IChannel _channel;

protected static readonly CreateChannelOptions _createChannelOptions = new(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);

protected static readonly Encoding _encoding = new UTF8Encoding();
protected static readonly int _processorCount = Environment.ProcessorCount;

Expand Down Expand Up @@ -153,7 +156,7 @@ public virtual async Task InitializeAsync()

if (_openChannel)
{
_channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
_channel = await _conn.CreateChannelAsync(_createChannelOptions);
}

if (IsVerbose)
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Common/TestConnectionRecoveryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
{
using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync())
{
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(_createChannelOptions))
{
for (ushort i = 0; i < TotalMessageCount; i++)
{
Expand Down Expand Up @@ -342,7 +342,7 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag,

protected static async Task<bool> SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey)
{
using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
using (IChannel ch = await conn.CreateChannelAsync(_createChannelOptions))
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
consumer.ReceivedAsync += MessageReceived;
await _channel.BasicConsumeAsync(queueName, true, consumer);

await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
await using (IChannel pubCh = await _conn.CreateChannelAsync(_createChannelOptions))
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
await pubCh.CloseAsync();
Expand All @@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,

await CloseAndWaitForRecoveryAsync();

await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
await using (IChannel pubCh = await _conn.CreateChannelAsync(_createChannelOptions))
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body);
await pubCh.CloseAsync();
Expand Down
14 changes: 10 additions & 4 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
});
return Task.CompletedTask;
};
await using (IChannel publishChannel = await publishConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
await using (IChannel publishChannel = await publishConn.CreateChannelAsync(_createChannelOptions))
{
AddCallbackExceptionHandlers(publishConn, publishChannel);
publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel,
Expand Down Expand Up @@ -646,7 +646,7 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
var consumer1 = new AsyncEventingBasicConsumer(_channel);
consumer1.ReceivedAsync += async (sender, args) =>
{
await using IChannel innerChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await using IChannel innerChannel = await _conn.CreateChannelAsync(_createChannelOptions);
await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
mandatory: true,
body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650)));
Expand Down Expand Up @@ -707,9 +707,15 @@ private async Task ValidateConsumerDispatchConcurrency()
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
await using IChannel ch = await _conn.CreateChannelAsync(
new CreateChannelOptions { ConsumerDispatchConcurrency = expectedConsumerDispatchConcurrency });

var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: false,
publisherConfirmationTrackingEnabled: false,
outstandingPublisherConfirmationsRateLimiter: null,
consumerDispatchConcurrency: expectedConsumerDispatchConcurrency);

await using IChannel ch = await _conn.CreateChannelAsync(createChannelOptions);
AutorecoveringChannel ach = (AutorecoveringChannel)ch;

Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038()
await _channel.BasicConsumeAsync(queueName, false, consumer);

//publisher
await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await using IChannel publisherChannel = await _conn.CreateChannelAsync(_createChannelOptions);
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,
Expand Down
12 changes: 6 additions & 6 deletions projects/Test/Integration/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public override Task InitializeAsync()
public async Task TestBasicRoundtripArray()
{
_conn = await _connFactory.CreateConnectionAsync();
_channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
_channel = await _conn.CreateChannelAsync(_createChannelOptions);

QueueDeclareOk q = await _channel.QueueDeclareAsync();
var bp = new BasicProperties();
Expand Down Expand Up @@ -88,7 +88,7 @@ public async Task TestBasicRoundtripArray()
public async Task TestBasicRoundtripCachedString()
{
_conn = await _connFactory.CreateConnectionAsync();
_channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
_channel = await _conn.CreateChannelAsync(_createChannelOptions);

CachedString exchangeName = new CachedString(string.Empty);
CachedString queueName = new CachedString((await _channel.QueueDeclareAsync()).QueueName);
Expand Down Expand Up @@ -116,7 +116,7 @@ public async Task TestBasicRoundtripCachedString()
public async Task TestBasicRoundtripReadOnlyMemory()
{
_conn = await _connFactory.CreateConnectionAsync();
_channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
_channel = await _conn.CreateChannelAsync(_createChannelOptions);

QueueDeclareOk q = await _channel.QueueDeclareAsync();
byte[] sendBody = _encoding.GetBytes("hi");
Expand All @@ -143,7 +143,7 @@ public async Task TestBasicRoundtripReadOnlyMemory()
public async Task CanNotModifyPayloadAfterPublish()
{
_conn = await _connFactory.CreateConnectionAsync();
_channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
_channel = await _conn.CreateChannelAsync(_createChannelOptions);

QueueDeclareOk q = await _channel.QueueDeclareAsync();
byte[] sendBody = new byte[1000];
Expand Down Expand Up @@ -204,7 +204,7 @@ public async Task TestMaxInboundMessageBodySize()
Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize);
Assert.Equal(maxMsgSize, conn.Endpoint.MaxInboundMessageBodySize);

await using (IChannel channel = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
await using (IChannel channel = await conn.CreateChannelAsync(_createChannelOptions))
{
channel.ChannelShutdownAsync += (o, a) =>
{
Expand Down Expand Up @@ -291,7 +291,7 @@ public async Task TestMaxInboundMessageBodySize()
public async Task TestPropertiesRoundtrip_Headers()
{
_conn = await _connFactory.CreateConnectionAsync();
_channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
_channel = await _conn.CreateChannelAsync(_createChannelOptions);

var subject = new BasicProperties
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in

try
{
await using IChannel ch = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true });
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);
await using IChannel ch = await _conn.CreateChannelAsync(createChannelOptions);
ch.ChannelShutdownAsync += (o, ea) =>
{
HandleChannelShutdown(ch, ea, (args) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public async Task TestTopologyRecoveryConsumerFilter()
return Task.CompletedTask;
};

await using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
await using (IChannel ch = await conn.CreateChannelAsync(_createChannelOptions))
{
await ch.ExchangeDeclareAsync(exchange, "direct");
await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false);
Expand Down
Loading
Loading