diff --git a/.ci/windows/versions.json b/.ci/windows/versions.json index 511d65528e..71ca2ed68c 100644 --- a/.ci/windows/versions.json +++ b/.ci/windows/versions.json @@ -1,4 +1,4 @@ { - "erlang": "26.2.5", - "rabbitmq": "3.13.3" + "erlang": "26.2.5.2", + "rabbitmq": "3.13.6" } diff --git a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs index 0dfb28a58b..caf408be0d 100644 --- a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs +++ b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs @@ -2,11 +2,10 @@ using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; -using RabbitMQ.Client.Events; namespace RabbitMQ.Benchmarks { - internal sealed class AsyncBasicConsumerFake : IAsyncBasicConsumer, IBasicConsumer + internal sealed class AsyncBasicConsumerFake : IAsyncBasicConsumer { private readonly ManualResetEventSlim _autoResetEvent; private int _current; @@ -18,7 +17,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent) _autoResetEvent = autoResetEvent; } - public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, + public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) { if (Interlocked.Increment(ref _current) == Count) @@ -29,53 +28,14 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel return Task.CompletedTask; } - Task IBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - IReadOnlyBasicProperties properties, ReadOnlyMemory body) - { - if (Interlocked.Increment(ref _current) == Count) - { - _current = 0; - _autoResetEvent.Set(); - } - return Task.CompletedTask; - } + public Task HandleBasicCancelAsync(string consumerTag) => Task.CompletedTask; - public Task HandleBasicCancel(string consumerTag) => Task.CompletedTask; + public Task HandleBasicCancelOkAsync(string consumerTag) => Task.CompletedTask; - public Task HandleBasicCancelOk(string consumerTag) => Task.CompletedTask; + public Task HandleBasicConsumeOkAsync(string consumerTag) => Task.CompletedTask; - public Task HandleBasicConsumeOk(string consumerTag) => Task.CompletedTask; - - public Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) => Task.CompletedTask; + public Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) => Task.CompletedTask; public IChannel Channel { get; } - - event EventHandler IBasicConsumer.ConsumerCancelled - { - add { } - remove { } - } - - public event AsyncEventHandler ConsumerCancelled - { - add { } - remove { } - } - - void IBasicConsumer.HandleBasicCancelOk(string consumerTag) - { - } - - void IBasicConsumer.HandleBasicConsumeOk(string consumerTag) - { - } - - void IBasicConsumer.HandleChannelShutdown(object channel, ShutdownEventArgs reason) - { - } - - void IBasicConsumer.HandleBasicCancel(string consumerTag) - { - } } } diff --git a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs index 9caa404d27..7fc75ffa08 100644 --- a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs @@ -52,30 +52,7 @@ public async Task AsyncConsumerDispatcher() { for (int i = 0; i < Count; i++) { - await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body, - CancellationToken.None); - } - _autoResetEvent.Wait(); - _autoResetEvent.Reset(); - } - } - - [GlobalSetup(Target = nameof(ConsumerDispatcher))] - public async Task SetUpConsumer() - { - _consumer.Count = Count; - _dispatcher = new ConsumerDispatcher(null, Concurrency); - await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None); - } - - [Benchmark] - public async Task ConsumerDispatcher() - { - using (RentedMemory body = new RentedMemory(_body)) - { - for (int i = 0; i < Count; i++) - { - await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body, + await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, default, body, CancellationToken.None); } _autoResetEvent.Wait(); diff --git a/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs b/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs index b4bb341ece..717156d7cd 100644 --- a/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs +++ b/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs @@ -1,8 +1,8 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using RabbitMQ.Client; -using RabbitMQ.Client.Events; namespace Benchmarks.Networking { @@ -11,19 +11,10 @@ public class Networking_BasicDeliver_Commons { public static async Task Publish_Hello_World(IConnection connection, uint messageCount, byte[] body) { - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); using (IChannel channel = await connection.CreateChannelAsync()) { QueueDeclareOk queue = await channel.QueueDeclareAsync(); - int consumed = 0; - var consumer = new EventingBasicConsumer(channel); - consumer.Received += (s, args) => - { - if (Interlocked.Increment(ref consumed) == messageCount) - { - tcs.SetResult(true); - } - }; + var consumer = new CountingConsumer(messageCount); await channel.BasicConsumeAsync(queue.QueueName, true, consumer); for (int i = 0; i < messageCount; i++) @@ -31,9 +22,34 @@ public static async Task Publish_Hello_World(IConnection connection, uint messag await channel.BasicPublishAsync("", queue.QueueName, body); } - await tcs.Task; + await consumer.CompletedTask.ConfigureAwait(false); await channel.CloseAsync(); } } } + + internal sealed class CountingConsumer : AsyncDefaultBasicConsumer + { + private int _remainingCount; + private readonly TaskCompletionSource _tcs; + + public Task CompletedTask => _tcs.Task; + + public CountingConsumer(uint messageCount) + { + _remainingCount = (int)messageCount; + _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + /// + public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) + { + if (Interlocked.Decrement(ref _remainingCount) == 0) + { + _tcs.SetResult(true); + } + + return Task.CompletedTask; + } + } } diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 9c2e282e77..91b90ae610 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -60,13 +60,10 @@ override RabbitMQ.Client.AmqpTcpEndpoint.ToString() -> string override RabbitMQ.Client.AmqpTimestamp.Equals(object obj) -> bool override RabbitMQ.Client.AmqpTimestamp.GetHashCode() -> int override RabbitMQ.Client.AmqpTimestamp.ToString() -> string -override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleBasicCancelOk(string consumerTag) -> System.Threading.Tasks.Task -override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleBasicConsumeOk(string consumerTag) -> System.Threading.Tasks.Task -override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task -override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> System.Threading.Tasks.Task -override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicCancelOk(string consumerTag) -> void -override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicConsumeOk(string consumerTag) -> void -override RabbitMQ.Client.Events.EventingBasicConsumer.HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> void +override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.OnCancel(params string[] consumerTags) -> System.Threading.Tasks.Task +override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag) -> System.Threading.Tasks.Task +override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task +override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleChannelShutdownAsync(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> System.Threading.Tasks.Task override RabbitMQ.Client.Exceptions.MalformedFrameException.ReplyCode.get -> ushort override RabbitMQ.Client.Exceptions.SyntaxErrorException.ReplyCode.get -> ushort override RabbitMQ.Client.Exceptions.UnexpectedFrameException.ReplyCode.get -> ushort @@ -103,7 +100,6 @@ RabbitMQ.Client.AsyncDefaultBasicConsumer.AsyncDefaultBasicConsumer() -> void RabbitMQ.Client.AsyncDefaultBasicConsumer.AsyncDefaultBasicConsumer(RabbitMQ.Client.IChannel channel) -> void RabbitMQ.Client.AsyncDefaultBasicConsumer.Channel.get -> RabbitMQ.Client.IChannel RabbitMQ.Client.AsyncDefaultBasicConsumer.Channel.set -> void -RabbitMQ.Client.AsyncDefaultBasicConsumer.ConsumerCancelled -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.AsyncDefaultBasicConsumer.ConsumerTags.get -> string[] RabbitMQ.Client.AsyncDefaultBasicConsumer.IsRunning.get -> bool RabbitMQ.Client.AsyncDefaultBasicConsumer.IsRunning.set -> void @@ -214,8 +210,6 @@ RabbitMQ.Client.ConnectionFactory.CredentialsProvider.get -> RabbitMQ.Client.ICr RabbitMQ.Client.ConnectionFactory.CredentialsProvider.set -> void RabbitMQ.Client.ConnectionFactory.CredentialsRefresher.get -> RabbitMQ.Client.ICredentialsRefresher RabbitMQ.Client.ConnectionFactory.CredentialsRefresher.set -> void -RabbitMQ.Client.ConnectionFactory.DispatchConsumersAsync.get -> bool -RabbitMQ.Client.ConnectionFactory.DispatchConsumersAsync.set -> void RabbitMQ.Client.ConnectionFactory.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint RabbitMQ.Client.ConnectionFactory.Endpoint.set -> void RabbitMQ.Client.ConnectionFactory.EndpointResolverFactory.get -> System.Func, RabbitMQ.Client.IEndpointResolver> @@ -262,17 +256,6 @@ RabbitMQ.Client.ConnectionFactoryBase RabbitMQ.Client.ConnectionFactoryBase.ConnectionFactoryBase() -> void RabbitMQ.Client.ConnectionFactoryBase.SocketFactory -> System.Func RabbitMQ.Client.Constants -RabbitMQ.Client.DefaultBasicConsumer -RabbitMQ.Client.DefaultBasicConsumer.Channel.get -> RabbitMQ.Client.IChannel -RabbitMQ.Client.DefaultBasicConsumer.Channel.set -> void -RabbitMQ.Client.DefaultBasicConsumer.ConsumerCancelled -> System.EventHandler -RabbitMQ.Client.DefaultBasicConsumer.ConsumerTags.get -> string[] -RabbitMQ.Client.DefaultBasicConsumer.DefaultBasicConsumer() -> void -RabbitMQ.Client.DefaultBasicConsumer.DefaultBasicConsumer(RabbitMQ.Client.IChannel channel) -> void -RabbitMQ.Client.DefaultBasicConsumer.IsRunning.get -> bool -RabbitMQ.Client.DefaultBasicConsumer.IsRunning.set -> void -RabbitMQ.Client.DefaultBasicConsumer.ShutdownReason.get -> RabbitMQ.Client.ShutdownEventArgs -RabbitMQ.Client.DefaultBasicConsumer.ShutdownReason.set -> void RabbitMQ.Client.DefaultEndpointResolver RabbitMQ.Client.DefaultEndpointResolver.All() -> System.Collections.Generic.IEnumerable RabbitMQ.Client.DefaultEndpointResolver.DefaultEndpointResolver(System.Collections.Generic.IEnumerable tcpEndpoints) -> void @@ -307,12 +290,6 @@ RabbitMQ.Client.Events.ConsumerEventArgs RabbitMQ.Client.Events.ConsumerEventArgs.ConsumerEventArgs(string[] consumerTags) -> void RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs.ConsumerTagChangedAfterRecoveryEventArgs(string tagBefore, string tagAfter) -> void -RabbitMQ.Client.Events.EventingBasicConsumer -RabbitMQ.Client.Events.EventingBasicConsumer.EventingBasicConsumer(RabbitMQ.Client.IChannel channel) -> void -RabbitMQ.Client.Events.EventingBasicConsumer.Received -> System.EventHandler -RabbitMQ.Client.Events.EventingBasicConsumer.Registered -> System.EventHandler -RabbitMQ.Client.Events.EventingBasicConsumer.Shutdown -> System.EventHandler -RabbitMQ.Client.Events.EventingBasicConsumer.Unregistered -> System.EventHandler RabbitMQ.Client.Events.FlowControlEventArgs RabbitMQ.Client.Events.FlowControlEventArgs.FlowControlEventArgs(bool active) -> void RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs @@ -409,24 +386,16 @@ RabbitMQ.Client.IAmqpWriteable.GetRequiredBufferSize() -> int RabbitMQ.Client.IAmqpWriteable.WriteTo(System.Span span) -> int RabbitMQ.Client.IAsyncBasicConsumer RabbitMQ.Client.IAsyncBasicConsumer.Channel.get -> RabbitMQ.Client.IChannel -RabbitMQ.Client.IAsyncBasicConsumer.ConsumerCancelled -> RabbitMQ.Client.Events.AsyncEventHandler -RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicCancel(string consumerTag) -> System.Threading.Tasks.Task -RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicCancelOk(string consumerTag) -> System.Threading.Tasks.Task -RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicConsumeOk(string consumerTag) -> System.Threading.Tasks.Task -RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task -RabbitMQ.Client.IAsyncBasicConsumer.HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> System.Threading.Tasks.Task +RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicCancelAsync(string consumerTag) -> System.Threading.Tasks.Task +RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicCancelOkAsync(string consumerTag) -> System.Threading.Tasks.Task +RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag) -> System.Threading.Tasks.Task +RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task +RabbitMQ.Client.IAsyncBasicConsumer.HandleChannelShutdownAsync(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> System.Threading.Tasks.Task RabbitMQ.Client.IAuthMechanism RabbitMQ.Client.IAuthMechanism.handleChallenge(byte[] challenge, RabbitMQ.Client.ConnectionConfig config) -> byte[] RabbitMQ.Client.IAuthMechanismFactory RabbitMQ.Client.IAuthMechanismFactory.GetInstance() -> RabbitMQ.Client.IAuthMechanism RabbitMQ.Client.IAuthMechanismFactory.Name.get -> string -RabbitMQ.Client.IBasicConsumer -RabbitMQ.Client.IBasicConsumer.Channel.get -> RabbitMQ.Client.IChannel -RabbitMQ.Client.IBasicConsumer.ConsumerCancelled -> System.EventHandler -RabbitMQ.Client.IBasicConsumer.HandleBasicCancel(string consumerTag) -> void -RabbitMQ.Client.IBasicConsumer.HandleBasicCancelOk(string consumerTag) -> void -RabbitMQ.Client.IBasicConsumer.HandleBasicConsumeOk(string consumerTag) -> void -RabbitMQ.Client.IBasicConsumer.HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> void RabbitMQ.Client.IBasicProperties RabbitMQ.Client.IBasicProperties.AppId.get -> string RabbitMQ.Client.IBasicProperties.AppId.set -> void @@ -489,7 +458,7 @@ RabbitMQ.Client.IChannel.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs RabbitMQ.Client.IChannel.ContinuationTimeout.get -> System.TimeSpan RabbitMQ.Client.IChannel.ContinuationTimeout.set -> void RabbitMQ.Client.IChannel.CurrentQueue.get -> string -RabbitMQ.Client.IChannel.DefaultConsumer.get -> RabbitMQ.Client.IBasicConsumer +RabbitMQ.Client.IChannel.DefaultConsumer.get -> RabbitMQ.Client.IAsyncBasicConsumer RabbitMQ.Client.IChannel.DefaultConsumer.set -> void RabbitMQ.Client.IChannel.FlowControl -> System.EventHandler RabbitMQ.Client.IChannel.IsClosed.get -> bool @@ -532,8 +501,6 @@ RabbitMQ.Client.IConnectionFactory.CredentialsProvider.get -> RabbitMQ.Client.IC RabbitMQ.Client.IConnectionFactory.CredentialsProvider.set -> void RabbitMQ.Client.IConnectionFactory.CredentialsRefresher.get -> RabbitMQ.Client.ICredentialsRefresher RabbitMQ.Client.IConnectionFactory.CredentialsRefresher.set -> void -RabbitMQ.Client.IConnectionFactory.DispatchConsumersAsync.get -> bool -RabbitMQ.Client.IConnectionFactory.DispatchConsumersAsync.set -> void RabbitMQ.Client.IConnectionFactory.HandshakeContinuationTimeout.get -> System.TimeSpan RabbitMQ.Client.IConnectionFactory.HandshakeContinuationTimeout.set -> void RabbitMQ.Client.IConnectionFactory.Password.get -> string @@ -780,7 +747,6 @@ readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumerConcurrency -> int -readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumersAsync -> bool readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan readonly RabbitMQ.Client.ConnectionConfig.HeartbeatInterval -> System.TimeSpan readonly RabbitMQ.Client.ConnectionConfig.MaxChannelCount -> ushort @@ -860,17 +826,12 @@ static readonly RabbitMQ.Client.ConnectionFactory.DefaultHeartbeat -> System.Tim static readonly RabbitMQ.Client.Protocols.AMQP_0_9_1 -> RabbitMQ.Client.IProtocol static readonly RabbitMQ.Client.Protocols.DefaultProtocol -> RabbitMQ.Client.IProtocol static readonly RabbitMQ.Client.PublicationAddress.PSEUDO_URI_PARSER -> System.Text.RegularExpressions.Regex -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicCancel(string consumerTag) -> System.Threading.Tasks.Task -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicCancelOk(string consumerTag) -> System.Threading.Tasks.Task -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicConsumeOk(string consumerTag) -> System.Threading.Tasks.Task -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> System.Threading.Tasks.Task +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicCancelAsync(string consumerTag) -> System.Threading.Tasks.Task +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicCancelOkAsync(string consumerTag) -> System.Threading.Tasks.Task +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag) -> System.Threading.Tasks.Task +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleChannelShutdownAsync(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> System.Threading.Tasks.Task virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.OnCancel(params string[] consumerTags) -> System.Threading.Tasks.Task -virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicCancel(string consumerTag) -> void -virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicCancelOk(string consumerTag) -> void -virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicConsumeOk(string consumerTag) -> void -virtual RabbitMQ.Client.DefaultBasicConsumer.HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> void -virtual RabbitMQ.Client.DefaultBasicConsumer.OnCancel(params string[] consumerTags) -> void virtual RabbitMQ.Client.Exceptions.ProtocolException.ShutdownReason.get -> RabbitMQ.Client.ShutdownEventArgs virtual RabbitMQ.Client.TcpClientAdapter.Client.get -> System.Net.Sockets.Socket virtual RabbitMQ.Client.TcpClientAdapter.Close() -> void @@ -882,7 +843,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.get -> System.TimeSpan virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~const RabbitMQ.Client.RabbitMQActivitySource.PublisherSourceName = "RabbitMQ.Client.Publisher" -> string ~const RabbitMQ.Client.RabbitMQActivitySource.SubscriberSourceName = "RabbitMQ.Client.Subscriber" -> string -~override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task ~RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl, uint maxInboundMessageBodySize) -> void ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task @@ -891,9 +851,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable hostnames, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable hostnames, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~RabbitMQ.Client.IBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.BasicCancelAsync(string consumerTag, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~RabbitMQ.Client.IChannel.BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IBasicConsumer consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task +~RabbitMQ.Client.IChannel.BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IAsyncBasicConsumer consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.BasicGetAsync(string queue, bool autoAck, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask ~RabbitMQ.Client.IChannel.BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.BasicRejectAsync(ulong deliveryTag, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task @@ -938,10 +897,10 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func ~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void ~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary arguments = null) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IAsyncBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary arguments = null) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary arguments = null, bool noWait = false) -> System.Threading.Tasks.Task @@ -960,5 +919,3 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void ~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action> ~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void -~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task - diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 4cf06ab2d5..bbd247d625 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -1,2 +1 @@ RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.IReadOnlyBasicProperties! input) -> void -RabbitMQ.Client.IConnection.DispatchConsumersAsyncEnabled.get -> bool diff --git a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs index b4cc859762..b48ed713b6 100644 --- a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs @@ -2,17 +2,15 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using RabbitMQ.Client.Events; -using RabbitMQ.Client.Impl; namespace RabbitMQ.Client { - public class AsyncDefaultBasicConsumer : IBasicConsumer, IAsyncBasicConsumer + public class AsyncDefaultBasicConsumer : IAsyncBasicConsumer { private readonly HashSet _consumerTags = new HashSet(); /// - /// Creates a new instance of an . + /// Creates a new instance of an . /// public AsyncDefaultBasicConsumer() { @@ -50,16 +48,6 @@ public string[] ConsumerTags /// public ShutdownEventArgs? ShutdownReason { get; protected set; } - /// - /// Signalled when the consumer gets cancelled. - /// - public event AsyncEventHandler ConsumerCancelled - { - add => _consumerCancelledWrapper.AddHandler(value); - remove => _consumerCancelledWrapper.RemoveHandler(value); - } - private AsyncEventingWrapper _consumerCancelledWrapper; - /// /// Retrieve the this consumer is associated with, /// for use in acknowledging received messages, for instance. @@ -69,10 +57,10 @@ public event AsyncEventHandler ConsumerCancelled /// /// Called when the consumer is cancelled for reasons other than by a basicCancel: /// e.g. the queue has been deleted (either by this channel or by any other channel). - /// See for notification of consumer cancellation due to basicCancel + /// See for notification of consumer cancellation due to basicCancel /// /// Consumer tag this consumer is registered. - public virtual Task HandleBasicCancel(string consumerTag) + public virtual Task HandleBasicCancelAsync(string consumerTag) { return OnCancel(consumerTag); } @@ -81,7 +69,7 @@ public virtual Task HandleBasicCancel(string consumerTag) /// Called upon successful deregistration of the consumer from the broker. /// /// Consumer tag this consumer is registered. - public virtual Task HandleBasicCancelOk(string consumerTag) + public virtual Task HandleBasicCancelOkAsync(string consumerTag) { return OnCancel(consumerTag); } @@ -90,7 +78,7 @@ public virtual Task HandleBasicCancelOk(string consumerTag) /// Called upon successful registration of the consumer with the broker. /// /// Consumer tag this consumer is registered. - public virtual Task HandleBasicConsumeOk(string consumerTag) + public virtual Task HandleBasicConsumeOkAsync(string consumerTag) { _consumerTags.Add(consumerTag); IsRunning = true; @@ -107,7 +95,7 @@ public virtual Task HandleBasicConsumeOk(string consumerTag) /// Accessing the body at a later point is unsafe as its memory can /// be already released. /// - public virtual Task HandleBasicDeliver(string consumerTag, + public virtual Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, @@ -124,7 +112,7 @@ public virtual Task HandleBasicDeliver(string consumerTag, /// /// A channel this consumer was registered on. /// Shutdown context. - public virtual Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) + public virtual Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) { ShutdownReason = reason; return OnCancel(_consumerTags.ToArray()); @@ -136,51 +124,15 @@ public virtual Task HandleChannelShutdown(object channel, ShutdownEventArgs reas /// /// This default implementation simply sets the property to false, and takes no further action. /// - public virtual async Task OnCancel(params string[] consumerTags) + public virtual Task OnCancel(params string[] consumerTags) { IsRunning = false; - if (!_consumerCancelledWrapper.IsEmpty) - { - await _consumerCancelledWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags)) - .ConfigureAwait(false); - } foreach (string consumerTag in consumerTags) { _consumerTags.Remove(consumerTag); } - } - - event EventHandler IBasicConsumer.ConsumerCancelled - { - add { throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); } - remove { throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); } - } - void IBasicConsumer.HandleBasicCancelOk(string consumerTag) - { - throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); - } - - void IBasicConsumer.HandleBasicConsumeOk(string consumerTag) - { - throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); - } - - Task IBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, - IReadOnlyBasicProperties properties, ReadOnlyMemory body) - { - throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); - } - - void IBasicConsumer.HandleChannelShutdown(object channel, ShutdownEventArgs reason) - { - throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); - } - - void IBasicConsumer.HandleBasicCancel(string consumerTag) - { - throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); + return Task.CompletedTask; } } } diff --git a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs index fcc225ea61..0dce4e8dd3 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs @@ -136,12 +136,7 @@ public sealed class ConnectionConfig public readonly TimeSpan RequestedConnectionTimeout; /// - /// Set to true will enable an asynchronous consumer dispatcher which is compatible with . - /// - public readonly bool DispatchConsumersAsync; - - /// - /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one + /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. /// @@ -156,8 +151,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password, ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled, TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler, TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout, - bool dispatchConsumersAsync, int dispatchConsumerConcurrency, - Func> frameHandlerFactoryAsync) + int dispatchConsumerConcurrency, Func> frameHandlerFactoryAsync) { VirtualHost = virtualHost; UserName = userName; @@ -178,7 +172,6 @@ internal ConnectionConfig(string virtualHost, string userName, string password, ContinuationTimeout = continuationTimeout; HandshakeContinuationTimeout = handshakeContinuationTimeout; RequestedConnectionTimeout = requestedConnectionTimeout; - DispatchConsumersAsync = dispatchConsumersAsync; DispatchConsumerConcurrency = dispatchConsumerConcurrency; FrameHandlerFactoryAsync = frameHandlerFactoryAsync; } diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 5ac3f46981..4858395357 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -170,13 +170,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor public bool AutomaticRecoveryEnabled { get; set; } = true; /// - /// Set to true will enable an asynchronous consumer dispatcher which is compatible with . - /// Defaults to false. - /// - public bool DispatchConsumersAsync { get; set; } = false; - - /// - /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one + /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. /// Defaults to 1. @@ -612,7 +606,6 @@ private ConnectionConfig CreateConfig(string? clientProvidedName) ContinuationTimeout, HandshakeContinuationTimeout, RequestedConnectionTimeout, - DispatchConsumersAsync, ConsumerDispatchConcurrency, CreateFrameHandlerAsync); } diff --git a/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs deleted file mode 100644 index 7ad66cd2f1..0000000000 --- a/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs +++ /dev/null @@ -1,195 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using RabbitMQ.Client.Events; -using RabbitMQ.Client.Impl; - -namespace RabbitMQ.Client -{ - /// - /// Useful default/base implementation of . - /// Subclass and override in application code. - /// - /// - /// Note that the "Handle*" methods run in the connection's thread! - /// Consider using , which exposes - /// events that can be subscribed to consumer messages. - /// - public class DefaultBasicConsumer : IBasicConsumer - { - private readonly HashSet _consumerTags = new HashSet(); - - /// - /// Creates a new instance of an . - /// - public DefaultBasicConsumer() - { - } - - /// - /// Constructor which sets the Channel property to the given value. - /// - /// Common AMQP channel. - public DefaultBasicConsumer(IChannel channel) - { - Channel = channel; - } - - /// - /// Retrieve the consumer tags this consumer is registered as; to be used to identify - /// this consumer, for example, when cancelling it with . - /// This value is an array because a single consumer instance can be reused to consume on - /// multiple channels. - /// - public string[] ConsumerTags - { - get - { - return _consumerTags.ToArray(); - } - } - - /// - /// Returns true while the consumer is registered and expecting deliveries from the broker. - /// - public bool IsRunning { get; protected set; } - - /// - /// If our shuts down, this property will contain a description of the reason for the - /// shutdown. Otherwise it will contain null. See . - /// - public ShutdownEventArgs? ShutdownReason { get; protected set; } - - /// - /// Signalled when the consumer gets cancelled. - /// - public event EventHandler ConsumerCancelled - { - add => _consumerCancelledWrapper.AddHandler(value); - remove => _consumerCancelledWrapper.RemoveHandler(value); - } - private EventingWrapper _consumerCancelledWrapper; - - /// - /// Retrieve the this consumer is associated with, - /// for use in acknowledging received messages, for instance. - /// - public IChannel? Channel { get; set; } - - /// - /// Called when the consumer is cancelled for reasons other than by a basicCancel: - /// e.g. the queue has been deleted (either by this channel or by any other channel). - /// See for notification of consumer cancellation due to basicCancel - /// - /// Consumer tag this consumer is registered. - public virtual void HandleBasicCancel(string consumerTag) - { - OnCancel(consumerTag); - } - - /// - /// Called upon successful deregistration of the consumer from the broker. - /// - /// Consumer tag this consumer is registered. - public virtual void HandleBasicCancelOk(string consumerTag) - { - OnCancel(consumerTag); - } - - /// - /// Called upon successful registration of the consumer with the broker. - /// - /// Consumer tag this consumer is registered. - public virtual void HandleBasicConsumeOk(string consumerTag) - { - _consumerTags.Add(consumerTag); - IsRunning = true; - } - - /// - /// Called each time a message is delivered for this consumer. - /// - /// - /// This is a no-op implementation. It will not acknowledge deliveries via - /// if consuming in automatic acknowledgement mode. - /// Subclasses must copy or fully use delivery body before returning. - /// Accessing the body at a later point is unsafe as its memory can - /// be already released. - /// - public virtual Task HandleBasicDeliverAsync(string consumerTag, - ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - IReadOnlyBasicProperties properties, - ReadOnlyMemory body) - { - // Nothing to do here. - return Task.CompletedTask; - } - - /// - /// Called when the channel (channel) this consumer was registered on terminates. - /// - /// A channel this consumer was registered on. - /// Shutdown context. - public virtual void HandleChannelShutdown(object channel, ShutdownEventArgs reason) - { - ShutdownReason = reason; - OnCancel(_consumerTags.ToArray()); - } - - /// - /// Default implementation - overridable in subclasses. - /// The set of consumer tags that where cancelled - /// - /// This default implementation simply sets the - /// property to false, and takes no further action. - /// - public virtual void OnCancel(params string[] consumerTags) - { - IsRunning = false; - if (!_consumerCancelledWrapper.IsEmpty) - { - _consumerCancelledWrapper.Invoke(this, new ConsumerEventArgs(consumerTags)); - } - - foreach (string consumerTag in consumerTags) - { - _consumerTags.Remove(consumerTag); - } - } - } -} diff --git a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs index b96fbe2e81..cd29c48302 100644 --- a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs @@ -1,42 +1,38 @@ using System; using System.Threading.Tasks; -using RabbitMQ.Client.Events; - namespace RabbitMQ.Client { + /// + /// Consumer interface. Used to receive messages from a queue by subscription. + /// public interface IAsyncBasicConsumer { /// /// Retrieve the this consumer is associated with, - /// for use in acknowledging received messages, for instance. + /// for use in acknowledging received messages, for instance. /// IChannel? Channel { get; } /// - /// Signalled when the consumer gets cancelled. - /// - event AsyncEventHandler ConsumerCancelled; - - /// - /// Called when the consumer is cancelled for reasons other than by a basicCancel: - /// e.g. the queue has been deleted (either by this channel or by any other channel). - /// See for notification of consumer cancellation due to basicCancel + /// Called when the consumer is cancelled for reasons other than by a basicCancel: + /// e.g. the queue has been deleted (either by this channel or by any other channel). + /// See for notification of consumer cancellation due to basicCancel /// /// Consumer tag this consumer is registered. - Task HandleBasicCancel(string consumerTag); + Task HandleBasicCancelAsync(string consumerTag); /// /// Called upon successful deregistration of the consumer from the broker. /// /// Consumer tag this consumer is registered. - Task HandleBasicCancelOk(string consumerTag); + Task HandleBasicCancelOkAsync(string consumerTag); /// /// Called upon successful registration of the consumer with the broker. /// /// Consumer tag this consumer is registered. - Task HandleBasicConsumeOk(string consumerTag); + Task HandleBasicConsumeOkAsync(string consumerTag); /// /// Called each time a message arrives for this consumer. @@ -46,7 +42,7 @@ public interface IAsyncBasicConsumer /// Note that in particular, some delivered messages may require acknowledgement via . /// The implementation of this method in this class does NOT acknowledge such messages. /// - Task HandleBasicDeliver(string consumerTag, + Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, @@ -55,10 +51,10 @@ Task HandleBasicDeliver(string consumerTag, ReadOnlyMemory body); /// - /// Called when the channel shuts down. - /// - /// Common AMQP channel. - /// Information about the reason why a particular channel, session, or connection was destroyed. - Task HandleChannelShutdown(object channel, ShutdownEventArgs reason); + /// Called when the channel shuts down. + /// + /// Common AMQP channel. + /// Information about the reason why a particular channel, session, or connection was destroyed. + Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason); } } diff --git a/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs deleted file mode 100644 index 82b802e1b7..0000000000 --- a/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs +++ /dev/null @@ -1,107 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Threading.Tasks; -using RabbitMQ.Client.Events; - -namespace RabbitMQ.Client -{ - /// Consumer interface. Used to - ///receive messages from a queue by subscription. - /// - /// - /// See IChannel.BasicConsume, IChannel.BasicCancel. - /// - /// - /// Note that the "Handle*" methods run in the connection's - /// thread! Consider using , which uses a - /// SharedQueue instance to safely pass received messages across - /// to user threads. - /// - /// - public interface IBasicConsumer - { - /// - /// Retrieve the this consumer is associated with, - /// for use in acknowledging received messages, for instance. - /// - IChannel? Channel { get; } - - /// - /// Signalled when the consumer gets cancelled. - /// - event EventHandler ConsumerCancelled; - - /// - /// Called when the consumer is cancelled for reasons other than by a basicCancel: - /// e.g. the queue has been deleted (either by this channel or by any other channel). - /// See for notification of consumer cancellation due to basicCancel - /// - /// Consumer tag this consumer is registered. - void HandleBasicCancel(string consumerTag); - - /// - /// Called upon successful deregistration of the consumer from the broker. - /// - /// Consumer tag this consumer is registered. - void HandleBasicCancelOk(string consumerTag); - - /// - /// Called upon successful registration of the consumer with the broker. - /// - /// Consumer tag this consumer is registered. - void HandleBasicConsumeOk(string consumerTag); - - /// - /// Called each time a message arrives for this consumer. - /// - /// - /// Does nothing with the passed in information. - /// Note that in particular, some delivered messages may require acknowledgement via . - /// The implementation of this method in this class does NOT acknowledge such messages. - /// - Task HandleBasicDeliverAsync(string consumerTag, - ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - IReadOnlyBasicProperties properties, - ReadOnlyMemory body); - - /// - /// Called when the channel shuts down. - /// - /// Common AMQP channel. - /// Information about the reason why a particular channel, session, or connection was destroyed. - void HandleChannelShutdown(object channel, ShutdownEventArgs reason); - } -} diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index dc171db3f8..434c0c60c7 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -82,7 +82,7 @@ public interface IChannel : IDisposable /// /// Most people will not need to use this. /// - IBasicConsumer? DefaultConsumer { get; set; } + IAsyncBasicConsumer? DefaultConsumer { get; set; } /// /// Returns true if the channel is no longer in a state where it can be used. @@ -127,7 +127,7 @@ public interface IChannel : IDisposable /// Signalled when an exception occurs in a callback invoked by the channel. /// /// Examples of cases where this event will be signalled - /// include exceptions thrown in methods, or + /// include exceptions thrown in methods, or /// exceptions thrown in delegates etc. /// event EventHandler CallbackException; @@ -174,11 +174,11 @@ Task BasicCancelAsync(string consumerTag, bool noWait = false, /// If set to true, this consumer will not receive messages published by the same connection. /// If set to true, the consumer is exclusive. /// Consumer arguments. - /// The consumer, an instance of + /// The consumer, an instance of /// Cancellation token for this operation. /// Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, - IDictionary? arguments, IBasicConsumer consumer, + IDictionary? arguments, IAsyncBasicConsumer consumer, CancellationToken cancellationToken = default); /// diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index c35a3f4562..f47b7e6467 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -42,7 +42,7 @@ public static class IChannelExtensions { /// Asynchronously start a Basic content-class consumer. public static Task BasicConsumeAsync(this IChannel channel, - IBasicConsumer consumer, + IAsyncBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", @@ -56,7 +56,7 @@ public static Task BasicConsumeAsync(this IChannel channel, /// Asynchronously start a Basic content-class consumer. public static Task BasicConsumeAsync(this IChannel channel, string queue, bool autoAck, - IBasicConsumer consumer) + IAsyncBasicConsumer consumer) { return channel.BasicConsumeAsync(queue, autoAck, string.Empty, false, false, null, consumer); } @@ -65,7 +65,7 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue public static Task BasicConsumeAsync(this IChannel channel, string queue, bool autoAck, string consumerTag, - IBasicConsumer consumer) + IAsyncBasicConsumer consumer) { return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, null, consumer); } @@ -75,7 +75,7 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue bool autoAck, string consumerTag, IDictionary? arguments, - IBasicConsumer consumer) + IAsyncBasicConsumer consumer) { return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, arguments, consumer); } diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index 5f3510ee77..ba2632ddae 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -126,11 +126,6 @@ public interface IConnection : INetworkConnection, IDisposable /// IEnumerable ShutdownReport { get; } - /// - /// Returns true if the connection is set to use asynchronous consumer dispatchers. - /// - public bool DispatchConsumersAsyncEnabled { get; } - /// /// Application-specific connection name, will be displayed in the management UI /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot diff --git a/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs index 1059cde1c5..28d471c9e2 100644 --- a/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs @@ -75,7 +75,7 @@ public interface IConnectionFactory string VirtualHost { get; set; } /// - /// Credentials provider. It is optional. When set, username and password + /// Credentials provider. It is optional. When set, username and password /// are obtained thru this provider. /// ICredentialsProvider? CredentialsProvider { get; set; } @@ -190,13 +190,7 @@ Task CreateConnectionAsync(IEnumerable endpoints, TimeSpan ContinuationTimeout { get; set; } /// - /// Gets or sets a value indicating whether an asynchronous consumer dispatcher which is compatible with is used. - /// - /// if an asynchronous consumer dispatcher which is compatible with is used; otherwise, . - bool DispatchConsumersAsync { get; set; } - - /// - /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one + /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. /// Defaults to 1. diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index 6b20297998..9b88c51f9b 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -51,21 +51,21 @@ public event AsyncEventHandler Unregistered private AsyncEventingWrapper _unregisteredWrapper; ///Fires when the server confirms successful consumer cancellation. - public override async Task HandleBasicCancelOk(string consumerTag) + public override async Task OnCancel(params string[] consumerTags) { - await base.HandleBasicCancelOk(consumerTag) + await base.OnCancel(consumerTags) .ConfigureAwait(false); if (!_unregisteredWrapper.IsEmpty) { - await _unregisteredWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag })) + await _unregisteredWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags)) .ConfigureAwait(false); } } ///Fires when the server confirms successful consumer registration. - public override async Task HandleBasicConsumeOk(string consumerTag) + public override async Task HandleBasicConsumeOkAsync(string consumerTag) { - await base.HandleBasicConsumeOk(consumerTag) + await base.HandleBasicConsumeOkAsync(consumerTag) .ConfigureAwait(false); if (!_registeredWrapper.IsEmpty) { @@ -75,7 +75,7 @@ await base.HandleBasicConsumeOk(consumerTag) } ///Fires the Received event. - public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, + public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) { var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); @@ -85,9 +85,9 @@ public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, b } ///Fires the Shutdown event. - public override async Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) + public override async Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) { - await base.HandleChannelShutdown(channel, reason) + await base.HandleChannelShutdownAsync(channel, reason) .ConfigureAwait(false); if (!_shutdownWrapper.IsEmpty) { diff --git a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs deleted file mode 100644 index 5fe1307e2d..0000000000 --- a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs +++ /dev/null @@ -1,104 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Threading.Tasks; - -namespace RabbitMQ.Client.Events -{ - ///Class exposing an IBasicConsumer's - ///methods as separate events. - public class EventingBasicConsumer : DefaultBasicConsumer - { - ///Constructor which sets the Channel property to the - ///given value. - public EventingBasicConsumer(IChannel channel) : base(channel) - { - } - - /// - /// Event fired when a delivery arrives for the consumer. - /// - /// - /// Handlers must copy or fully use delivery body before returning. - /// Accessing the body at a later point is unsafe as its memory can - /// be already released. - /// - public event EventHandler? Received; - - ///Fires when the server confirms successful consumer registration. - public event EventHandler? Registered; - - ///Fires on channel (channel) shutdown, both client and server initiated. - public event EventHandler? Shutdown; - - ///Fires when the server confirms successful consumer cancellation. - public event EventHandler? Unregistered; - - ///Fires when the server confirms successful consumer cancellation. - public override void HandleBasicCancelOk(string consumerTag) - { - base.HandleBasicCancelOk(consumerTag); - Unregistered?.Invoke(this, new ConsumerEventArgs(new[] { consumerTag })); - } - - ///Fires when the server confirms successful consumer cancellation. - public override void HandleBasicConsumeOk(string consumerTag) - { - base.HandleBasicConsumeOk(consumerTag); - Registered?.Invoke(this, new ConsumerEventArgs(new[] { consumerTag })); - } - - /// - /// Invoked when a delivery arrives for the consumer. - /// - /// - /// Handlers must copy or fully use delivery body before returning. - /// Accessing the body at a later point is unsafe as its memory can - /// be already released. - /// - public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - IReadOnlyBasicProperties properties, ReadOnlyMemory body) - { - var eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body) - .ConfigureAwait(false); - Received?.Invoke(this, eventArgs); - } - - ///Fires the Shutdown event. - public override void HandleChannelShutdown(object channel, ShutdownEventArgs reason) - { - base.HandleChannelShutdown(channel, reason); - Shutdown?.Invoke(this, reason); - } - } -} diff --git a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs index 28a4bb04f7..3731e5ab97 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs @@ -224,10 +224,10 @@ await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationTok internal sealed class BasicConsumeAsyncRpcContinuation : AsyncRpcContinuation { - private readonly IBasicConsumer _consumer; + private readonly IAsyncBasicConsumer _consumer; private readonly IConsumerDispatcher _consumerDispatcher; - public BasicConsumeAsyncRpcContinuation(IBasicConsumer consumer, IConsumerDispatcher consumerDispatcher, + public BasicConsumeAsyncRpcContinuation(IAsyncBasicConsumer consumer, IConsumerDispatcher consumerDispatcher, TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(continuationTimeout, cancellationToken) { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index a822da6351..87a68e18e1 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -130,7 +130,7 @@ public IEnumerable ConsumerTags public ShutdownEventArgs? CloseReason => InnerChannel.CloseReason; - public IBasicConsumer? DefaultConsumer + public IAsyncBasicConsumer? DefaultConsumer { get => InnerChannel.DefaultConsumer; set => InnerChannel.DefaultConsumer = value; @@ -272,7 +272,7 @@ await _innerChannel.BasicCancelAsync(consumerTag, noWait, cancellationToken) } public async Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, - IDictionary? arguments, IBasicConsumer consumer, + IDictionary? arguments, IAsyncBasicConsumer consumer, CancellationToken cancellationToken) { string resultConsumerTag = await InnerChannel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 0ae34e39af..19391278ce 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -173,8 +173,6 @@ public event EventHandler RecoveringConsumer public IProtocol Protocol => Endpoint.Protocol; - public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync; - public async ValueTask CreateNonRecoveringChannelAsync(CancellationToken cancellationToken) { ISession session = InnerConnection.CreateSession(); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index f761409d4d..f1b75c1919 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -73,16 +73,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable protected ChannelBase(ConnectionConfig config, ISession session) { ContinuationTimeout = config.ContinuationTimeout; - - if (config.DispatchConsumersAsync) - { - ConsumerDispatcher = new AsyncConsumerDispatcher(this, config.DispatchConsumerConcurrency); - } - else - { - ConsumerDispatcher = new ConsumerDispatcher(this, config.DispatchConsumerConcurrency); - } - + ConsumerDispatcher = new AsyncConsumerDispatcher(this, config.DispatchConsumerConcurrency); Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); _basicAcksWrapper = new EventingWrapper("OnBasicAck", onException); @@ -174,7 +165,7 @@ internal void RunRecoveryEventHandlers(object sender) public int ChannelNumber => ((Session)Session).ChannelNumber; - public IBasicConsumer? DefaultConsumer + public IAsyncBasicConsumer? DefaultConsumer { get => ConsumerDispatcher.DefaultConsumer; set => ConsumerDispatcher.DefaultConsumer = value; @@ -890,25 +881,9 @@ await ModelSendAsync(method, k.CancellationToken) } public async Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, - IDictionary? arguments, IBasicConsumer consumer, + IDictionary? arguments, IAsyncBasicConsumer consumer, CancellationToken cancellationToken) { - if (ConsumerDispatcher is AsyncConsumerDispatcher) - { - if (false == (consumer is IAsyncBasicConsumer)) - { - throw new InvalidOperationException("When using an AsyncConsumerDispatcher, the consumer must implement IAsyncBasicConsumer"); - } - } - - if (ConsumerDispatcher is ConsumerDispatcher) - { - if (consumer is IAsyncBasicConsumer) - { - throw new InvalidOperationException("When using an ConsumerDispatcher, the consumer must not implement IAsyncBasicConsumer"); - } - } - // NOTE: // Maybe don't dispose this instance because the CancellationToken must remain // valid for processing the response. diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index e50a85cb52..ad0fce16ad 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -100,8 +100,6 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) public int LocalPort => _frameHandler.LocalPort; public int RemotePort => _frameHandler.RemotePort; - public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync; - public IDictionary? ServerProperties { get; private set; } public IEnumerable ShutdownReport => _shutdownReport; diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs index 8bd9384f7d..890b548a77 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -31,26 +31,26 @@ protected override async Task ProcessChannelAsync() using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!, work.DeliveryTag, work.BasicProperties!, work.Body.Size)) { - await work.AsyncConsumer.HandleBasicDeliver( + await work.Consumer.HandleBasicDeliverAsync( work.ConsumerTag!, work.DeliveryTag, work.Redelivered, work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory) .ConfigureAwait(false); } break; case WorkType.Cancel: - await work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!) + await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!) .ConfigureAwait(false); break; case WorkType.CancelOk: - await work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!) + await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!) .ConfigureAwait(false); break; case WorkType.ConsumeOk: - await work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!) + await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!) .ConfigureAwait(false); break; case WorkType.Shutdown: - await work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!) + await work.Consumer.HandleChannelShutdownAsync(_channel, work.Reason!) .ConfigureAwait(false); break; } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs deleted file mode 100644 index 6247a59b43..0000000000 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs +++ /dev/null @@ -1,73 +0,0 @@ -using System; -using System.Diagnostics; -using System.Threading.Tasks; -using RabbitMQ.Client.Events; -using RabbitMQ.Client.Impl; - -namespace RabbitMQ.Client.ConsumerDispatching -{ - internal sealed class ConsumerDispatcher : ConsumerDispatcherChannelBase - { - internal ConsumerDispatcher(ChannelBase channel, int concurrency) - : base(channel, concurrency) - { - } - - protected override async Task ProcessChannelAsync() - { - try - { - while (await _reader.WaitToReadAsync().ConfigureAwait(false)) - { - while (_reader.TryRead(out WorkStruct work)) - { - using (work) - { - try - { - IBasicConsumer consumer = work.Consumer; - string consumerTag = work.ConsumerTag!; - switch (work.WorkType) - { - case WorkType.Deliver: - using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!, - work.DeliveryTag, work.BasicProperties!, work.Body.Size)) - { - await consumer.HandleBasicDeliverAsync( - consumerTag, work.DeliveryTag, work.Redelivered, - work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory) - .ConfigureAwait(false); - } - break; - case WorkType.Cancel: - consumer.HandleBasicCancel(consumerTag); - break; - case WorkType.CancelOk: - consumer.HandleBasicCancelOk(consumerTag); - break; - case WorkType.ConsumeOk: - consumer.HandleBasicConsumeOk(consumerTag); - break; - case WorkType.Shutdown: - consumer.HandleChannelShutdown(_channel, work.Reason!); - break; - } - } - catch (Exception e) - { - _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); - } - } - } - } - } - catch (OperationCanceledException) - { - if (false == _reader.Completion.IsCompleted) - { - throw; - } - } - } - } -} diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs index c5d455a914..88d873df15 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs @@ -8,27 +8,27 @@ namespace RabbitMQ.Client.ConsumerDispatching internal abstract class ConsumerDispatcherBase { private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer(); - private readonly ConcurrentDictionary _consumers = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _consumers = new ConcurrentDictionary(); - public IBasicConsumer? DefaultConsumer { get; set; } + public IAsyncBasicConsumer? DefaultConsumer { get; set; } protected ConsumerDispatcherBase() { } - protected void AddConsumer(IBasicConsumer consumer, string tag) + protected void AddConsumer(IAsyncBasicConsumer consumer, string tag) { _consumers[tag] = consumer; } - protected IBasicConsumer GetConsumerOrDefault(string tag) + protected IAsyncBasicConsumer GetConsumerOrDefault(string tag) { - return _consumers.TryGetValue(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer(); + return _consumers.TryGetValue(tag, out IAsyncBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer(); } - public IBasicConsumer GetAndRemoveConsumer(string tag) + public IAsyncBasicConsumer GetAndRemoveConsumer(string tag) { - return _consumers.Remove(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer(); + return _consumers.Remove(tag, out IAsyncBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer(); } public void Shutdown(ShutdownEventArgs reason) @@ -45,14 +45,14 @@ public Task ShutdownAsync(ShutdownEventArgs reason) private void DoShutdownConsumers(ShutdownEventArgs reason) { - foreach (KeyValuePair pair in _consumers.ToArray()) + foreach (KeyValuePair pair in _consumers.ToArray()) { ShutdownConsumer(pair.Value, reason); } _consumers.Clear(); } - protected abstract void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason); + protected abstract void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason); protected abstract void InternalShutdown(); @@ -60,7 +60,7 @@ private void DoShutdownConsumers(ShutdownEventArgs reason) // Do not inline as it's not the default case on a hot path [MethodImpl(MethodImplOptions.NoInlining)] - private IBasicConsumer GetDefaultOrFallbackConsumer() + private IAsyncBasicConsumer GetDefaultOrFallbackConsumer() { return DefaultConsumer ?? s_fallbackConsumer; } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 23e7c4b1d3..217f80d823 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -53,7 +53,7 @@ public bool IsShutdown } } - public ValueTask HandleBasicConsumeOkAsync(IBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken) + public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken) { if (false == _disposed && false == _quiesce) { @@ -73,7 +73,7 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, { if (false == _disposed && false == _quiesce) { - IBasicConsumer consumer = GetConsumerOrDefault(consumerTag); + IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag); var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body); return _writer.WriteAsync(work, cancellationToken); } @@ -87,7 +87,7 @@ public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken { if (false == _disposed && false == _quiesce) { - IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); + IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag); return _writer.WriteAsync(work, cancellationToken); } @@ -101,7 +101,7 @@ public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken ca { if (false == _disposed && false == _quiesce) { - IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); + IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag); return _writer.WriteAsync(work, cancellationToken); } @@ -206,7 +206,7 @@ await _worker } } - protected sealed override void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason) + protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) { _writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason)); } @@ -226,8 +226,7 @@ protected override Task InternalShutdownAsync() protected readonly struct WorkStruct : IDisposable { - public readonly IBasicConsumer Consumer; - public IAsyncBasicConsumer AsyncConsumer => (IAsyncBasicConsumer)Consumer; + public readonly IAsyncBasicConsumer Consumer; public readonly string? ConsumerTag; public readonly ulong DeliveryTag; public readonly bool Redelivered; @@ -238,7 +237,7 @@ protected override Task InternalShutdownAsync() public readonly ShutdownEventArgs? Reason; public readonly WorkType WorkType; - private WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag) + private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag) : this() { WorkType = type; @@ -246,7 +245,7 @@ private WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag) ConsumerTag = consumerTag; } - private WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason) + private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) : this() { WorkType = WorkType.Shutdown; @@ -254,7 +253,7 @@ private WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason) Reason = reason; } - private WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, + private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body) { WorkType = WorkType.Deliver; @@ -269,27 +268,27 @@ private WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTa Reason = default; } - public static WorkStruct CreateCancel(IBasicConsumer consumer, string consumerTag) + public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag) { return new WorkStruct(WorkType.Cancel, consumer, consumerTag); } - public static WorkStruct CreateCancelOk(IBasicConsumer consumer, string consumerTag) + public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag) { return new WorkStruct(WorkType.CancelOk, consumer, consumerTag); } - public static WorkStruct CreateConsumeOk(IBasicConsumer consumer, string consumerTag) + public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag) { return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag); } - public static WorkStruct CreateShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) + public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) { return new WorkStruct(consumer, reason); } - public static WorkStruct CreateDeliver(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, + public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body) { return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered, diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs index 17eb121cd7..ab5f7c203c 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs @@ -1,80 +1,41 @@ using System; using System.Threading.Tasks; -using RabbitMQ.Client.Events; using RabbitMQ.Client.Logging; namespace RabbitMQ.Client.ConsumerDispatching { - internal sealed class FallbackConsumer : IBasicConsumer, IAsyncBasicConsumer + internal sealed class FallbackConsumer : IAsyncBasicConsumer { public IChannel? Channel { get; } = null; - event AsyncEventHandler IAsyncBasicConsumer.ConsumerCancelled + Task IAsyncBasicConsumer.HandleBasicCancelAsync(string consumerTag) { - add { } - remove { } - } - - event EventHandler IBasicConsumer.ConsumerCancelled - { - add { } - remove { } - } - - void IBasicConsumer.HandleBasicCancel(string consumerTag) - { - ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicCancel)} for tag {consumerTag}"); - } - - void IBasicConsumer.HandleBasicCancelOk(string consumerTag) - { - ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicCancelOk)} for tag {consumerTag}"); - } - - void IBasicConsumer.HandleBasicConsumeOk(string consumerTag) - { - ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicConsumeOk)} for tag {consumerTag}"); - } - - Task IBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - IReadOnlyBasicProperties properties, ReadOnlyMemory body) - { - ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicDeliverAsync)} for tag {consumerTag}"); - return Task.CompletedTask; - } - - void IBasicConsumer.HandleChannelShutdown(object channel, ShutdownEventArgs reason) - { - ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleChannelShutdown)}"); - } - - Task IAsyncBasicConsumer.HandleBasicCancel(string consumerTag) - { - ((IBasicConsumer)this).HandleBasicCancel(consumerTag); + ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicCancelAsync)} for tag {consumerTag}"); return Task.CompletedTask; } - Task IAsyncBasicConsumer.HandleBasicCancelOk(string consumerTag) + Task IAsyncBasicConsumer.HandleBasicCancelOkAsync(string consumerTag) { - ((IBasicConsumer)this).HandleBasicCancelOk(consumerTag); + ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicCancelOkAsync)} for tag {consumerTag}"); return Task.CompletedTask; } - Task IAsyncBasicConsumer.HandleBasicConsumeOk(string consumerTag) + Task IAsyncBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag) { - ((IBasicConsumer)this).HandleBasicConsumeOk(consumerTag); + ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicConsumeOkAsync)} for tag {consumerTag}"); return Task.CompletedTask; } - Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, + Task IAsyncBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) { - return ((IBasicConsumer)this).HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicDeliverAsync)} for tag {consumerTag}"); + return Task.CompletedTask; } - Task IAsyncBasicConsumer.HandleChannelShutdown(object channel, ShutdownEventArgs reason) + Task IAsyncBasicConsumer.HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) { - ((IBasicConsumer)this).HandleChannelShutdown(channel, reason); + ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleChannelShutdownAsync)}"); return Task.CompletedTask; } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index cd87b5d5a6..686036b9c1 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -37,13 +37,13 @@ namespace RabbitMQ.Client.ConsumerDispatching { internal interface IConsumerDispatcher : IDisposable { - IBasicConsumer? DefaultConsumer { get; set; } + IAsyncBasicConsumer? DefaultConsumer { get; set; } bool IsShutdown { get; } - IBasicConsumer GetAndRemoveConsumer(string tag); + IAsyncBasicConsumer GetAndRemoveConsumer(string tag); - ValueTask HandleBasicConsumeOkAsync(IBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken); + ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken); ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, diff --git a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs index 272b47ad16..bc672034ec 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs @@ -38,14 +38,14 @@ namespace RabbitMQ.Client.Impl internal readonly struct RecordedConsumer : IRecordedConsumer { private readonly AutorecoveringChannel _channel; - private readonly IBasicConsumer _consumer; + private readonly IAsyncBasicConsumer _consumer; private readonly string _queue; private readonly bool _autoAck; private readonly string _consumerTag; private readonly bool _exclusive; private readonly IDictionary? _arguments; - public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer, string consumerTag, string queue, bool autoAck, bool exclusive, IDictionary? arguments) + public RecordedConsumer(AutorecoveringChannel channel, IAsyncBasicConsumer consumer, string consumerTag, string queue, bool autoAck, bool exclusive, IDictionary? arguments) { if (channel == null) { @@ -95,7 +95,7 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer, } public AutorecoveringChannel Channel => _channel; - public IBasicConsumer Consumer => _consumer; + public IAsyncBasicConsumer Consumer => _consumer; public string Queue => _queue; public bool AutoAck => _autoAck; public string ConsumerTag => _consumerTag; diff --git a/projects/Test/Applications/CreateChannel/Program.cs b/projects/Test/Applications/CreateChannel/Program.cs index d3f5d66ce6..f93ae61bf8 100644 --- a/projects/Test/Applications/CreateChannel/Program.cs +++ b/projects/Test/Applications/CreateChannel/Program.cs @@ -19,7 +19,7 @@ public static async Task Main() { doneEvent = new AutoResetEvent(false); - var connectionFactory = new ConnectionFactory { DispatchConsumersAsync = true }; + var connectionFactory = new ConnectionFactory { }; IConnection connection = await connectionFactory.CreateConnectionAsync(); var watch = Stopwatch.StartNew(); diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index 57bcdd9b72..b0785c0fac 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -50,7 +50,6 @@ static class Program { HostName = RmqHost, ClientProvidedName = AppId + "-CONSUME", - DispatchConsumersAsync = true }; static readonly Random s_random; diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 7a92378ce6..54dcf2df0b 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -71,7 +71,6 @@ public abstract class IntegrationFixture : IAsyncLifetime protected readonly ITestOutputHelper _output; protected readonly string _testDisplayName; - protected readonly bool _dispatchConsumersAsync = false; protected readonly ushort _consumerDispatchConcurrency = 1; protected readonly bool _openChannel = true; @@ -110,11 +109,9 @@ static IntegrationFixture() } public IntegrationFixture(ITestOutputHelper output, - bool dispatchConsumersAsync = false, ushort consumerDispatchConcurrency = 1, bool openChannel = true) { - _dispatchConsumersAsync = dispatchConsumersAsync; _consumerDispatchConcurrency = consumerDispatchConcurrency; _openChannel = openChannel; _output = output; @@ -140,14 +137,13 @@ public virtual async Task InitializeAsync() { /* * https://github.com/rabbitmq/rabbitmq-dotnet-client/commit/120f9bfce627f704956e1008d095b853b459d45b#r135400345 - * + * * Integration tests must use CreateConnectionFactory so that ClientProvidedName is set for the connection. * Tests that close connections via `rabbitmqctl` depend on finding the connection PID via its name. */ if (_connFactory == null) { _connFactory = CreateConnectionFactory(); - _connFactory.DispatchConsumersAsync = _dispatchConsumersAsync; _connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency; } diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index 527af9b9c5..19e403a49a 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -46,8 +46,8 @@ public class TestConnectionRecoveryBase : IntegrationFixture protected const ushort TotalMessageCount = 16384; protected const ushort CloseAtCount = 16; - public TestConnectionRecoveryBase(ITestOutputHelper output, bool dispatchConsumersAsync = false) - : base(output, dispatchConsumersAsync: dispatchConsumersAsync) + public TestConnectionRecoveryBase(ITestOutputHelper output) + : base(output) { _messageBody = GetRandomBody(4096); } @@ -302,7 +302,7 @@ public override Task PostHandleDeliveryAsync(ulong deliveryTag) } } - public class TestBasicConsumer : DefaultBasicConsumer + public class TestBasicConsumer : AsyncDefaultBasicConsumer { protected readonly TaskCompletionSource _allMessagesSeenTcs; protected readonly ushort _totalMessageCount; diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs index d7db66feed..889c26b45c 100644 --- a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs @@ -52,7 +52,7 @@ public TestRecoveringConsumerEventHandlers(ITestOutputHelper output) : base(outp public async Task TestRecoveringConsumerEventHandlers_Called(int iterations) { RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); - var cons = new EventingBasicConsumer(_channel); + var cons = new AsyncEventingBasicConsumer(_channel); await _channel.BasicConsumeAsync(q, true, cons); int counter = 0; @@ -78,7 +78,7 @@ public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown }; RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); - var cons = new EventingBasicConsumer(_channel); + var cons = new AsyncEventingBasicConsumer(_channel); string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: arguments); bool ctagMatches = false; diff --git a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs index 65884f6c80..d22f998291 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs @@ -43,7 +43,7 @@ namespace Test.Integration.ConnectionRecovery public class TestConnectionRecovery : TestConnectionRecoveryBase { public TestConnectionRecovery(ITestOutputHelper output) - : base(output, dispatchConsumersAsync: true) + : base(output) { } diff --git a/projects/Test/Integration/ConnectionRecovery/TestConsumerRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConsumerRecovery.cs index aedde71fd1..da4c465561 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestConsumerRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestConsumerRecovery.cs @@ -41,7 +41,7 @@ namespace Test.Integration.ConnectionRecovery public class TestConsumerRecovery : TestConnectionRecoveryBase { public TestConsumerRecovery(ITestOutputHelper output) - : base(output, dispatchConsumersAsync: true) + : base(output) { } diff --git a/projects/Test/Integration/ConnectionRecovery/TestDeclaration.cs b/projects/Test/Integration/ConnectionRecovery/TestDeclaration.cs index 89f83a2761..138f7faf08 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestDeclaration.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestDeclaration.cs @@ -119,7 +119,7 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() { string q = Guid.NewGuid().ToString(); await _channel.QueueDeclareAsync(q, false, false, true); - var dummy = new EventingBasicConsumer(_channel); + var dummy = new AsyncEventingBasicConsumer(_channel); string tag = await _channel.BasicConsumeAsync(q, true, dummy); await _channel.BasicCancelAsync(tag); } diff --git a/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs b/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs index 6dcfd25962..f16aefca20 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs @@ -77,7 +77,7 @@ await _channel.ExchangeDeclareAsync(exchange: tdiRetryExchangeName, await _channel.QueueBindAsync(testQueueName, tdiRetryExchangeName, testQueueName); - var consumerAsync = new EventingBasicConsumer(_channel); + var consumerAsync = new AsyncEventingBasicConsumer(_channel); await _channel.BasicConsumeAsync(queue: testQueueName, autoAck: false, consumer: consumerAsync); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs index 647932c4fd..61708203e4 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs @@ -42,7 +42,7 @@ namespace Test.Integration.ConnectionRecovery public class TestRpcAfterRecovery : TestConnectionRecoveryBase { public TestRpcAfterRecovery(ITestOutputHelper output) - : base(output, dispatchConsumersAsync: true) + : base(output) { } @@ -86,11 +86,11 @@ public async Task TestPublishRpcRightAfterReconnect() /* * Note: * 406 is received, when the reply consumer isn't yet recovered. - * + * * Note that this test _used_ to do an immediate assertion, but it would * fail sometimes. Re-tries were added with a time limit to work around * this. - * + * * Assert.NotEqual(406, a.ShutdownReason.ReplyCode); */ if (a.ShutdownReason.ReplyCode == 406) diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 1febb2cb22..73762e47bd 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -45,15 +45,13 @@ public class TestAsyncConsumer : IntegrationFixture private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown"); public TestAsyncConsumer(ITestOutputHelper output) - : base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 2) + : base(output, consumerDispatchConcurrency: 2) { } [Fact] public async Task TestBasicRoundtripConcurrent() { - Assert.True(_conn.DispatchConsumersAsyncEnabled); - AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); @@ -101,12 +99,12 @@ public async Task TestBasicRoundtripConcurrent() consumer.Received += (o, a) => { - if (ByteArraysEqual(a.Body.ToArray(), body1)) + if (ByteArraysEqual(a.Body.Span, body1)) { body1Received = true; publish1SyncSource.TrySetResult(true); } - else if (ByteArraysEqual(a.Body.ToArray(), body2)) + else if (ByteArraysEqual(a.Body.Span, body2)) { body2Received = true; publish2SyncSource.TrySetResult(true); @@ -147,8 +145,6 @@ public async Task TestBasicRoundtripConcurrent() [Fact] public async Task TestBasicRoundtripConcurrentManyMessages() { - Assert.True(_conn.DispatchConsumersAsyncEnabled); - AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); @@ -324,8 +320,6 @@ public async Task TestBasicRoundtripConcurrentManyMessages() [Fact] public async Task TestBasicRejectAsync() { - Assert.True(_conn.DispatchConsumersAsyncEnabled); - string queueName = GenerateQueueName(); var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -366,7 +360,7 @@ public async Task TestBasicRejectAsync() * AI.TestAsyncConsumer.TestBasicRejectAsync channel 1 shut down: * AMQP close-reason, initiated by Peer, code=406, text= * 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms ...', classId=0, methodId=0 - * + * * Added Task.Yield() to see if it ever happens again. */ await Task.Yield(); @@ -491,8 +485,6 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestBasicNackAsync() { - Assert.True(_conn.DispatchConsumersAsyncEnabled); - var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionShutdown += (o, ea) => @@ -563,31 +555,9 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, await _channel.CloseAsync(_closeArgs, false, CancellationToken.None); } - [Fact] - public async Task NonAsyncConsumerShouldThrowInvalidOperationException() - { - Assert.True(_conn.DispatchConsumersAsyncEnabled); - - bool sawException = false; - QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false); - await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024)); - var consumer = new EventingBasicConsumer(_channel); - try - { - string consumerTag = await _channel.BasicConsumeAsync(q.QueueName, false, string.Empty, false, false, null, consumer); - } - catch (InvalidOperationException) - { - sawException = true; - } - Assert.True(sawException, "did not see expected InvalidOperationException"); - } - [Fact] public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() { - Assert.True(_conn.DispatchConsumersAsyncEnabled); - AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); var tasks = new List(); for (int i = 0; i < 256; i++) @@ -608,8 +578,6 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() [Fact] public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() { - Assert.True(_conn.DispatchConsumersAsyncEnabled); - string exchangeName = GenerateExchangeName(); string queue1Name = GenerateQueueName(); string queue2Name = GenerateQueueName(); @@ -677,8 +645,6 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() [Fact] public async Task TestCloseWithinEventHandler_GH1567() { - Assert.True(_conn.DispatchConsumersAsyncEnabled); - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); QueueDeclareOk q = await _channel.QueueDeclareAsync(); @@ -759,35 +725,35 @@ public DefaultAsyncConsumer(string logPrefix, ITestOutputHelper output) _output = output; } - public override Task HandleBasicCancel(string consumerTag) + public override Task HandleBasicCancelAsync(string consumerTag) { - _output.WriteLine("[ERROR] {0} HandleBasicCancel {1}", _logPrefix, consumerTag); - return base.HandleBasicCancel(consumerTag); + _output.WriteLine("[ERROR] {0} HandleBasicCancelAsync {1}", _logPrefix, consumerTag); + return base.HandleBasicCancelAsync(consumerTag); } - public override Task HandleBasicCancelOk(string consumerTag) + public override Task HandleBasicCancelOkAsync(string consumerTag) { - _output.WriteLine("[ERROR] {0} HandleBasicCancelOk {1}", _logPrefix, consumerTag); - return base.HandleBasicCancelOk(consumerTag); + _output.WriteLine("[ERROR] {0} HandleBasicCancelOkAsync {1}", _logPrefix, consumerTag); + return base.HandleBasicCancelOkAsync(consumerTag); } - public override Task HandleBasicConsumeOk(string consumerTag) + public override Task HandleBasicConsumeOkAsync(string consumerTag) { - _output.WriteLine("[ERROR] {0} HandleBasicConsumeOk {1}", _logPrefix, consumerTag); - return base.HandleBasicConsumeOk(consumerTag); + _output.WriteLine("[ERROR] {0} HandleBasicConsumeOkAsync {1}", _logPrefix, consumerTag); + return base.HandleBasicConsumeOkAsync(consumerTag); } - public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, + public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) { - _output.WriteLine("[ERROR] {0} HandleBasicDeliver {1}", _logPrefix, consumerTag); - await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + _output.WriteLine("[ERROR] {0} HandleBasicDeliverAsync {1}", _logPrefix, consumerTag); + await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); } - public override Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) + public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) { - _output.WriteLine("[ERROR] {0} HandleChannelShutdown", _logPrefix); - return base.HandleChannelShutdown(channel, reason); + _output.WriteLine("[ERROR] {0} HandleChannelShutdownAsync", _logPrefix); + return base.HandleChannelShutdownAsync(channel, reason); } public override Task OnCancel(params string[] consumerTags) diff --git a/projects/Test/Integration/TestConsumerCancelNotify.cs b/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs similarity index 81% rename from projects/Test/Integration/TestConsumerCancelNotify.cs rename to projects/Test/Integration/TestAsyncConsumerCancelNotify.cs index 1491039877..c13afaa36c 100644 --- a/projects/Test/Integration/TestConsumerCancelNotify.cs +++ b/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs @@ -38,12 +38,12 @@ namespace Test.Integration { - public class TestConsumerCancelNotify : IntegrationFixture + public class TestAsyncConsumerCancelNotify : IntegrationFixture { private readonly TaskCompletionSource _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); private string _consumerTag; - public TestConsumerCancelNotify(ITestOutputHelper output) : base(output) + public TestAsyncConsumerCancelNotify(ITestOutputHelper output) : base(output) { } @@ -68,15 +68,16 @@ public async Task TestCorrectConsumerTag() await _channel.QueueDeclareAsync(q1, false, false, false); await _channel.QueueDeclareAsync(q2, false, false, false); - EventingBasicConsumer consumer = new EventingBasicConsumer(_channel); + AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(_channel); string consumerTag1 = await _channel.BasicConsumeAsync(q1, true, consumer); string consumerTag2 = await _channel.BasicConsumeAsync(q2, true, consumer); string notifiedConsumerTag = null; - consumer.ConsumerCancelled += (sender, args) => + consumer.Unregistered += (sender, args) => { notifiedConsumerTag = args.ConsumerTags.First(); _tcs.TrySetResult(true); + return Task.CompletedTask; }; await _channel.QueueDeleteAsync(q1); @@ -89,7 +90,7 @@ public async Task TestCorrectConsumerTag() private async Task TestConsumerCancelAsync(string queue, bool eventMode) { await _channel.QueueDeclareAsync(queue, false, true, false); - IBasicConsumer consumer = new CancelNotificationConsumer(_channel, this, eventMode); + IAsyncBasicConsumer consumer = new CancelNotificationConsumer(_channel, this, eventMode); string actualConsumerTag = await _channel.BasicConsumeAsync(queue, false, consumer); await _channel.QueueDeleteAsync(queue); @@ -97,23 +98,23 @@ private async Task TestConsumerCancelAsync(string queue, bool eventMode) Assert.Equal(actualConsumerTag, _consumerTag); } - private class CancelNotificationConsumer : DefaultBasicConsumer + private class CancelNotificationConsumer : AsyncEventingBasicConsumer { - private readonly TestConsumerCancelNotify _testClass; + private readonly TestAsyncConsumerCancelNotify _testClass; private readonly bool _eventMode; - public CancelNotificationConsumer(IChannel channel, TestConsumerCancelNotify tc, bool eventMode) + public CancelNotificationConsumer(IChannel channel, TestAsyncConsumerCancelNotify tc, bool eventMode) : base(channel) { _testClass = tc; _eventMode = eventMode; if (eventMode) { - ConsumerCancelled += Cancelled; + Unregistered += CancelledAsync; } } - public override void HandleBasicCancel(string consumerTag) + public override Task HandleBasicCancelAsync(string consumerTag) { if (!_eventMode) { @@ -121,13 +122,14 @@ public override void HandleBasicCancel(string consumerTag) _testClass._tcs.SetResult(true); } - base.HandleBasicCancel(consumerTag); + return base.HandleBasicCancelAsync(consumerTag); } - private void Cancelled(object sender, ConsumerEventArgs arg) + private Task CancelledAsync(object sender, ConsumerEventArgs arg) { _testClass._consumerTag = arg.ConsumerTags[0]; _testClass._tcs.SetResult(true); + return Task.CompletedTask; } } } diff --git a/projects/Test/Integration/TestConsumerCount.cs b/projects/Test/Integration/TestAsyncConsumerCount.cs similarity index 92% rename from projects/Test/Integration/TestConsumerCount.cs rename to projects/Test/Integration/TestAsyncConsumerCount.cs index 076f9c282d..c83e916fe3 100644 --- a/projects/Test/Integration/TestConsumerCount.cs +++ b/projects/Test/Integration/TestAsyncConsumerCount.cs @@ -37,9 +37,9 @@ namespace Test.Integration { - public class TestConsumerCount : IntegrationFixture + public class TestAsyncConsumerCount : IntegrationFixture { - public TestConsumerCount(ITestOutputHelper output) : base(output) + public TestAsyncConsumerCount(ITestOutputHelper output) : base(output) { } @@ -50,7 +50,7 @@ public async Task TestConsumerCountMethod() await _channel.QueueDeclareAsync(queue: q, durable: false, exclusive: true, autoDelete: false, arguments: null); Assert.Equal(0u, await _channel.ConsumerCountAsync(q)); - string tag = await _channel.BasicConsumeAsync(q, true, new EventingBasicConsumer(_channel)); + string tag = await _channel.BasicConsumeAsync(q, true, new AsyncEventingBasicConsumer(_channel)); Assert.Equal(1u, await _channel.ConsumerCountAsync(q)); await _channel.BasicCancelAsync(tag); diff --git a/projects/Test/Integration/TestAsyncConsumerExceptions.cs b/projects/Test/Integration/TestAsyncConsumerExceptions.cs index 5852ad7f6b..7cec3a6cd3 100644 --- a/projects/Test/Integration/TestAsyncConsumerExceptions.cs +++ b/projects/Test/Integration/TestAsyncConsumerExceptions.cs @@ -45,7 +45,6 @@ public class TestAsyncConsumerExceptions : IntegrationFixture public TestAsyncConsumerExceptions(ITestOutputHelper output) : base(output, - dispatchConsumersAsync: true, consumerDispatchConcurrency: 1) { } @@ -61,41 +60,41 @@ protected override void DisposeAssertions() [Fact] public Task TestCancelNotificationExceptionHandling() { - IBasicConsumer consumer = new ConsumerFailingOnCancel(_channel); + IAsyncBasicConsumer consumer = new ConsumerFailingOnCancel(_channel); return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.QueueDeleteAsync(q, false, false)); } [Fact] public Task TestConsumerCancelOkExceptionHandling() { - IBasicConsumer consumer = new ConsumerFailingOnCancelOk(_channel); + IAsyncBasicConsumer consumer = new ConsumerFailingOnCancelOk(_channel); return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.BasicCancelAsync(ct)); } [Fact] public Task TestConsumerConsumeOkExceptionHandling() { - IBasicConsumer consumer = new ConsumerFailingOnConsumeOk(_channel); + IAsyncBasicConsumer consumer = new ConsumerFailingOnConsumeOk(_channel); return TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => await Task.Yield()); } [Fact] public Task TestConsumerShutdownExceptionHandling() { - IBasicConsumer consumer = new ConsumerFailingOnShutdown(_channel); + IAsyncBasicConsumer consumer = new ConsumerFailingOnShutdown(_channel); return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.CloseAsync()); } [Fact] public Task TestDeliveryExceptionHandling() { - IBasicConsumer consumer = new ConsumerFailingOnDelivery(_channel); + IAsyncBasicConsumer consumer = new ConsumerFailingOnDelivery(_channel); return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.BasicPublishAsync("", q, _encoding.GetBytes("msg")).AsTask()); } - protected async Task TestExceptionHandlingWith(IBasicConsumer consumer, - Func action) + protected async Task TestExceptionHandlingWith(IAsyncBasicConsumer consumer, + Func action) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cts = new CancellationTokenSource(ShortSpan); @@ -129,7 +128,7 @@ public ConsumerFailingOnDelivery(IChannel channel) : base(channel) { } - public override Task HandleBasicDeliver(string consumerTag, + public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, @@ -147,7 +146,7 @@ public ConsumerFailingOnCancel(IChannel channel) : base(channel) { } - public override Task HandleBasicCancel(string consumerTag) + public override Task HandleBasicCancelAsync(string consumerTag) { return Task.FromException(TestException); } @@ -159,7 +158,7 @@ public ConsumerFailingOnShutdown(IChannel channel) : base(channel) { } - public override Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) + public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) { return Task.FromException(TestException); } @@ -171,7 +170,7 @@ public ConsumerFailingOnConsumeOk(IChannel channel) : base(channel) { } - public override Task HandleBasicConsumeOk(string consumerTag) + public override Task HandleBasicConsumeOkAsync(string consumerTag) { return Task.FromException(TestException); } @@ -183,7 +182,7 @@ public ConsumerFailingOnCancelOk(IChannel channel) : base(channel) { } - public override Task HandleBasicCancelOk(string consumerTag) + public override Task HandleBasicCancelOkAsync(string consumerTag) { return Task.FromException(TestException); } diff --git a/projects/Test/Integration/TestConsumerOperationDispatch.cs b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs similarity index 92% rename from projects/Test/Integration/TestConsumerOperationDispatch.cs rename to projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs index a22ffd0e73..bf4db9077e 100644 --- a/projects/Test/Integration/TestConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs @@ -40,7 +40,7 @@ namespace Test.Integration { - public class TestConsumerOperationDispatch : IntegrationFixture + public class TestAsyncConsumerOperationDispatch : IntegrationFixture { // number of channels (and consumers) private const int Y = 100; @@ -55,7 +55,7 @@ public class TestConsumerOperationDispatch : IntegrationFixture private readonly List _consumers = new List(); - public TestConsumerOperationDispatch(ITestOutputHelper output) : base(output) + public TestAsyncConsumerOperationDispatch(ITestOutputHelper output) : base(output) { } @@ -74,7 +74,7 @@ public override async Task DisposeAsync() await base.DisposeAsync(); } - private class CollectingConsumer : DefaultBasicConsumer + private class CollectingConsumer : AsyncDefaultBasicConsumer { public List DeliveryTags { get; } @@ -165,11 +165,12 @@ public async Task TestChannelShutdownDoesNotShutDownDispatcher() await ch2.QueueBindAsync(queue: q2, exchange: _x, routingKey: ""); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await ch1.BasicConsumeAsync(q1, true, new EventingBasicConsumer(ch1)); - var c2 = new EventingBasicConsumer(ch2); + await ch1.BasicConsumeAsync(q1, true, new AsyncEventingBasicConsumer(ch1)); + var c2 = new AsyncEventingBasicConsumer(ch2); c2.Received += (object sender, BasicDeliverEventArgs e) => { tcs.SetResult(true); + return Task.CompletedTask; }; await ch2.BasicConsumeAsync(q2, true, c2); // closing this channel must not affect ch2 @@ -179,7 +180,7 @@ public async Task TestChannelShutdownDoesNotShutDownDispatcher() await WaitAsync(tcs, "received event"); } - private class ShutdownLatchConsumer : DefaultBasicConsumer + private class ShutdownLatchConsumer : AsyncDefaultBasicConsumer { public ShutdownLatchConsumer() { @@ -190,7 +191,7 @@ public ShutdownLatchConsumer() public readonly TaskCompletionSource Latch; public readonly TaskCompletionSource DuplicateLatch; - public override void HandleChannelShutdown(object channel, ShutdownEventArgs reason) + public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) { // keep track of duplicates if (Latch.Task.IsCompletedSuccessfully()) @@ -201,6 +202,8 @@ public override void HandleChannelShutdown(object channel, ShutdownEventArgs rea { Latch.SetResult(true); } + + return Task.CompletedTask; } } diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index 0dc5db573e..0fad674463 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -49,7 +49,7 @@ public class TestAsyncEventingBasicConsumer : IntegrationFixture new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); public TestAsyncEventingBasicConsumer(ITestOutputHelper output) - : base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 2) + : base(output, consumerDispatchConcurrency: 2) { _ctr = _cts.Token.Register(OnTokenCanceled); } diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 0b61d0edc6..23612a8ee2 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -65,13 +65,14 @@ public async Task TestBasicRoundtripArray() var bp = new BasicProperties(); byte[] sendBody = _encoding.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { consumer.Received += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedSemaphore.Release(); + return Task.CompletedTask; }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); @@ -94,13 +95,14 @@ public async Task TestBasicRoundtripCachedString() CachedString queueName = new CachedString((await _channel.QueueDeclareAsync()).QueueName); byte[] sendBody = _encoding.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { consumer.Received += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedSemaphore.Release(); + return Task.CompletedTask; }; string tag = await _channel.BasicConsumeAsync(queueName.Value, true, consumer); @@ -122,13 +124,14 @@ public async Task TestBasicRoundtripReadOnlyMemory() QueueDeclareOk q = await _channel.QueueDeclareAsync(); byte[] sendBody = _encoding.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { consumer.Received += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedSemaphore.Release(); + return Task.CompletedTask; }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); @@ -149,7 +152,7 @@ public async Task CanNotModifyPayloadAfterPublish() QueueDeclareOk q = await _channel.QueueDeclareAsync(); byte[] sendBody = new byte[1000]; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { bool modified = true; @@ -160,6 +163,7 @@ public async Task CanNotModifyPayloadAfterPublish() modified = false; } consumerReceivedSemaphore.Release(); + return Task.CompletedTask; }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); @@ -194,7 +198,7 @@ public async Task TestMaxInboundMessageBodySize() bool sawConnectionShutdown = false; bool sawChannelShutdown = false; bool sawConsumerRegistered = false; - bool sawConsumerCancelled = false; + bool sawConsumerUnregistered = false; using (IConnection conn = await cf.CreateConnectionAsync()) { @@ -221,31 +225,30 @@ public async Task TestMaxInboundMessageBodySize() QueueDeclareOk q = await channel.QueueDeclareAsync(); - var consumer = new EventingBasicConsumer(channel); + var consumer = new AsyncEventingBasicConsumer(channel); consumer.Shutdown += (o, a) => { tcs.SetResult(true); + return Task.CompletedTask; }; consumer.Registered += (o, a) => { sawConsumerRegistered = true; + return Task.CompletedTask; }; consumer.Unregistered += (o, a) => { - throw new XunitException("Unexpected consumer.Unregistered"); - }; - - consumer.ConsumerCancelled += (o, a) => - { - sawConsumerCancelled = true; + sawConsumerUnregistered = true; + return Task.CompletedTask; }; consumer.Received += (o, a) => { Interlocked.Increment(ref count); + return Task.CompletedTask; }; string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer); @@ -258,7 +261,7 @@ public async Task TestMaxInboundMessageBodySize() Assert.True(sawConnectionShutdown); Assert.True(sawChannelShutdown); Assert.True(sawConsumerRegistered); - Assert.True(sawConsumerCancelled); + Assert.True(sawConsumerUnregistered); try { @@ -303,7 +306,7 @@ public async Task TestPropertiesRoundtrip_Headers() bp.Headers["Hello"] = "World"; byte[] sendBody = _encoding.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { string response = null; @@ -312,6 +315,7 @@ public async Task TestPropertiesRoundtrip_Headers() response = _encoding.GetString(a.BasicProperties.Headers["Hello"] as byte[]); consumeBody = a.Body.ToArray(); consumerReceivedSemaphore.Release(); + return Task.CompletedTask; }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); diff --git a/projects/Test/Integration/TestChannelSoftErrors.cs b/projects/Test/Integration/TestChannelSoftErrors.cs index 402c09bad7..1ab02d84fe 100644 --- a/projects/Test/Integration/TestChannelSoftErrors.cs +++ b/projects/Test/Integration/TestChannelSoftErrors.cs @@ -67,7 +67,7 @@ public async Task TestConsumeOnNonExistingQueue() { OperationInterruptedException exception = await Assert.ThrowsAsync(() => { - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); return _channel.BasicConsumeAsync("NonExistingQueue", true, consumer); }); diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs index fce8dbed88..e6e7e786fe 100644 --- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs @@ -125,7 +125,7 @@ public async Task TestConsumerWorkServiceRecovery() { string q = (await ch.QueueDeclareAsync("dotnet-client.recovery.consumer_work_pool1", false, false, false)).QueueName; - var cons = new EventingBasicConsumer(ch); + var cons = new AsyncEventingBasicConsumer(ch); await ch.BasicConsumeAsync(q, true, cons); await AssertConsumerCountAsync(ch, q, 1); @@ -133,7 +133,11 @@ public async Task TestConsumerWorkServiceRecovery() Assert.True(ch.IsOpen); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - cons.Received += (s, args) => tcs.SetResult(true); + cons.Received += (s, args) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; await ch.BasicPublishAsync("", q, _encoding.GetBytes("msg")); await WaitAsync(tcs, "received event"); @@ -158,7 +162,7 @@ public async Task TestConsumerRecoveryOnClientNamedQueueWithOneRecovery() string q1 = (await ch.QueueDeclareAsync(q0, false, false, false)).QueueName; Assert.Equal(q0, q1); - var cons = new EventingBasicConsumer(ch); + var cons = new AsyncEventingBasicConsumer(ch); await ch.BasicConsumeAsync(q1, true, cons); await AssertConsumerCountAsync(ch, q1, 1); @@ -181,7 +185,11 @@ public async Task TestConsumerRecoveryOnClientNamedQueueWithOneRecovery() Assert.False(queueNameChangeAfterRecoveryCalled); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - cons.Received += (s, args) => tcs.SetResult(true); + cons.Received += (s, args) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; await ch.BasicPublishAsync("", q1, _encoding.GetBytes("msg")); await WaitAsync(tcs, "received event"); @@ -207,7 +215,7 @@ public async Task TestConsumerRecoveryWithServerNamedQueue() string qname = queueDeclareResult.QueueName; Assert.False(string.IsNullOrEmpty(qname)); - var cons = new EventingBasicConsumer(ch); + var cons = new AsyncEventingBasicConsumer(ch); await ch.BasicConsumeAsync(string.Empty, true, cons); await AssertConsumerCountAsync(ch, qname, 1); @@ -292,13 +300,21 @@ public async Task TestTopologyRecoveryConsumerFilter() await ch.QueuePurgeAsync(queueWithIgnoredConsumer); var consumerRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var consumerToRecover = new EventingBasicConsumer(ch); - consumerToRecover.Received += (source, ea) => consumerRecoveryTcs.SetResult(true); + var consumerToRecover = new AsyncEventingBasicConsumer(ch); + consumerToRecover.Received += (source, ea) => + { + consumerRecoveryTcs.SetResult(true); + return Task.CompletedTask; + }; await ch.BasicConsumeAsync(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); var ignoredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var consumerToIgnore = new EventingBasicConsumer(ch); - consumerToIgnore.Received += (source, ea) => ignoredTcs.SetResult(true); + var consumerToIgnore = new AsyncEventingBasicConsumer(ch); + consumerToIgnore.Received += (source, ea) => + { + ignoredTcs.SetResult(true); + return Task.CompletedTask; + }; await ch.BasicConsumeAsync(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); try diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index e6883b03d4..e0606fedb3 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -63,7 +63,7 @@ public async Task TestRecoverTopologyOnDisposedChannel() await ch.CloseAsync(); } - var cons = new EventingBasicConsumer(_channel); + var cons = new AsyncEventingBasicConsumer(_channel); await _channel.BasicConsumeAsync(q, true, cons); await AssertConsumerCountAsync(_channel, q, 1); @@ -71,7 +71,11 @@ public async Task TestRecoverTopologyOnDisposedChannel() await AssertConsumerCountAsync(_channel, q, 1); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - cons.Received += (s, args) => tcs.SetResult(true); + cons.Received += (s, args) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; await _channel.BasicPublishAsync("", q, _messageBody); await WaitAsync(tcs, "received event"); @@ -246,13 +250,21 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() await ch.QueuePurgeAsync(queue2); var consumerReceivedTcs1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var consumer1 = new EventingBasicConsumer(ch); - consumer1.Received += (source, ea) => consumerReceivedTcs1.SetResult(true); + var consumer1 = new AsyncEventingBasicConsumer(ch); + consumer1.Received += (source, ea) => + { + consumerReceivedTcs1.SetResult(true); + return Task.CompletedTask; + }; await ch.BasicConsumeAsync(queue1, true, "recovered.consumer", consumer1); var consumerReceivedTcs2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var consumer2 = new EventingBasicConsumer(ch); - consumer2.Received += (source, ea) => consumerReceivedTcs2.SetResult(true); + var consumer2 = new AsyncEventingBasicConsumer(ch); + consumer2.Received += (source, ea) => + { + consumerReceivedTcs2.SetResult(true); + return Task.CompletedTask; + }; await ch.BasicConsumeAsync(queue2, true, "filtered.consumer", consumer2); await _channel.ExchangeDeleteAsync(exchange); @@ -506,8 +518,12 @@ public async Task TestTopologyRecoveryConsumerExceptionHandler() await _channel.QueuePurgeAsync(queueWithExceptionConsumer); var recoveredConsumerReceivedTcs = new ManualResetEventSlim(false); - var consumerToRecover = new EventingBasicConsumer(ch); - consumerToRecover.Received += (source, ea) => recoveredConsumerReceivedTcs.Set(); + var consumerToRecover = new AsyncEventingBasicConsumer(ch); + consumerToRecover.Received += (source, ea) => + { + recoveredConsumerReceivedTcs.Set(); + return Task.CompletedTask; + }; await ch.BasicConsumeAsync(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover); await _channel.QueueDeleteAsync(queueWithExceptionConsumer); diff --git a/projects/Test/Integration/TestConsumer.cs b/projects/Test/Integration/TestConsumer.cs deleted file mode 100644 index 3b95c14677..0000000000 --- a/projects/Test/Integration/TestConsumer.cs +++ /dev/null @@ -1,178 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using Xunit; -using Xunit.Abstractions; - -namespace Test.Integration -{ - public class TestConsumer : IntegrationFixture - { - private readonly byte[] _body = GetRandomBody(64); - private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown"); - - public TestConsumer(ITestOutputHelper output) : base(output) - { - } - - [Fact] - public async Task AsyncConsumerShouldThrowInvalidOperationException() - { - Assert.False(_conn.DispatchConsumersAsyncEnabled); - - bool sawException = false; - QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false); - await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024)); - var consumer = new AsyncEventingBasicConsumer(_channel); - try - { - string consumerTag = await _channel.BasicConsumeAsync(q.QueueName, false, string.Empty, false, false, null, consumer); - } - catch (InvalidOperationException) - { - sawException = true; - } - Assert.True(sawException, "did not see expected InvalidOperationException"); - } - - [Fact] - public async Task TestBasicRoundtrip() - { - Assert.False(_conn.DispatchConsumersAsyncEnabled); - - TimeSpan waitSpan = TimeSpan.FromSeconds(2); - QueueDeclareOk q = await _channel.QueueDeclareAsync(); - await _channel.BasicPublishAsync("", q.QueueName, _body); - var consumer = new EventingBasicConsumer(_channel); - using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) - { - consumer.Received += (o, a) => - { - consumerReceivedSemaphore.Release(); - }; - string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - // ensure we get a delivery - bool waitRes = await consumerReceivedSemaphore.WaitAsync(waitSpan); - Assert.True(waitRes); - // unsubscribe and ensure no further deliveries - await _channel.BasicCancelAsync(tag); - await _channel.BasicPublishAsync("", q.QueueName, _body); - bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(waitSpan); - Assert.False(waitResFalse); - } - } - - [Fact] - public async Task TestBasicRoundtripNoWait() - { - Assert.False(_conn.DispatchConsumersAsyncEnabled); - - QueueDeclareOk q = await _channel.QueueDeclareAsync(); - await _channel.BasicPublishAsync("", q.QueueName, _body); - var consumer = new EventingBasicConsumer(_channel); - using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) - { - consumer.Received += (o, a) => - { - consumerReceivedSemaphore.Release(); - }; - string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - // ensure we get a delivery - bool waitRes0 = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2)); - Assert.True(waitRes0); - // unsubscribe and ensure no further deliveries - await _channel.BasicCancelAsync(tag, noWait: true); - await _channel.BasicPublishAsync("", q.QueueName, _body); - bool waitRes1 = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2)); - Assert.False(waitRes1); - } - } - - [Fact] - public async Task ConcurrentEventingTestForReceived() - { - Assert.False(_conn.DispatchConsumersAsyncEnabled); - - const int NumberOfThreads = 4; - const int NumberOfRegistrations = 5000; - - byte[] called = new byte[NumberOfThreads * NumberOfRegistrations]; - - QueueDeclareOk q = await _channel.QueueDeclareAsync(); - var consumer = new EventingBasicConsumer(_channel); - await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - var countdownEvent = new CountdownEvent(NumberOfThreads); - - var tasks = new List(); - for (int i = 0; i < NumberOfThreads; i++) - { - int threadIndex = i; - tasks.Add(Task.Run(() => - { - int start = threadIndex * NumberOfRegistrations; - for (int j = start; j < start + NumberOfRegistrations; j++) - { - int receivedIndex = j; - consumer.Received += (sender, eventArgs) => - { - called[receivedIndex] = 1; - }; - } - countdownEvent.Signal(); - })); - } - - countdownEvent.Wait(); - - // Add last receiver - var lastConsumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => - { - lastConsumerReceivedTcs.SetResult(true); - }; - - // Send message - await _channel.BasicPublishAsync("", q.QueueName, ReadOnlyMemory.Empty); - - await lastConsumerReceivedTcs.Task.WaitAsync(TimingFixture.TestTimeout); - Assert.True(await lastConsumerReceivedTcs.Task); - - // Check received messages - Assert.Equal(-1, called.AsSpan().IndexOf((byte)0)); - } - } -} diff --git a/projects/Test/Integration/TestConsumerExceptions.cs b/projects/Test/Integration/TestConsumerExceptions.cs deleted file mode 100644 index 94c9633cdf..0000000000 --- a/projects/Test/Integration/TestConsumerExceptions.cs +++ /dev/null @@ -1,179 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Threading.Tasks; -using RabbitMQ.Client; -using Xunit; -using Xunit.Abstractions; - -namespace Test.Integration -{ - public class TestConsumerExceptions : IntegrationFixture - { - protected override void DisposeAssertions() - { - /* - * Note: don't do anything since these tests expect callback - * exceptions - */ - } - - private class ConsumerFailingOnDelivery : DefaultBasicConsumer - { - public ConsumerFailingOnDelivery(IChannel channel) : base(channel) - { - } - - public override Task HandleBasicDeliverAsync(string consumerTag, - ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - IReadOnlyBasicProperties properties, - ReadOnlyMemory body) - { - throw new Exception("oops"); - } - } - - private class ConsumerFailingOnCancel : DefaultBasicConsumer - { - public ConsumerFailingOnCancel(IChannel channel) : base(channel) - { - } - - public override void HandleBasicCancel(string consumerTag) - { - throw new Exception("oops"); - } - } - - private class ConsumerFailingOnShutdown : DefaultBasicConsumer - { - public ConsumerFailingOnShutdown(IChannel channel) : base(channel) - { - } - - public override void HandleChannelShutdown(object channel, ShutdownEventArgs reason) - { - throw new Exception("oops"); - } - } - - private class ConsumerFailingOnConsumeOk : DefaultBasicConsumer - { - public ConsumerFailingOnConsumeOk(IChannel channel) : base(channel) - { - } - - public override void HandleBasicConsumeOk(string consumerTag) - { - throw new Exception("oops"); - } - } - - private class ConsumerFailingOnCancelOk : DefaultBasicConsumer - { - public ConsumerFailingOnCancelOk(IChannel channel) : base(channel) - { - } - - public override void HandleBasicCancelOk(string consumerTag) - { - throw new Exception("oops"); - } - } - - protected async Task TestExceptionHandlingWithAsync(IBasicConsumer consumer, - Func action) - { - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - bool notified = false; - string q = await _channel.QueueDeclareAsync(); - - _channel.CallbackException += (m, evt) => - { - notified = true; - tcs.SetResult(true); - }; - - string tag = await _channel.BasicConsumeAsync(q, true, consumer); - await action(_channel, q, consumer, tag); - await WaitAsync(tcs, "callback exception"); - - Assert.True(notified); - } - - public TestConsumerExceptions(ITestOutputHelper output) : base(output) - { - } - - [Fact] - public Task TestCancelNotificationExceptionHandling() - { - IBasicConsumer consumer = new ConsumerFailingOnCancel(_channel); - return TestExceptionHandlingWithAsync(consumer, (ch, q, c, ct) => - { - return ch.QueueDeleteAsync(q); - }); - } - - [Fact] - public Task TestConsumerCancelOkExceptionHandling() - { - IBasicConsumer consumer = new ConsumerFailingOnCancelOk(_channel); - return TestExceptionHandlingWithAsync(consumer, (ch, q, c, ct) => ch.BasicCancelAsync(ct)); - } - - [Fact] - public Task TestConsumerConsumeOkExceptionHandling() - { - IBasicConsumer consumer = new ConsumerFailingOnConsumeOk(_channel); - return TestExceptionHandlingWithAsync(consumer, (ch, q, c, ct) => Task.CompletedTask); - } - - [Fact] - public Task TestConsumerShutdownExceptionHandling() - { - IBasicConsumer consumer = new ConsumerFailingOnShutdown(_channel); - return TestExceptionHandlingWithAsync(consumer, (ch, q, c, ct) => ch.CloseAsync()); - } - - [Fact] - public Task TestDeliveryExceptionHandling() - { - IBasicConsumer consumer = new ConsumerFailingOnDelivery(_channel); - return TestExceptionHandlingWithAsync(consumer, (ch, q, c, ct) => - ch.BasicPublishAsync("", q, _encoding.GetBytes("msg")).AsTask()); - } - } -} diff --git a/projects/Test/Integration/TestEventingConsumer.cs b/projects/Test/Integration/TestEventingConsumer.cs deleted file mode 100644 index 5072dcdbc1..0000000000 --- a/projects/Test/Integration/TestEventingConsumer.cs +++ /dev/null @@ -1,133 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using System.Threading.Tasks; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using Xunit; -using Xunit.Abstractions; - -namespace Test.Integration -{ - public class TestEventingConsumer : IntegrationFixture - { - public TestEventingConsumer(ITestOutputHelper output) : base(output) - { - } - - [Fact] - public async Task TestEventingConsumerRegistrationEvents() - { - string q = await _channel.QueueDeclareAsync(); - - var registeredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - object registeredSender = null; - - var unregisteredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - object unregisteredSender = null; - - EventingBasicConsumer ec = new EventingBasicConsumer(_channel); - ec.Registered += (s, args) => - { - registeredSender = s; - registeredTcs.SetResult(true); - }; - - ec.Unregistered += (s, args) => - { - unregisteredSender = s; - unregisteredTcs.SetResult(true); - }; - - string tag = await _channel.BasicConsumeAsync(q, false, ec); - await WaitAsync(registeredTcs, "consumer registered"); - - Assert.NotNull(registeredSender); - Assert.Equal(ec, registeredSender); - Assert.Equal(_channel, ((EventingBasicConsumer)registeredSender).Channel); - - await _channel.BasicCancelAsync(tag); - - await WaitAsync(unregisteredTcs, "consumer unregistered"); - Assert.NotNull(unregisteredSender); - Assert.Equal(ec, unregisteredSender); - Assert.Equal(_channel, ((EventingBasicConsumer)unregisteredSender).Channel); - } - - [Fact] - public async Task TestEventingConsumerDeliveryEvents() - { - var tcs0 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - string q = await _channel.QueueDeclareAsync(); - - bool receivedInvoked = false; - object receivedSender = null; - - var ec = new EventingBasicConsumer(_channel); - ec.Received += (s, args) => - { - receivedInvoked = true; - receivedSender = s; - tcs0.SetResult(true); - }; - - await _channel.BasicConsumeAsync(q, true, ec); - await _channel.BasicPublishAsync("", q, _encoding.GetBytes("msg")); - - await WaitAsync(tcs0, "received event"); - var tcs1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - Assert.True(receivedInvoked); - Assert.NotNull(receivedSender); - Assert.Equal(ec, receivedSender); - Assert.Equal(_channel, ((EventingBasicConsumer)receivedSender).Channel); - - bool shutdownInvoked = false; - object shutdownSender = null; - - ec.Shutdown += (s, args) => - { - shutdownInvoked = true; - shutdownSender = s; - tcs1.SetResult(true); - }; - - await _channel.CloseAsync(); - - await WaitAsync(tcs1, "shutdown event"); - - Assert.True(shutdownInvoked); - Assert.NotNull(shutdownSender); - Assert.Equal(ec, shutdownSender); - Assert.Equal(_channel, ((EventingBasicConsumer)shutdownSender).Channel); - } - } -} diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 3abe0e8f88..513373a3c4 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -118,7 +118,6 @@ public async Task TestUnthrottledFloodPublishing() public async Task TestMultithreadFloodPublishing() { _connFactory = CreateConnectionFactory(); - _connFactory.DispatchConsumersAsync = true; _connFactory.AutomaticRecoveryEnabled = false; _conn = await _connFactory.CreateConnectionAsync(); diff --git a/projects/Test/Integration/TestMainLoop.cs b/projects/Test/Integration/TestMainLoop.cs index 219da5d61b..742515836e 100644 --- a/projects/Test/Integration/TestMainLoop.cs +++ b/projects/Test/Integration/TestMainLoop.cs @@ -52,7 +52,7 @@ protected override void DisposeAssertions() */ } - private sealed class FaultyConsumer : DefaultBasicConsumer + private sealed class FaultyConsumer : AsyncDefaultBasicConsumer { public FaultyConsumer(IChannel channel) : base(channel) { } diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index 6357e173c2..f42abeb718 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -115,7 +115,6 @@ public TestOAuth2(ITestOutputHelper testOutputHelper) _connectionFactory = new ConnectionFactory { AutomaticRecoveryEnabled = true, - DispatchConsumersAsync = true, CredentialsProvider = GetCredentialsProvider(options), CredentialsRefresher = GetCredentialsRefresher(), ClientProvidedName = nameof(TestOAuth2) diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index 2bd3227d48..c8434f2c27 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -92,13 +92,14 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); @@ -130,13 +131,14 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool use QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); @@ -170,13 +172,14 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(boo QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); @@ -210,13 +213,14 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); @@ -249,13 +253,14 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); @@ -290,13 +295,14 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index dbeb603180..f8088baf01 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -104,7 +104,7 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => @@ -120,6 +120,8 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera consumerReceivedTcs.SetException( EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); } + + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); @@ -160,7 +162,7 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => @@ -176,6 +178,8 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs consumerReceivedTcs.SetException( EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); } + + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); @@ -216,7 +220,7 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => @@ -232,6 +236,8 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn consumerReceivedTcs.SetException( EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); } + + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); @@ -273,7 +279,7 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); byte[] sendBody = Encoding.UTF8.GetBytes("hi"); byte[] consumeBody = null; - var consumer = new EventingBasicConsumer(_channel); + var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => @@ -289,6 +295,8 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo consumerReceivedTcs.SetException( EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); } + + return Task.CompletedTask; }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);