Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.Transport.Sql.Shared;

using System;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -19,13 +20,41 @@ public AsyncCountdownLatch(int count)
}
}

public Task WaitAsync(CancellationToken cancellationToken = default) => completionSource.Task;
#pragma warning disable PS0003
public Task WaitAsync(CancellationToken cancellationToken)
#pragma warning restore PS0003
{
_ = cancellationToken.Register(completionSource.SetResult);
return completionSource.Task;
}

public Signaler GetSignaler() => new(this);

public void Signal()
void Signal()
{
if (Interlocked.Decrement(ref count) == 0)
{
completionSource.SetResult();
}
}

public class Signaler(AsyncCountdownLatch parent) : IDisposable
{
bool signalled;

public void Signal()
{
parent.Signal();
signalled = true;
}

public void Dispose()
{
if (!signalled)
{
parent.Signal();
signalled = true;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,11 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
? 1
: messageCount;

bool shouldWaitForReceiveTasks = true;
var receiveLatch = new AsyncCountdownLatch(maximumConcurrentProcessing);
for (var i = 0; i < maximumConcurrentProcessing; i++)
{
if (stopBatchCancellationSource.IsCancellationRequested)
{
shouldWaitForReceiveTasks = false;
break;
}

Expand All @@ -222,17 +220,15 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
localConcurrencyLimiter, receiveLatch, messageProcessingCancellationTokenSource.Token);
}

if (shouldWaitForReceiveTasks)
{
// Wait for all receive operations to complete before returning (and thus peeking again)
await receiveLatch.WaitAsync(CancellationToken.None).ConfigureAwait(false);
}
// Wait for all receive operations to complete before returning (and thus peeking again)
await receiveLatch.WaitAsync(stopBatchCancellationSource.Token).ConfigureAwait(false);
}

async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(
CancellationTokenSource stopBatchCancellationTokenSource, SemaphoreSlim localConcurrencyLimiter,
AsyncCountdownLatch receiveLatch, CancellationToken messageProcessingCancellationToken)
{
using var latchSignaler = receiveLatch.GetSignaler();
try
{
try
Expand All @@ -241,7 +237,7 @@ async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(
// in combination with TransactionScope will apply connection pooling and enlistment synchronous in ctor.
await Task.Yield();

await processStrategy.ProcessMessage(stopBatchCancellationTokenSource, receiveLatch,
await processStrategy.ProcessMessage(stopBatchCancellationTokenSource, latchSignaler,
messageProcessingCancellationToken)
.ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void Init(TableBasedQueue inputQueue, TableBasedQueue errorQueue, OnMessa
}

public abstract Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default);
AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default);

protected async Task<bool> TryHandleMessage(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,18 @@ public ProcessWithNativeTransaction(TransactionOptions transactionOptions, DbCon
}

public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default)
AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default)
{
Message message = null;
var context = new ContextBag();
var hasLatchBeenSignalled = false;

try
{
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
using (var transaction = connection.BeginTransaction(isolationLevel))
{
MessageReadResult receiveResult;
try
{
receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken)
.ConfigureAwait(false);
}
finally
{
receiveLatch.Signal();
hasLatchBeenSignalled = true;
}
var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken).ConfigureAwait(false);
receiveLatch.Signal();

if (receiveResult == MessageReadResult.NoMessage)
{
Expand Down Expand Up @@ -89,13 +79,6 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
}
failureInfoStorage.RecordFailureInfoForMessage(message.TransportId, ex, context);
}
finally
{
if (!hasLatchBeenSignalled)
{
receiveLatch.Signal();
}
}
}

async Task<bool> TryProcess(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,19 @@ public ProcessWithNoTransaction(DbConnectionFactory connectionFactory, FailureIn
}

public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default)
AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default)
{
Message message = null;
var context = new ContextBag();
var hasLatchBeenSignalled = false;

using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
{
try
{
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
MessageReadResult receiveResult;
try
{
receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken)
.ConfigureAwait(false);
}
finally
{
receiveLatch.Signal();
hasLatchBeenSignalled = true;
}
var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken).ConfigureAwait(false);
receiveLatch.Signal();

if (receiveResult == MessageReadResult.NoMessage)
{
Expand Down Expand Up @@ -89,13 +79,6 @@ await ErrorQueue
// Since this is TransactionMode.None, we don't care whether error handling says handled or retry. Message is gone either way.
_ = await HandleError(ex, message, transportTransaction, 1, context, cancellationToken).ConfigureAwait(false);
}
finally
{
if (!hasLatchBeenSignalled)
{
receiveLatch.Signal();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,18 @@ public ProcessWithTransactionScope(TransactionOptions transactionOptions, DbConn
}

public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default)
AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default)
{
Message message = null;
var context = new ContextBag();
var hasLatchBeenSignalled = false;

try
{
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions, TransactionScopeAsyncFlowOption.Enabled))
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
{
MessageReadResult receiveResult;
try
{
receiveResult = await InputQueue.TryReceive(connection, null, cancellationToken).ConfigureAwait(false);
}
finally
{
receiveLatch.Signal();
hasLatchBeenSignalled = true;
}
var receiveResult = await InputQueue.TryReceive(connection, null, cancellationToken).ConfigureAwait(false);
receiveLatch.Signal();

if (receiveResult == MessageReadResult.NoMessage)
{
Expand Down Expand Up @@ -81,13 +72,6 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
}
failureInfoStorage.RecordFailureInfoForMessage(message.TransportId, ex, context);
}
finally
{
if (!hasLatchBeenSignalled)
{
receiveLatch.Signal();
}
}
}

async Task<bool> TryProcess(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken)
Expand Down