diff --git a/src/NServiceBus.Transport.Sql.Shared/Receiving/AsyncCountdownLatch.cs b/src/NServiceBus.Transport.Sql.Shared/Receiving/AsyncCountdownLatch.cs index 512cb3a94..901079fe5 100644 --- a/src/NServiceBus.Transport.Sql.Shared/Receiving/AsyncCountdownLatch.cs +++ b/src/NServiceBus.Transport.Sql.Shared/Receiving/AsyncCountdownLatch.cs @@ -1,5 +1,6 @@ namespace NServiceBus.Transport.Sql.Shared; +using System; using System.Threading; using System.Threading.Tasks; @@ -19,13 +20,41 @@ public AsyncCountdownLatch(int count) } } - public Task WaitAsync(CancellationToken cancellationToken = default) => completionSource.Task; +#pragma warning disable PS0003 + public Task WaitAsync(CancellationToken cancellationToken) +#pragma warning restore PS0003 + { + _ = cancellationToken.Register(completionSource.SetResult); + return completionSource.Task; + } + + public Signaler GetSignaler() => new(this); - public void Signal() + void Signal() { if (Interlocked.Decrement(ref count) == 0) { completionSource.SetResult(); } } + + public class Signaler(AsyncCountdownLatch parent) : IDisposable + { + bool signalled; + + public void Signal() + { + parent.Signal(); + signalled = true; + } + + public void Dispose() + { + if (!signalled) + { + parent.Signal(); + signalled = true; + } + } + } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs b/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs index 9829d0c6c..a8c2c7eb2 100644 --- a/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs +++ b/src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs @@ -204,13 +204,11 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken) ? 1 : messageCount; - bool shouldWaitForReceiveTasks = true; var receiveLatch = new AsyncCountdownLatch(maximumConcurrentProcessing); for (var i = 0; i < maximumConcurrentProcessing; i++) { if (stopBatchCancellationSource.IsCancellationRequested) { - shouldWaitForReceiveTasks = false; break; } @@ -222,17 +220,15 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken) localConcurrencyLimiter, receiveLatch, messageProcessingCancellationTokenSource.Token); } - if (shouldWaitForReceiveTasks) - { - // Wait for all receive operations to complete before returning (and thus peeking again) - await receiveLatch.WaitAsync(CancellationToken.None).ConfigureAwait(false); - } + // Wait for all receive operations to complete before returning (and thus peeking again) + await receiveLatch.WaitAsync(stopBatchCancellationSource.Token).ConfigureAwait(false); } async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter( CancellationTokenSource stopBatchCancellationTokenSource, SemaphoreSlim localConcurrencyLimiter, AsyncCountdownLatch receiveLatch, CancellationToken messageProcessingCancellationToken) { + using var latchSignaler = receiveLatch.GetSignaler(); try { try @@ -241,7 +237,7 @@ async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter( // in combination with TransactionScope will apply connection pooling and enlistment synchronous in ctor. await Task.Yield(); - await processStrategy.ProcessMessage(stopBatchCancellationTokenSource, receiveLatch, + await processStrategy.ProcessMessage(stopBatchCancellationTokenSource, latchSignaler, messageProcessingCancellationToken) .ConfigureAwait(false); diff --git a/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessStrategy.cs b/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessStrategy.cs index 47d00df9c..fb531d1ca 100644 --- a/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessStrategy.cs +++ b/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessStrategy.cs @@ -36,7 +36,7 @@ public void Init(TableBasedQueue inputQueue, TableBasedQueue errorQueue, OnMessa } public abstract Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, - AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default); + AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default); protected async Task TryHandleMessage(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken = default) { diff --git a/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNativeTransaction.cs b/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNativeTransaction.cs index 5552b1f0e..85b75553a 100644 --- a/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNativeTransaction.cs +++ b/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNativeTransaction.cs @@ -22,28 +22,18 @@ public ProcessWithNativeTransaction(TransactionOptions transactionOptions, DbCon } public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, - AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default) + AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default) { Message message = null; var context = new ContextBag(); - var hasLatchBeenSignalled = false; try { using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false)) using (var transaction = connection.BeginTransaction(isolationLevel)) { - MessageReadResult receiveResult; - try - { - receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken) - .ConfigureAwait(false); - } - finally - { - receiveLatch.Signal(); - hasLatchBeenSignalled = true; - } + var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken).ConfigureAwait(false); + receiveLatch.Signal(); if (receiveResult == MessageReadResult.NoMessage) { @@ -89,13 +79,6 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance } failureInfoStorage.RecordFailureInfoForMessage(message.TransportId, ex, context); } - finally - { - if (!hasLatchBeenSignalled) - { - receiveLatch.Signal(); - } - } } async Task TryProcess(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken) diff --git a/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNoTransaction.cs b/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNoTransaction.cs index b6c183119..15037ac1f 100644 --- a/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNoTransaction.cs +++ b/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNoTransaction.cs @@ -17,11 +17,10 @@ public ProcessWithNoTransaction(DbConnectionFactory connectionFactory, FailureIn } public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, - AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default) + AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default) { Message message = null; var context = new ContextBag(); - var hasLatchBeenSignalled = false; using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false)) { @@ -29,17 +28,8 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance { using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted)) { - MessageReadResult receiveResult; - try - { - receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken) - .ConfigureAwait(false); - } - finally - { - receiveLatch.Signal(); - hasLatchBeenSignalled = true; - } + var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken).ConfigureAwait(false); + receiveLatch.Signal(); if (receiveResult == MessageReadResult.NoMessage) { @@ -89,13 +79,6 @@ await ErrorQueue // Since this is TransactionMode.None, we don't care whether error handling says handled or retry. Message is gone either way. _ = await HandleError(ex, message, transportTransaction, 1, context, cancellationToken).ConfigureAwait(false); } - finally - { - if (!hasLatchBeenSignalled) - { - receiveLatch.Signal(); - } - } } } diff --git a/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithTransactionScope.cs b/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithTransactionScope.cs index 897b4c3ba..1cceb26f2 100644 --- a/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithTransactionScope.cs +++ b/src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithTransactionScope.cs @@ -18,27 +18,18 @@ public ProcessWithTransactionScope(TransactionOptions transactionOptions, DbConn } public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, - AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default) + AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default) { Message message = null; var context = new ContextBag(); - var hasLatchBeenSignalled = false; try { using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions, TransactionScopeAsyncFlowOption.Enabled)) using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false)) { - MessageReadResult receiveResult; - try - { - receiveResult = await InputQueue.TryReceive(connection, null, cancellationToken).ConfigureAwait(false); - } - finally - { - receiveLatch.Signal(); - hasLatchBeenSignalled = true; - } + var receiveResult = await InputQueue.TryReceive(connection, null, cancellationToken).ConfigureAwait(false); + receiveLatch.Signal(); if (receiveResult == MessageReadResult.NoMessage) { @@ -81,13 +72,6 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance } failureInfoStorage.RecordFailureInfoForMessage(message.TransportId, ex, context); } - finally - { - if (!hasLatchBeenSignalled) - { - receiveLatch.Signal(); - } - } } async Task TryProcess(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken)