diff --git a/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs b/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs index df8e36f0a..d8c4d94af 100644 --- a/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs @@ -10,7 +10,8 @@ using Logging; using NServiceBus.Transport.Sql.Shared; -class PostgreSqlTransportInfrastructure : TransportInfrastructure +[Janitor.SkipWeaving] +class PostgreSqlTransportInfrastructure : TransportInfrastructure, IDisposable { //The limit is 55=63-max(8,8). 63 is the built-in PostgreSQL limit and we also need to reserve space for: // - "_Seq_seq" suffix (8 bytes) used in the auto-created sequence for the main queue table @@ -47,12 +48,27 @@ public PostgreSqlTransportInfrastructure(PostgreSqlTransport transport, HostSett public override async Task Shutdown(CancellationToken cancellationToken = default) { - await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken))) - .ConfigureAwait(false); + try + { + await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken))) + .ConfigureAwait(false); - if (dueDelayedMessageProcessor != null) + if (dueDelayedMessageProcessor != null) + { + await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false); + } + } + finally + { + Dispose(); + } + } + + public void Dispose() + { + foreach (var r in Receivers.Values.Cast()) { - await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false); + r.Dispose(); } } diff --git a/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs b/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs index 613a840a9..a9b691341 100644 --- a/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs +++ b/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs @@ -5,7 +5,8 @@ namespace NServiceBus.Transport.Sql.Shared using System.Threading.Tasks; using Logging; - class MessageReceiver : IMessageReceiver + [Janitor.SkipWeaving] + class MessageReceiver : IMessageReceiver, IDisposable { public MessageReceiver( TransportDefinition transport, @@ -125,35 +126,45 @@ public async Task ChangeConcurrency(PushRuntimeSettings newLimitations, public async Task StopReceive(CancellationToken cancellationToken = default) { - if (messageReceivingCancellationTokenSource == null) + try { - // already stopped or never started - return; - } - - await messageReceivingCancellationTokenSource.CancelAsync().ConfigureAwait(false); + if (messageReceivingCancellationTokenSource == null) + { + // already stopped or never started + return; + } - await using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel())) - { - await messageReceivingTask.ConfigureAwait(false); + await messageReceivingCancellationTokenSource.CancelAsync().ConfigureAwait(false); - while (concurrencyLimiter.CurrentCount != maxConcurrency) + await using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel())) { - // Pass CancellationToken.None so that no exceptions will be thrown while waiting - // for the message receiver to gracefully shut down. The cancellation tokens passed to - // ProcessMessages (and thus the message processing pipelines) will be responsible - // for more forcefully shutting down message processing after the user's shutdown SLA - // is reached - await Task.Delay(50, CancellationToken.None).ConfigureAwait(false); + await messageReceivingTask.ConfigureAwait(false); + + while (concurrencyLimiter.CurrentCount != maxConcurrency) + { + // Pass CancellationToken.None so that no exceptions will be thrown while waiting + // for the message receiver to gracefully shut down. The cancellation tokens passed to + // ProcessMessages (and thus the message processing pipelines) will be responsible + // for more forcefully shutting down message processing after the user's shutdown SLA + // is reached + await Task.Delay(50, CancellationToken.None).ConfigureAwait(false); + } } } + finally + { + Dispose(); + } + } - messageReceivingCircuitBreaker.Dispose(); - messageProcessingCircuitBreaker.Dispose(); - concurrencyLimiter.Dispose(); - messageReceivingCancellationTokenSource.Dispose(); + public void Dispose() + { + messageReceivingCircuitBreaker?.Dispose(); + messageProcessingCircuitBreaker?.Dispose(); + concurrencyLimiter?.Dispose(); + messageReceivingCancellationTokenSource?.Dispose(); messageReceivingCancellationTokenSource = null; - messageProcessingCancellationTokenSource.Dispose(); + messageProcessingCancellationTokenSource?.Dispose(); } async Task ReceiveMessagesAndSwallowExceptions(CancellationToken messageReceivingCancellationToken) diff --git a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs index e7c180d6c..aa6b33d19 100644 --- a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs @@ -10,7 +10,8 @@ namespace NServiceBus.Transport.SqlServer using NServiceBus.Transport.Sql.Shared; using Transport; - class SqlServerTransportInfrastructure : TransportInfrastructure + [Janitor.SkipWeaving] + class SqlServerTransportInfrastructure : TransportInfrastructure, IDisposable { public SqlServerTransportInfrastructure(SqlServerTransport transport, HostSettings hostSettings, ReceiveSettings[] receiveSettings, string[] sendingAddresses) { @@ -337,12 +338,27 @@ public void ConfigureSendInfrastructure() public override async Task Shutdown(CancellationToken cancellationToken = default) { - await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken))) - .ConfigureAwait(false); + try + { + await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken))) + .ConfigureAwait(false); - if (dueDelayedMessageProcessor != null) + if (dueDelayedMessageProcessor != null) + { + await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false); + } + } + finally + { + Dispose(); + } + } + + public void Dispose() + { + foreach (var r in Receivers.Values.Cast()) { - await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false); + r.Dispose(); } }