Skip to content

Commit 844553a

Browse files
xwipeoutxmauroservienti
authored andcommitted
Replace syncronous countdown event with an async countdown latch based on TCS
[wip] initial cut of receiver, using tasks Change from tasks to countdown event
1 parent 092c9e7 commit 844553a

File tree

6 files changed

+81
-12
lines changed

6 files changed

+81
-12
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
namespace NServiceBus.Transport.Sql.Shared;
2+
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
class AsyncCountdownLatch
7+
{
8+
int count;
9+
readonly TaskCompletionSource completionSource;
10+
11+
public AsyncCountdownLatch(int count)
12+
{
13+
this.count = count;
14+
completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
15+
16+
if (count <= 0)
17+
{
18+
completionSource.SetResult();
19+
}
20+
}
21+
22+
public Task WaitAsync() => completionSource.Task;
23+
24+
public void Signal()
25+
{
26+
if (Interlocked.Decrement(ref count) == 0)
27+
{
28+
completionSource.SetResult();
29+
}
30+
}
31+
}

src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,13 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
204204
? 1
205205
: messageCount;
206206

207+
bool shouldWaitForReceiveTasks = true;
208+
var receiveLatch = new AsyncCountdownLatch(maximumConcurrentProcessing);
207209
for (var i = 0; i < maximumConcurrentProcessing; i++)
208210
{
209211
if (stopBatchCancellationSource.IsCancellationRequested)
210212
{
213+
shouldWaitForReceiveTasks = false;
211214
break;
212215
}
213216

@@ -216,13 +219,19 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
216219
await localConcurrencyLimiter.WaitAsync(messageReceivingCancellationToken).ConfigureAwait(false);
217220

218221
_ = ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(stopBatchCancellationSource,
219-
localConcurrencyLimiter, messageProcessingCancellationTokenSource.Token);
222+
localConcurrencyLimiter, receiveLatch, messageProcessingCancellationTokenSource.Token);
223+
}
224+
225+
if (shouldWaitForReceiveTasks)
226+
{
227+
// Wait for all receive operations to complete before returning (and thus peeking again)
228+
await receiveLatch.WaitAsync().ConfigureAwait(false);
220229
}
221230
}
222231

223232
async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(
224233
CancellationTokenSource stopBatchCancellationTokenSource, SemaphoreSlim localConcurrencyLimiter,
225-
CancellationToken messageProcessingCancellationToken)
234+
AsyncCountdownLatch receiveLatch, CancellationToken messageProcessingCancellationToken)
226235
{
227236
try
228237
{
@@ -232,8 +241,8 @@ async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(
232241
// in combination with TransactionScope will apply connection pooling and enlistment synchronous in ctor.
233242
await Task.Yield();
234243

235-
await processStrategy.ProcessMessage(stopBatchCancellationTokenSource,
236-
messageProcessingCancellationToken)
244+
await processStrategy.ProcessMessage(stopBatchCancellationTokenSource, receiveLatch,
245+
messageProcessingCancellationToken)
237246
.ConfigureAwait(false);
238247

239248
messageProcessingCircuitBreaker.Success();

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessStrategy.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ public void Init(TableBasedQueue inputQueue, TableBasedQueue errorQueue, OnMessa
3535
this.criticalError = criticalError;
3636
}
3737

38-
public abstract Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, CancellationToken cancellationToken = default);
38+
public abstract Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
39+
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default);
3940

4041
protected async Task<bool> TryHandleMessage(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken = default)
4142
{

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNativeTransaction.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ public ProcessWithNativeTransaction(TransactionOptions transactionOptions, DbCon
2121
isolationLevel = IsolationLevelMapper.Map(transactionOptions.IsolationLevel);
2222
}
2323

24-
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, CancellationToken cancellationToken = default)
24+
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
25+
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default)
2526
{
2627
Message message = null;
2728
var context = new ContextBag();
@@ -31,7 +32,16 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
3132
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
3233
using (var transaction = connection.BeginTransaction(isolationLevel))
3334
{
34-
var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken).ConfigureAwait(false);
35+
MessageReadResult receiveResult;
36+
try
37+
{
38+
receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken)
39+
.ConfigureAwait(false);
40+
}
41+
finally
42+
{
43+
receiveLatch.Signal();
44+
}
3545

3646
if (receiveResult == MessageReadResult.NoMessage)
3747
{

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNoTransaction.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ public ProcessWithNoTransaction(DbConnectionFactory connectionFactory, FailureIn
1616
this.exceptionClassifier = exceptionClassifier;
1717
}
1818

19-
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, CancellationToken cancellationToken = default)
19+
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
20+
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default)
2021
{
2122
Message message = null;
2223
var context = new ContextBag();
@@ -27,8 +28,16 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
2728
{
2829
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
2930
{
30-
var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken)
31-
.ConfigureAwait(false);
31+
MessageReadResult receiveResult;
32+
try
33+
{
34+
receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken)
35+
.ConfigureAwait(false);
36+
}
37+
finally
38+
{
39+
receiveLatch.Signal();
40+
}
3241

3342
if (receiveResult == MessageReadResult.NoMessage)
3443
{

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithTransactionScope.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ public ProcessWithTransactionScope(TransactionOptions transactionOptions, DbConn
1717
this.exceptionClassifier = exceptionClassifier;
1818
}
1919

20-
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, CancellationToken cancellationToken = default)
20+
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
21+
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default)
2122
{
2223
Message message = null;
2324
var context = new ContextBag();
@@ -27,7 +28,15 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
2728
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions, TransactionScopeAsyncFlowOption.Enabled))
2829
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
2930
{
30-
var receiveResult = await InputQueue.TryReceive(connection, null, cancellationToken).ConfigureAwait(false);
31+
MessageReadResult receiveResult;
32+
try
33+
{
34+
receiveResult = await InputQueue.TryReceive(connection, null, cancellationToken).ConfigureAwait(false);
35+
}
36+
finally
37+
{
38+
receiveLatch.Signal();
39+
}
3140

3241
if (receiveResult == MessageReadResult.NoMessage)
3342
{

0 commit comments

Comments
 (0)