Skip to content

Commit 1b6cf4e

Browse files
committed
Refactor to a disposable signaler class
1 parent 4b94d48 commit 1b6cf4e

File tree

6 files changed

+36
-62
lines changed

6 files changed

+36
-62
lines changed

src/NServiceBus.Transport.Sql.Shared/Receiving/AsyncCountdownLatch.cs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace NServiceBus.Transport.Sql.Shared;
22

3+
using System;
34
using System.Threading;
45
using System.Threading.Tasks;
56

@@ -21,11 +22,33 @@ public AsyncCountdownLatch(int count)
2122

2223
public Task WaitAsync(CancellationToken cancellationToken = default) => completionSource.Task;
2324

24-
public void Signal()
25+
public Signaler GetSignaler() => new Signaler(this);
26+
27+
void Signal()
2528
{
2629
if (Interlocked.Decrement(ref count) == 0)
2730
{
2831
completionSource.SetResult();
2932
}
3033
}
34+
35+
public class Signaler(AsyncCountdownLatch parent) : IDisposable
36+
{
37+
bool signalled;
38+
39+
public void Signal()
40+
{
41+
parent.Signal();
42+
signalled = true;
43+
}
44+
45+
public void Dispose()
46+
{
47+
if (!signalled)
48+
{
49+
parent.Signal();
50+
signalled = true;
51+
}
52+
}
53+
}
3154
}

src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(
233233
CancellationTokenSource stopBatchCancellationTokenSource, SemaphoreSlim localConcurrencyLimiter,
234234
AsyncCountdownLatch receiveLatch, CancellationToken messageProcessingCancellationToken)
235235
{
236+
using var latchSignaler = receiveLatch.GetSignaler();
236237
try
237238
{
238239
try
@@ -241,7 +242,7 @@ async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(
241242
// in combination with TransactionScope will apply connection pooling and enlistment synchronous in ctor.
242243
await Task.Yield();
243244

244-
await processStrategy.ProcessMessage(stopBatchCancellationTokenSource, receiveLatch,
245+
await processStrategy.ProcessMessage(stopBatchCancellationTokenSource, latchSignaler,
245246
messageProcessingCancellationToken)
246247
.ConfigureAwait(false);
247248

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessStrategy.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void Init(TableBasedQueue inputQueue, TableBasedQueue errorQueue, OnMessa
3636
}
3737

3838
public abstract Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
39-
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default);
39+
AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default);
4040

4141
protected async Task<bool> TryHandleMessage(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken = default)
4242
{

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNativeTransaction.cs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,18 @@ public ProcessWithNativeTransaction(TransactionOptions transactionOptions, DbCon
2222
}
2323

2424
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
25-
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default)
25+
AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default)
2626
{
2727
Message message = null;
2828
var context = new ContextBag();
29-
var hasLatchBeenSignalled = false;
3029

3130
try
3231
{
3332
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
3433
using (var transaction = connection.BeginTransaction(isolationLevel))
3534
{
36-
MessageReadResult receiveResult;
37-
try
38-
{
39-
receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken)
40-
.ConfigureAwait(false);
41-
}
42-
finally
43-
{
44-
receiveLatch.Signal();
45-
hasLatchBeenSignalled = true;
46-
}
35+
var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken).ConfigureAwait(false);
36+
receiveLatch.Signal();
4737

4838
if (receiveResult == MessageReadResult.NoMessage)
4939
{
@@ -89,13 +79,6 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
8979
}
9080
failureInfoStorage.RecordFailureInfoForMessage(message.TransportId, ex, context);
9181
}
92-
finally
93-
{
94-
if (!hasLatchBeenSignalled)
95-
{
96-
receiveLatch.Signal();
97-
}
98-
}
9982
}
10083

10184
async Task<bool> TryProcess(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken)

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNoTransaction.cs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,19 @@ public ProcessWithNoTransaction(DbConnectionFactory connectionFactory, FailureIn
1717
}
1818

1919
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
20-
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default)
20+
AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default)
2121
{
2222
Message message = null;
2323
var context = new ContextBag();
24-
var hasLatchBeenSignalled = false;
2524

2625
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
2726
{
2827
try
2928
{
3029
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
3130
{
32-
MessageReadResult receiveResult;
33-
try
34-
{
35-
receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken)
36-
.ConfigureAwait(false);
37-
}
38-
finally
39-
{
40-
receiveLatch.Signal();
41-
hasLatchBeenSignalled = true;
42-
}
31+
var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken).ConfigureAwait(false);
32+
receiveLatch.Signal();
4333

4434
if (receiveResult == MessageReadResult.NoMessage)
4535
{
@@ -89,13 +79,6 @@ await ErrorQueue
8979
// Since this is TransactionMode.None, we don't care whether error handling says handled or retry. Message is gone either way.
9080
_ = await HandleError(ex, message, transportTransaction, 1, context, cancellationToken).ConfigureAwait(false);
9181
}
92-
finally
93-
{
94-
if (!hasLatchBeenSignalled)
95-
{
96-
receiveLatch.Signal();
97-
}
98-
}
9982
}
10083
}
10184

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithTransactionScope.cs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,18 @@ public ProcessWithTransactionScope(TransactionOptions transactionOptions, DbConn
1818
}
1919

2020
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
21-
AsyncCountdownLatch receiveLatch, CancellationToken cancellationToken = default)
21+
AsyncCountdownLatch.Signaler receiveLatch, CancellationToken cancellationToken = default)
2222
{
2323
Message message = null;
2424
var context = new ContextBag();
25-
var hasLatchBeenSignalled = false;
2625

2726
try
2827
{
2928
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions, TransactionScopeAsyncFlowOption.Enabled))
3029
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
3130
{
32-
MessageReadResult receiveResult;
33-
try
34-
{
35-
receiveResult = await InputQueue.TryReceive(connection, null, cancellationToken).ConfigureAwait(false);
36-
}
37-
finally
38-
{
39-
receiveLatch.Signal();
40-
hasLatchBeenSignalled = true;
41-
}
31+
var receiveResult = await InputQueue.TryReceive(connection, null, cancellationToken).ConfigureAwait(false);
32+
receiveLatch.Signal();
4233

4334
if (receiveResult == MessageReadResult.NoMessage)
4435
{
@@ -81,13 +72,6 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
8172
}
8273
failureInfoStorage.RecordFailureInfoForMessage(message.TransportId, ex, context);
8374
}
84-
finally
85-
{
86-
if (!hasLatchBeenSignalled)
87-
{
88-
receiveLatch.Signal();
89-
}
90-
}
9175
}
9276

9377
async Task<bool> TryProcess(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)