diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index d8b36cd5bf..c923a00eca 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -426,20 +426,20 @@ RabbitMQ.Client.IBasicProperties.UserId.get -> string RabbitMQ.Client.IBasicProperties.UserId.set -> void RabbitMQ.Client.IChannel RabbitMQ.Client.IChannel.BasicAckAsync(ulong deliveryTag, bool multiple, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler +RabbitMQ.Client.IChannel.BasicAcksAsync -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler -RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler -RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler +RabbitMQ.Client.IChannel.BasicNacksAsync -> RabbitMQ.Client.Events.AsyncEventHandler +RabbitMQ.Client.IChannel.BasicReturnAsync -> RabbitMQ.Client.Events.AsyncEventHandler +RabbitMQ.Client.IChannel.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.IChannel.ChannelNumber.get -> int -RabbitMQ.Client.IChannel.ChannelShutdown -> System.EventHandler +RabbitMQ.Client.IChannel.ChannelShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.IChannel.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs RabbitMQ.Client.IChannel.ContinuationTimeout.get -> System.TimeSpan RabbitMQ.Client.IChannel.ContinuationTimeout.set -> void RabbitMQ.Client.IChannel.CurrentQueue.get -> string RabbitMQ.Client.IChannel.DefaultConsumer.get -> RabbitMQ.Client.IAsyncBasicConsumer RabbitMQ.Client.IChannel.DefaultConsumer.set -> void -RabbitMQ.Client.IChannel.FlowControl -> System.EventHandler +RabbitMQ.Client.IChannel.FlowControlAsync -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.IsClosed.get -> bool RabbitMQ.Client.IChannel.IsOpen.get -> bool @@ -550,7 +550,7 @@ RabbitMQ.Client.IRecordedQueue.Exclusive.get -> bool RabbitMQ.Client.IRecordedQueue.IsServerNamed.get -> bool RabbitMQ.Client.IRecordedQueue.Name.get -> string RabbitMQ.Client.IRecoverable -RabbitMQ.Client.IRecoverable.Recovery -> System.EventHandler +RabbitMQ.Client.IRecoverable.RecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.ITcpClient RabbitMQ.Client.ITcpClient.Client.get -> System.Net.Sockets.Socket RabbitMQ.Client.ITcpClient.Close() -> void diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index fa03b6b4e0..bfe4224f6e 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -106,28 +106,28 @@ public interface IChannel : IDisposable /// /// Signalled when a Basic.Ack command arrives from the broker. /// - event EventHandler BasicAcks; + event AsyncEventHandler BasicAcksAsync; /// /// Signalled when a Basic.Nack command arrives from the broker. /// - event EventHandler BasicNacks; + event AsyncEventHandler BasicNacksAsync; /// /// Signalled when a Basic.Return command arrives from the broker. /// - event EventHandler BasicReturn; + event AsyncEventHandler BasicReturnAsync; /// /// Signalled when an exception occurs in a callback invoked by the channel. /// /// Examples of cases where this event will be signalled /// include exceptions thrown in methods, or - /// exceptions thrown in delegates etc. + /// exceptions thrown in delegates etc. /// - event EventHandler CallbackException; + event AsyncEventHandler CallbackExceptionAsync; - event EventHandler FlowControl; + event AsyncEventHandler FlowControlAsync; /// /// Notifies the destruction of the channel. @@ -136,7 +136,7 @@ public interface IChannel : IDisposable /// If the channel is already destroyed at the time an event /// handler is added to this event, the event handler will be fired immediately. /// - event EventHandler ChannelShutdown; + event AsyncEventHandler ChannelShutdownAsync; /// /// When in confirm mode, return the sequence number of the next message to be published. @@ -268,7 +268,7 @@ Task CloseAsync(ShutdownEventArgs reason, bool abort, /// /// Asynchronously enable publisher confirmations. /// - /// Set to false if tracking via and yourself. + /// Set to false if tracking via and yourself. /// CancellationToken for this operation. Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default); diff --git a/projects/RabbitMQ.Client/client/api/IRecoverable.cs b/projects/RabbitMQ.Client/client/api/IRecoverable.cs index 6d8651c365..c8ce3edc43 100644 --- a/projects/RabbitMQ.Client/client/api/IRecoverable.cs +++ b/projects/RabbitMQ.Client/client/api/IRecoverable.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using RabbitMQ.Client.Events; namespace RabbitMQ.Client { @@ -38,6 +39,6 @@ namespace RabbitMQ.Client /// public interface IRecoverable { - event EventHandler Recovery; + event AsyncEventHandler RecoveryAsync; } } diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 68b5dc6fc7..5ad4ea2587 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -97,8 +97,8 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.BasicReturn: { - HandleBasicReturn(cmd); - return Task.FromResult(true); + // Note: always returns true + return HandleBasicReturn(cmd); } case ProtocolCommandId.ChannelClose: { diff --git a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs index 3731e5ab97..74e7431197 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs @@ -323,9 +323,10 @@ public override void HandleChannelShutdown(ShutdownEventArgs reason) // Nothing to do here! } - public void OnConnectionShutdown(object? sender, ShutdownEventArgs reason) + public Task OnConnectionShutdownAsync(object? sender, ShutdownEventArgs reason) { _tcs.TrySetResult(true); + return Task.CompletedTask; } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 9ce69b0133..3ce67ae3c3 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -79,46 +79,46 @@ public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel _consumerDispatchConcurrency = consumerDispatchConcurrency; } - public event EventHandler BasicAcks + public event AsyncEventHandler BasicAcksAsync { - add => InnerChannel.BasicAcks += value; - remove => InnerChannel.BasicAcks -= value; + add => InnerChannel.BasicAcksAsync += value; + remove => InnerChannel.BasicAcksAsync -= value; } - public event EventHandler BasicNacks + public event AsyncEventHandler BasicNacksAsync { - add => InnerChannel.BasicNacks += value; - remove => InnerChannel.BasicNacks -= value; + add => InnerChannel.BasicNacksAsync += value; + remove => InnerChannel.BasicNacksAsync -= value; } - public event EventHandler BasicReturn + public event AsyncEventHandler BasicReturnAsync { - add => InnerChannel.BasicReturn += value; - remove => InnerChannel.BasicReturn -= value; + add => InnerChannel.BasicReturnAsync += value; + remove => InnerChannel.BasicReturnAsync -= value; } - public event EventHandler CallbackException + public event AsyncEventHandler CallbackExceptionAsync { - add => InnerChannel.CallbackException += value; - remove => InnerChannel.CallbackException -= value; + add => InnerChannel.CallbackExceptionAsync += value; + remove => InnerChannel.CallbackExceptionAsync -= value; } - public event EventHandler FlowControl + public event AsyncEventHandler FlowControlAsync { - add { InnerChannel.FlowControl += value; } - remove { InnerChannel.FlowControl -= value; } + add { InnerChannel.FlowControlAsync += value; } + remove { InnerChannel.FlowControlAsync -= value; } } - public event EventHandler ChannelShutdown + public event AsyncEventHandler ChannelShutdownAsync { - add => InnerChannel.ChannelShutdown += value; - remove => InnerChannel.ChannelShutdown -= value; + add => InnerChannel.ChannelShutdownAsync += value; + remove => InnerChannel.ChannelShutdownAsync -= value; } - public event EventHandler Recovery + public event AsyncEventHandler RecoveryAsync { - add { InnerChannel.Recovery += value; } - remove { InnerChannel.Recovery -= value; } + add { InnerChannel.RecoveryAsync += value; } + remove { InnerChannel.RecoveryAsync -= value; } } public IEnumerable ConsumerTags @@ -213,7 +213,8 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph .ConfigureAwait(false); } - _innerChannel.RunRecoveryEventHandlers(this); + await _innerChannel.RunRecoveryEventHandlers(this) + .ConfigureAwait(false); return true; } diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 2875f364cd..9cbc9d5140 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -79,16 +79,16 @@ protected ChannelBase(ConnectionConfig config, ISession session, ContinuationTimeout = config.ContinuationTimeout; ConsumerDispatcher = new AsyncConsumerDispatcher(this, perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency)); - Action onException = (exception, context) => - OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); - _basicAcksWrapper = new EventingWrapper("OnBasicAck", onException); - _basicNacksWrapper = new EventingWrapper("OnBasicNack", onException); - _basicReturnWrapper = new EventingWrapper("OnBasicReturn", onException); - _callbackExceptionWrapper = - new EventingWrapper(string.Empty, (exception, context) => { }); - _flowControlWrapper = new EventingWrapper("OnFlowControl", onException); - _channelShutdownWrapper = new EventingWrapper("OnChannelShutdown", onException); - _recoveryWrapper = new EventingWrapper("OnChannelRecovery", onException); + Func onExceptionAsync = (exception, context) => + OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context)); + _basicAcksAsyncWrapper = new AsyncEventingWrapper("OnBasicAck", onExceptionAsync); + _basicNacksAsyncWrapper = new AsyncEventingWrapper("OnBasicNack", onExceptionAsync); + _basicReturnAsyncWrapper = new AsyncEventingWrapper("OnBasicReturn", onExceptionAsync); + _callbackExceptionAsyncWrapper = + new AsyncEventingWrapper(string.Empty, (exception, context) => Task.CompletedTask); + _flowControlAsyncWrapper = new AsyncEventingWrapper("OnFlowControl", onExceptionAsync); + _channelShutdownAsyncWrapper = new AsyncEventingWrapper("OnChannelShutdownAsync", onExceptionAsync); + _recoveryAsyncWrapper = new AsyncEventingWrapper("OnChannelRecovery", onExceptionAsync); session.CommandReceived = HandleCommandAsync; session.SessionShutdownAsync += OnSessionShutdownAsync; Session = session; @@ -97,75 +97,75 @@ protected ChannelBase(ConnectionConfig config, ISession session, internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan ContinuationTimeout { get; set; } - public event EventHandler BasicAcks + public event AsyncEventHandler BasicAcksAsync { - add => _basicAcksWrapper.AddHandler(value); - remove => _basicAcksWrapper.RemoveHandler(value); + add => _basicAcksAsyncWrapper.AddHandler(value); + remove => _basicAcksAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _basicAcksWrapper; + private AsyncEventingWrapper _basicAcksAsyncWrapper; - public event EventHandler BasicNacks + public event AsyncEventHandler BasicNacksAsync { - add => _basicNacksWrapper.AddHandler(value); - remove => _basicNacksWrapper.RemoveHandler(value); + add => _basicNacksAsyncWrapper.AddHandler(value); + remove => _basicNacksAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _basicNacksWrapper; + private AsyncEventingWrapper _basicNacksAsyncWrapper; - public event EventHandler BasicReturn + public event AsyncEventHandler BasicReturnAsync { - add => _basicReturnWrapper.AddHandler(value); - remove => _basicReturnWrapper.RemoveHandler(value); + add => _basicReturnAsyncWrapper.AddHandler(value); + remove => _basicReturnAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _basicReturnWrapper; + private AsyncEventingWrapper _basicReturnAsyncWrapper; - public event EventHandler CallbackException + public event AsyncEventHandler CallbackExceptionAsync { - add => _callbackExceptionWrapper.AddHandler(value); - remove => _callbackExceptionWrapper.RemoveHandler(value); + add => _callbackExceptionAsyncWrapper.AddHandler(value); + remove => _callbackExceptionAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _callbackExceptionWrapper; + private AsyncEventingWrapper _callbackExceptionAsyncWrapper; - public event EventHandler FlowControl + public event AsyncEventHandler FlowControlAsync { - add => _flowControlWrapper.AddHandler(value); - remove => _flowControlWrapper.RemoveHandler(value); + add => _flowControlAsyncWrapper.AddHandler(value); + remove => _flowControlAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _flowControlWrapper; + private AsyncEventingWrapper _flowControlAsyncWrapper; - public event EventHandler ChannelShutdown + public event AsyncEventHandler ChannelShutdownAsync { add { if (IsOpen) { - _channelShutdownWrapper.AddHandler(value); + _channelShutdownAsyncWrapper.AddHandler(value); } else { value(this, CloseReason); } } - remove => _channelShutdownWrapper.RemoveHandler(value); + remove => _channelShutdownAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _channelShutdownWrapper; + private AsyncEventingWrapper _channelShutdownAsyncWrapper; - public event EventHandler Recovery + public event AsyncEventHandler RecoveryAsync { - add => _recoveryWrapper.AddHandler(value); - remove => _recoveryWrapper.RemoveHandler(value); + add => _recoveryAsyncWrapper.AddHandler(value); + remove => _recoveryAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _recoveryWrapper; + private AsyncEventingWrapper _recoveryAsyncWrapper; - internal void RunRecoveryEventHandlers(object sender) + internal Task RunRecoveryEventHandlers(object sender) { - _recoveryWrapper.Invoke(sender, EventArgs.Empty); + return _recoveryAsyncWrapper.InvokeAsync(sender, EventArgs.Empty); } public int ChannelNumber => ((Session)Session).ChannelNumber; @@ -197,13 +197,13 @@ public void MaybeSetConnectionStartException(Exception ex) protected void TakeOver(ChannelBase other) { - _basicAcksWrapper.Takeover(other._basicAcksWrapper); - _basicNacksWrapper.Takeover(other._basicNacksWrapper); - _basicReturnWrapper.Takeover(other._basicReturnWrapper); - _callbackExceptionWrapper.Takeover(other._callbackExceptionWrapper); - _flowControlWrapper.Takeover(other._flowControlWrapper); - _channelShutdownWrapper.Takeover(other._channelShutdownWrapper); - _recoveryWrapper.Takeover(other._recoveryWrapper); + _basicAcksAsyncWrapper.Takeover(other._basicAcksAsyncWrapper); + _basicNacksAsyncWrapper.Takeover(other._basicNacksAsyncWrapper); + _basicReturnAsyncWrapper.Takeover(other._basicReturnAsyncWrapper); + _callbackExceptionAsyncWrapper.Takeover(other._callbackExceptionAsyncWrapper); + _flowControlAsyncWrapper.Takeover(other._flowControlAsyncWrapper); + _channelShutdownAsyncWrapper.Takeover(other._channelShutdownAsyncWrapper); + _recoveryAsyncWrapper.Takeover(other._recoveryAsyncWrapper); } public Task CloseAsync(ushort replyCode, string replyText, bool abort, @@ -223,7 +223,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - ChannelShutdown += k.OnConnectionShutdown; + ChannelShutdownAsync += k.OnConnectionShutdownAsync; enqueued = Enqueue(k); ConsumerDispatcher.Quiesce(); @@ -269,7 +269,7 @@ await ConsumerDispatcher.WaitForShutdownAsync() k.Dispose(); } _rpcSemaphore.Release(); - ChannelShutdown -= k.OnConnectionShutdown; + ChannelShutdownAsync -= k.OnConnectionShutdownAsync; } } @@ -463,9 +463,9 @@ protected ValueTask ModelSendObserveFlowControlAsync(in TMetho return Session.TransmitAsync(in method, in header, body, cancellationToken); } - internal void OnCallbackException(CallbackExceptionEventArgs args) + internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) { - _callbackExceptionWrapper.Invoke(this, args); + return _callbackExceptionAsyncWrapper.InvokeAsync(this, args); } ///Broadcasts notification of the final shutdown of the channel. @@ -480,14 +480,16 @@ internal void OnCallbackException(CallbackExceptionEventArgs args) ///shutdown event. See the definition of Enqueue() above. /// /// - private void OnChannelShutdown(ShutdownEventArgs reason) + private async Task OnChannelShutdownAsync(ShutdownEventArgs reason) { _continuationQueue.HandleChannelShutdown(reason); - _channelShutdownWrapper.Invoke(this, reason); + await _channelShutdownAsyncWrapper.InvokeAsync(this, reason) + .ConfigureAwait(false); if (ConfirmsAreEnabled) { - _confirmSemaphore.Wait(); + await _confirmSemaphore.WaitAsync() + .ConfigureAwait(false); try { if (_confirmsTaskCompletionSources?.Count > 0) @@ -518,13 +520,14 @@ private void OnChannelShutdown(ShutdownEventArgs reason) * * Aborted PR: https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1551 */ - private Task OnSessionShutdownAsync(object? sender, ShutdownEventArgs reason) + private async Task OnSessionShutdownAsync(object? sender, ShutdownEventArgs reason) { ConsumerDispatcher.Quiesce(); SetCloseReason(reason); - OnChannelShutdown(reason); - ConsumerDispatcher.Shutdown(reason); - return Task.CompletedTask; + await OnChannelShutdownAsync(reason) + .ConfigureAwait(false); + await ConsumerDispatcher.ShutdownAsync(reason) + .ConfigureAwait(false); } [MemberNotNull(nameof(_closeReason))] @@ -571,10 +574,11 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart protected async Task HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken) { var ack = new BasicAck(cmd.MethodSpan); - if (!_basicAcksWrapper.IsEmpty) + if (!_basicAcksAsyncWrapper.IsEmpty) { var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple); - _basicAcksWrapper.Invoke(this, args); + await _basicAcksAsyncWrapper.InvokeAsync(this, args) + .ConfigureAwait(false); } await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken) @@ -585,11 +589,12 @@ await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken) protected async Task HandleBasicNack(IncomingCommand cmd, CancellationToken cancellationToken) { var nack = new BasicNack(cmd.MethodSpan); - if (!_basicNacksWrapper.IsEmpty) + if (!_basicNacksAsyncWrapper.IsEmpty) { var args = new BasicNackEventArgs( nack._deliveryTag, nack._multiple, nack._requeue); - _basicNacksWrapper.Invoke(this, args); + await _basicNacksAsyncWrapper.InvokeAsync(this, args) + .ConfigureAwait(false); } await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken) @@ -630,16 +635,18 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) return deliveryTag; } - protected void HandleBasicReturn(IncomingCommand cmd) + protected async Task HandleBasicReturn(IncomingCommand cmd) { - if (!_basicReturnWrapper.IsEmpty) + if (!_basicReturnAsyncWrapper.IsEmpty) { var basicReturn = new BasicReturn(cmd.MethodSpan); var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, basicReturn._exchange, basicReturn._routingKey, new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory); - _basicReturnWrapper.Invoke(this, e); + await _basicReturnAsyncWrapper.InvokeAsync(this, e) + .ConfigureAwait(false); } + return true; } protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) @@ -698,9 +705,10 @@ protected async Task HandleChannelFlowAsync(IncomingCommand cmd, Cancellat await ModelSendAsync(method, cancellationToken). ConfigureAwait(false); - if (!_flowControlWrapper.IsEmpty) + if (!_flowControlAsyncWrapper.IsEmpty) { - _flowControlWrapper.Invoke(this, new FlowControlEventArgs(active)); + await _flowControlAsyncWrapper.InvokeAsync(this, new FlowControlEventArgs(active)) + .ConfigureAwait(false); } return true; diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs index 1949a5733d..a10140481c 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -57,7 +57,8 @@ await work.Consumer.HandleChannelShutdownAsync(_channel, work.Reason!) } catch (Exception e) { - _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); + await _channel.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)) + .ConfigureAwait(false); } } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs index 88d873df15..05dd59044a 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs @@ -31,12 +31,6 @@ public IAsyncBasicConsumer GetAndRemoveConsumer(string tag) return _consumers.Remove(tag, out IAsyncBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer(); } - public void Shutdown(ShutdownEventArgs reason) - { - DoShutdownConsumers(reason); - InternalShutdown(); - } - public Task ShutdownAsync(ShutdownEventArgs reason) { DoShutdownConsumers(reason); @@ -54,8 +48,6 @@ private void DoShutdownConsumers(ShutdownEventArgs reason) protected abstract void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason); - protected abstract void InternalShutdown(); - protected abstract Task InternalShutdownAsync(); // Do not inline as it's not the default case on a hot path diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 4ffc3ea9f9..1de847c928 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -114,53 +114,6 @@ public void Quiesce() _quiesce = true; } - public void WaitForShutdown() - { - if (_disposed) - { - return; - } - - if (_quiesce) - { - try - { - if (false == _reader.Completion.Wait(TimeSpan.FromSeconds(2))) - { - ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)"); - } - if (false == _worker.Wait(TimeSpan.FromSeconds(2))) - { - ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)"); - } - } - catch (AggregateException aex) - { - AggregateException aexf = aex.Flatten(); - bool foundUnexpectedException = false; - foreach (Exception innerAexf in aexf.InnerExceptions) - { - if (false == (innerAexf is OperationCanceledException)) - { - foundUnexpectedException = true; - break; - } - } - if (foundUnexpectedException) - { - ESLog.Warn("consumer dispatcher task had unexpected exceptions"); - } - } - catch (OperationCanceledException) - { - } - } - else - { - throw new InvalidOperationException("WaitForShutdown called but _quiesce is false"); - } - } - public async Task WaitForShutdownAsync() { if (_disposed) @@ -209,11 +162,6 @@ protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, Sh _writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason)); } - protected override void InternalShutdown() - { - _writer.Complete(); - } - protected override Task InternalShutdownAsync() { _writer.Complete(); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index 4b5dd679a8..5d297fdf5b 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -62,9 +62,6 @@ ValueTask HandleBasicDeliverAsync(string consumerTag, void Quiesce(); - void Shutdown(ShutdownEventArgs reason); - void WaitForShutdown(); - Task ShutdownAsync(ShutdownEventArgs reason); Task WaitForShutdownAsync(); } diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index 980212d34a..e10bd4f28d 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -71,7 +71,7 @@ static async Task Main() consumeConnection.ConnectionShutdownAsync += ConnectionShutdownAsync; using IChannel consumeChannel = await consumeConnection.CreateChannelAsync(); - consumeChannel.ChannelShutdown += Channel_ChannelShutdown; + consumeChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; await consumeChannel.BasicQosAsync(prefetchSize: 0, prefetchCount: 128, global: false); await consumeChannel.ExchangeDeclareAsync(exchange: ExchangeName, @@ -107,7 +107,7 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer publishTasks.Add(Task.Run(async () => { using IChannel publishChannel = await publishConnection.CreateChannelAsync(); - publishChannel.ChannelShutdown += Channel_ChannelShutdown; + publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; await publishChannel.ConfirmSelectAsync(); @@ -173,13 +173,15 @@ private static Task ConnectionShutdownAsync(object sender, ShutdownEventArgs e) return Task.CompletedTask; } - private static void Channel_ChannelShutdown(object sender, ShutdownEventArgs e) + private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArgs e) { if (e.Initiator != ShutdownInitiator.Application) { Console.Error.WriteLine("[ERROR] unexpected channel shutdown: {0}", e); s_consumeDoneEvent.TrySetResult(false); } + + return Task.CompletedTask; } private static Task AsyncListener_Received(object sender, BasicDeliverEventArgs @event) diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index 47864833d3..68d7549fdb 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -112,7 +112,7 @@ async Task HandlePublishConfirmsAsynchronously() var allMessagesConfirmedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var outstandingConfirms = new LinkedList(); var semaphore = new SemaphoreSlim(1, 1); - void CleanOutstandingConfirms(ulong deliveryTag, bool multiple) + async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) { if (debug) { @@ -120,7 +120,7 @@ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple) DateTime.Now, deliveryTag, multiple); } - semaphore.Wait(); + await semaphore.WaitAsync(); try { if (multiple) @@ -158,11 +158,11 @@ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple) } } - channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); - channel.BasicNacks += (sender, ea) => + channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); + channel.BasicNacksAsync += (sender, ea) => { Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})"); - CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); + return CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); }; var sw = new Stopwatch(); diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 6bb1ce044b..272d6e8d0e 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -258,7 +258,7 @@ protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel) if (channel != null) { - channel.CallbackException += (o, ea) => + channel.CallbackExceptionAsync += (o, ea) => { _channelCallbackException = ea.Exception; @@ -270,6 +270,8 @@ protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel) catch (InvalidOperationException) { } + + return Task.CompletedTask; }; } } @@ -297,7 +299,7 @@ protected void AddCallbackShutdownHandlers() if (_channel != null) { - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { @@ -310,6 +312,8 @@ protected void AddCallbackShutdownHandlers() { } }); + + return Task.CompletedTask; }; } } @@ -550,13 +554,14 @@ protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args a(args); } - protected void HandleChannelShutdown(object sender, ShutdownEventArgs args) + protected Task HandleChannelShutdownAsync(object sender, ShutdownEventArgs args) { if (args.Initiator != ShutdownInitiator.Application) { IChannel ch = (IChannel)sender; _output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}"); } + return Task.CompletedTask; } protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action a) diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestRecoveryEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestRecoveryEventHandlers.cs index 8bb46de1be..a68c786b53 100644 --- a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestRecoveryEventHandlers.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestRecoveryEventHandlers.cs @@ -47,7 +47,11 @@ public TestRecoveryEventHandlers(ITestOutputHelper output) : base(output) public async Task TestRecoveryEventHandlers_Called() { int counter = 0; - ((AutorecoveringChannel)_channel).Recovery += (source, ea) => Interlocked.Increment(ref counter); + ((AutorecoveringChannel)_channel).RecoveryAsync += (source, ea) => + { + Interlocked.Increment(ref counter); + return Task.CompletedTask; + }; await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestShutdownEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestShutdownEventHandlers.cs index 6d8541faad..98c1c70903 100644 --- a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestShutdownEventHandlers.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Channel/TestShutdownEventHandlers.cs @@ -46,7 +46,11 @@ public TestShutdownEventHandlers(ITestOutputHelper output) : base(output) public async Task TestShutdownEventHandlersOnChannel_Called() { int counter = 0; - _channel.ChannelShutdown += (c, args) => Interlocked.Increment(ref counter); + _channel.ChannelShutdownAsync += (c, args) => + { + Interlocked.Increment(ref counter); + return Task.CompletedTask; + }; Assert.True(_channel.IsOpen); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs index eef47faa77..7fbb56f1d7 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs @@ -156,8 +156,16 @@ public async Task TestBasicAckEventHandlerRecovery() { await _channel.ConfirmSelectAsync(); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - ((AutorecoveringChannel)_channel).BasicAcks += (m, args) => tcs.SetResult(true); - ((AutorecoveringChannel)_channel).BasicNacks += (m, args) => tcs.SetResult(true); + ((AutorecoveringChannel)_channel).BasicAcksAsync += (m, args) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; + ((AutorecoveringChannel)_channel).BasicNacksAsync += (m, args) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 8e7b594057..f5a45364c7 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -96,12 +96,13 @@ public async Task TestBasicRoundtripConcurrent() return Task.CompletedTask; }; - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); + return Task.CompletedTask; }; consumer.Received += (o, a) => @@ -189,12 +190,13 @@ public async Task TestBasicRoundtripConcurrentManyMessages() return Task.CompletedTask; }; - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); + return Task.CompletedTask; }; QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: queueName, exclusive: false, autoDelete: true); @@ -217,12 +219,13 @@ public async Task TestBasicRoundtripConcurrentManyMessages() AddCallbackExceptionHandlers(publishConn, publishChannel); publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel, "publishChannel,", _output); - publishChannel.ChannelShutdown += (o, ea) => + publishChannel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(publishChannel, ea, (args) => { MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); + return Task.CompletedTask; }; await publishChannel.ConfirmSelectAsync(); @@ -261,12 +264,13 @@ public async Task TestBasicRoundtripConcurrentManyMessages() AddCallbackExceptionHandlers(consumeConn, consumeChannel); consumeChannel.DefaultConsumer = new DefaultAsyncConsumer(consumeChannel, "consumeChannel,", _output); - consumeChannel.ChannelShutdown += (o, ea) => + consumeChannel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(consumeChannel, ea, (args) => { MaybeSetException(ea, publish1SyncSource, publish2SyncSource); }); + return Task.CompletedTask; }; var consumer = new AsyncEventingBasicConsumer(consumeChannel); @@ -356,12 +360,13 @@ public async Task TestBasicRejectAsync() return Task.CompletedTask; }; - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { MaybeSetException(args, publishSyncSource); }); + return Task.CompletedTask; }; var consumer = new AsyncEventingBasicConsumer(_channel); @@ -453,12 +458,13 @@ public async Task TestBasicAckAsync() return Task.CompletedTask; }; - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { MaybeSetException(args, publishSyncSource); }); + return Task.CompletedTask; }; await _channel.ConfirmSelectAsync(); @@ -518,12 +524,13 @@ public async Task TestBasicNackAsync() return Task.CompletedTask; }; - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { MaybeSetException(ea, publishSyncSource); }); + return Task.CompletedTask; }; var consumer = new AsyncEventingBasicConsumer(_channel); @@ -625,12 +632,13 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() return Task.CompletedTask; }; - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { MaybeSetException(ea, tcs); }); + return Task.CompletedTask; }; // queue1 -> produce click to queue2 diff --git a/projects/Test/Integration/TestAsyncConsumerExceptions.cs b/projects/Test/Integration/TestAsyncConsumerExceptions.cs index 7cec3a6cd3..71251fbebf 100644 --- a/projects/Test/Integration/TestAsyncConsumerExceptions.cs +++ b/projects/Test/Integration/TestAsyncConsumerExceptions.cs @@ -102,13 +102,15 @@ protected async Task TestExceptionHandlingWith(IAsyncBasicConsumer consumer, try { string q = await _channel.QueueDeclareAsync(string.Empty, false, true, false); - _channel.CallbackException += (ch, evt) => + _channel.CallbackExceptionAsync += (ch, evt) => { // _output.WriteLine($"[INFO] _channel.CallbackException: {evt.Exception}"); if (evt.Exception == TestException) { tcs.SetResult(true); } + + return Task.CompletedTask; }; string tag = await _channel.BasicConsumeAsync(q, true, string.Empty, false, false, null, consumer); diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index b9285901a9..7152de9445 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -70,9 +70,10 @@ private void OnTokenCanceled() _onReceivedTcs.TrySetCanceled(); } - private void ConsumerChannelOnCallbackException(object sender, CallbackExceptionEventArgs e) + private Task ConsumerChannelOnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e) { _onCallbackExceptionTcs.TrySetResult(true); + return Task.CompletedTask; } private Task AsyncConsumerOnReceived(object sender, BasicDeliverEventArgs @event) @@ -96,7 +97,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038() await _channel.QueueDeclareAsync(queueName, false, false, true, null); await _channel.QueueBindAsync(queueName, exchangeName, routingKey, null); - _channel.CallbackException += ConsumerChannelOnCallbackException; + _channel.CallbackExceptionAsync += ConsumerChannelOnCallbackExceptionAsync; //async subscriber var consumer = new AsyncEventingBasicConsumer(_channel); diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 648dcbf45b..4f582297ba 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -214,12 +214,13 @@ public async Task TestMaxInboundMessageBodySize() using (IChannel channel = await conn.CreateChannelAsync()) { - channel.ChannelShutdown += (o, a) => + channel.ChannelShutdownAsync += (o, a) => { sawChannelShutdown = true; + return Task.CompletedTask; }; - channel.CallbackException += (o, a) => + channel.CallbackExceptionAsync += (o, a) => { throw new XunitException("Unexpected channel.CallbackException"); }; diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index a08437af12..37af0ec193 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -50,9 +50,10 @@ public async Task TestConsumerDispatcherShutdown() var autorecoveringChannel = (AutorecoveringChannel)_channel; var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _channel.ChannelShutdown += (channel, args) => + _channel.ChannelShutdownAsync += (channel, args) => { tcs.SetResult(true); + return Task.CompletedTask; }; Assert.False(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before CloseAsync"); diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs index 5cf9ab2c3b..d793b1654e 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs @@ -56,14 +56,16 @@ public async Task ConcurrentPublishSingleChannel() { int publishAckCount = 0; - _channel.BasicAcks += (object sender, BasicAckEventArgs e) => + _channel.BasicAcksAsync += (object sender, BasicAckEventArgs e) => { Interlocked.Increment(ref publishAckCount); + return Task.CompletedTask; }; - _channel.BasicNacks += (object sender, BasicNackEventArgs e) => + _channel.BasicNacksAsync += (object sender, BasicNackEventArgs e) => { _output.WriteLine($"channel #{_channel.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}"); + return Task.CompletedTask; }; await _channel.ConfirmSelectAsync(trackConfirmations: false); diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index 0ab5532297..daa95cdf82 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -112,7 +112,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in { using (IChannel ch = await _conn.CreateChannelAsync()) { - ch.ChannelShutdown += (o, ea) => + ch.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(ch, ea, (args) => { @@ -121,22 +121,25 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in tcs.TrySetException(args.Exception); } }); + return Task.CompletedTask; }; await ch.ConfirmSelectAsync(trackConfirmations: false); - ch.BasicAcks += (object sender, BasicAckEventArgs e) => + ch.BasicAcksAsync += (object sender, BasicAckEventArgs e) => { if (e.DeliveryTag >= _messageCount) { tcs.SetResult(true); } + return Task.CompletedTask; }; - ch.BasicNacks += (object sender, BasicNackEventArgs e) => + ch.BasicNacksAsync += (object sender, BasicNackEventArgs e) => { tcs.SetResult(false); _output.WriteLine($"channel #{ch.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}"); + return Task.CompletedTask; }; QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null); diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index 6c2e67fe45..ec5099fd6f 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -51,9 +51,10 @@ public TestConnectionShutdown(ITestOutputHelper output) : base(output) public async Task TestCleanClosureWithSocketClosedOutOfBand() { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _channel.ChannelShutdown += (channel, args) => + _channel.ChannelShutdownAsync += (channel, args) => { tcs.SetResult(true); + return Task.CompletedTask; }; var c = (AutorecoveringConnection)_conn; @@ -75,9 +76,10 @@ public async Task TestCleanClosureWithSocketClosedOutOfBand() public async Task TestAbortWithSocketClosedOutOfBand() { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _channel.ChannelShutdown += (channel, args) => + _channel.ChannelShutdownAsync += (channel, args) => { tcs.SetResult(true); + return Task.CompletedTask; }; var c = (AutorecoveringConnection)_conn; @@ -94,9 +96,10 @@ public async Task TestDisposedWithSocketClosedOutOfBand() { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _channel.ChannelShutdown += (channel, args) => + _channel.ChannelShutdownAsync += (channel, args) => { tcs.SetResult(true); + return Task.CompletedTask; }; var c = (AutorecoveringConnection)_conn; @@ -120,9 +123,10 @@ public async Task TestShutdownSignalPropagationToChannels() { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _channel.ChannelShutdown += (channel, args) => + _channel.ChannelShutdownAsync += (channel, args) => { tcs.SetResult(true); + return Task.CompletedTask; }; await _conn.CloseAsync(); @@ -135,9 +139,10 @@ public async Task TestShutdownSignalPropagationToChannelsUsingDispose() { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _channel.ChannelShutdown += (channel, args) => + _channel.ChannelShutdownAsync += (channel, args) => { tcs.SetResult(true); + return Task.CompletedTask; }; _conn.Dispose(); @@ -152,9 +157,10 @@ public async Task TestConsumerDispatcherShutdown() var m = (AutorecoveringChannel)_channel; var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _channel.ChannelShutdown += (channel, args) => + _channel.ChannelShutdownAsync += (channel, args) => { tcs.SetResult(true); + return Task.CompletedTask; }; Assert.False(m.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before CloseAsync"); await _conn.CloseAsync(); diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 3b4521ef35..d301ca57f4 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -78,7 +78,7 @@ public async Task TestUnthrottledFloodPublishing() return Task.CompletedTask; }; - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { @@ -87,6 +87,8 @@ public async Task TestUnthrottledFloodPublishing() sawUnexpectedShutdown = true; } }); + + return Task.CompletedTask; }; var stopwatch = Stopwatch.StartNew(); @@ -145,7 +147,7 @@ public async Task TestMultithreadFloodPublishing() return Task.CompletedTask; }; - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { @@ -155,6 +157,7 @@ public async Task TestMultithreadFloodPublishing() allMessagesSeenTcs.TrySetException(args.Exception); } }); + return Task.CompletedTask; }; QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty, @@ -183,7 +186,7 @@ public async Task TestMultithreadFloodPublishing() { await publishChannel.ConfirmSelectAsync(); - publishChannel.ChannelShutdown += (o, ea) => + publishChannel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(publishChannel, ea, (args) => { @@ -193,6 +196,7 @@ public async Task TestMultithreadFloodPublishing() allMessagesSeenTcs.TrySetException(args.Exception); } }); + return Task.CompletedTask; }; for (int i = 0; i < publishCount && false == stop; i++) @@ -233,7 +237,7 @@ public async Task TestMultithreadFloodPublishing() using (IChannel consumeChannel = await consumeConnection.CreateChannelAsync()) { - consumeChannel.ChannelShutdown += (o, ea) => + consumeChannel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(consumeChannel, ea, (args) => { @@ -242,6 +246,7 @@ public async Task TestMultithreadFloodPublishing() allMessagesSeenTcs.TrySetException(args.Exception); } }); + return Task.CompletedTask; }; var consumer = new AsyncEventingBasicConsumer(consumeChannel); diff --git a/projects/Test/Integration/TestInvalidAck.cs b/projects/Test/Integration/TestInvalidAck.cs index 073a57880b..d17fb7b59e 100644 --- a/projects/Test/Integration/TestInvalidAck.cs +++ b/projects/Test/Integration/TestInvalidAck.cs @@ -48,11 +48,12 @@ public async Task TestAckWithUnknownConsumerTagAndMultipleFalse() var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); bool shutdownFired = false; ShutdownEventArgs shutdownArgs = null; - _channel.ChannelShutdown += (s, args) => + _channel.ChannelShutdownAsync += (s, args) => { shutdownFired = true; shutdownArgs = args; tcs.SetResult(true); + return Task.CompletedTask; }; await _channel.BasicAckAsync(123456, false); diff --git a/projects/Test/Integration/TestMainLoop.cs b/projects/Test/Integration/TestMainLoop.cs index 742515836e..24e611265f 100644 --- a/projects/Test/Integration/TestMainLoop.cs +++ b/projects/Test/Integration/TestMainLoop.cs @@ -75,11 +75,17 @@ public async Task TestCloseWithFaultyConsumer() QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false); CallbackExceptionEventArgs ea = null; - _channel.CallbackException += async (_, evt) => + Task closeTask = null; + _channel.CallbackExceptionAsync += (_, evt) => { ea = evt; - await _channel.CloseAsync(); + /* + * NOTE: + * await-ing CloseAsync here WILL result in a deadlock + */ + closeTask = _channel.CloseAsync(); tcs.SetResult(true); + return Task.CompletedTask; }; await _channel.BasicConsumeAsync(q, true, new FaultyConsumer(_channel)); @@ -88,6 +94,7 @@ public async Task TestCloseWithFaultyConsumer() await WaitAsync(tcs, "CallbackException"); Assert.NotNull(ea); + await closeTask.WaitAsync(WaitSpan); Assert.False(_channel.IsOpen); Assert.Equal(200, _channel.CloseReason.ReplyCode); } diff --git a/projects/Test/Integration/TestPublishSharedChannelAsync.cs b/projects/Test/Integration/TestPublishSharedChannelAsync.cs index 3c508d087a..939ac65558 100644 --- a/projects/Test/Integration/TestPublishSharedChannelAsync.cs +++ b/projects/Test/Integration/TestPublishSharedChannelAsync.cs @@ -84,7 +84,7 @@ public async Task MultiThreadPublishOnSharedChannel() { try { - channel.ChannelShutdown += HandleChannelShutdown; + channel.ChannelShutdownAsync += HandleChannelShutdownAsync; await channel.ExchangeDeclareAsync(ExchangeName.Value, ExchangeType.Topic, passive: false, durable: false, autoDelete: true, noWait: false, arguments: null); await channel.QueueDeclareAsync(QueueName, exclusive: false, autoDelete: true); diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 61a13a446d..43dfe5cac6 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -127,9 +127,10 @@ await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, // number of event handler invocations int c = 0; - ch.BasicAcks += (_, args) => + ch.BasicAcksAsync += (_, args) => { Interlocked.Increment(ref c); + return Task.CompletedTask; }; try diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index fc6315983b..8c1cffff57 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -74,7 +74,7 @@ public async Task TestConcurrentQueueDeclareAndBindAsync() return Task.CompletedTask; }; - _channel.ChannelShutdown += (o, ea) => + _channel.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(_channel, ea, (args) => { @@ -83,6 +83,8 @@ public async Task TestConcurrentQueueDeclareAndBindAsync() sawShutdown = true; } }); + + return Task.CompletedTask; }; var tasks = new List(); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs index dc1831a7b0..ce66f0c291 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs @@ -90,12 +90,13 @@ public async Task TestChannelAfterDispose_GH1086() { TaskCompletionSource sawChannelShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - void _channel_ChannelShutdown(object sender, ShutdownEventArgs e) + Task _channel_ChannelShutdownAsync(object sender, ShutdownEventArgs e) { sawChannelShutdownTcs.TrySetResult(true); + return Task.CompletedTask; } - _channel.ChannelShutdown += _channel_ChannelShutdown; + _channel.ChannelShutdownAsync += _channel_ChannelShutdownAsync; Assert.True(_channel.IsOpen);