Skip to content

Commit 96acfec

Browse files
mauroservientixwipeoutxSzymonPobiegaramonsmitstmasternak
authored
Optimize peeks by replacing synchronous countdown event with an async countdown latch based on TCS (#1627)
* Replace syncronous countdown event with an async countdown latch based on TCS [wip] initial cut of receiver, using tasks Change from tasks to countdown event * Add a cancellation token to the WaitAsync method * Moved the latch signal up the stack, to ensure it signals when the connection setup fails. * Refactor to a disposable signaler class * Simplify the latch and signaler usage Co-Authored-By: Ramon Smits <[email protected]> Co-Authored-By: Tomasz Masternak <[email protected]> Co-Authored-By: Szymon Pobiega <[email protected]> * Dispose registration * Signaler struct * Slightly more defensive pattern since we care about the completion source transitioning into the completed state regardless whether it comes from cancellation or signal * Rename AsyncCountdownLatch to ReceiveCountdownEvent * Use primary constructor(s) * Simplify usings * Rename the method argument to receiveCountdownEventSignaler --------- Co-authored-by: Steve Leigh <[email protected]> Co-authored-by: SzymonPobiega <[email protected]> Co-authored-by: Ramon Smits <[email protected]> Co-authored-by: Tomasz Masternak <[email protected]> Co-authored-by: Szymon Pobiega <[email protected]> Co-authored-by: Daniel Marbach <[email protected]>
1 parent 021698d commit 96acfec

File tree

9 files changed

+106
-84
lines changed

9 files changed

+106
-84
lines changed

src/NServiceBus.Transport.Sql.Shared/Receiving/FailureInfoStorage.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,12 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Runtime.ExceptionServices;
6-
using NServiceBus.Extensibility;
6+
using Extensibility;
77

88
// The data structure has fixed maximum size. When the data structure reaches its maximum size,
99
// the least recently used (LRU) message processing failure is removed from the storage.
10-
class FailureInfoStorage
10+
class FailureInfoStorage(int maxElements)
1111
{
12-
public FailureInfoStorage(int maxElements)
13-
{
14-
this.maxElements = maxElements;
15-
}
16-
1712
public void RecordFailureInfoForMessage(string messageId, Exception exception, ContextBag context)
1813
{
1914
lock (lockObject)
@@ -76,8 +71,6 @@ public void ClearFailureInfoForMessage(string messageId)
7671
LinkedList<string> leastRecentlyUsedMessages = new LinkedList<string>();
7772
object lockObject = new object();
7873

79-
int maxElements;
80-
8174
class FailureInfoNode
8275
{
8376
public FailureInfoNode(string messageId, ProcessingFailureInfo failureInfo)

src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
204204
? 1
205205
: messageCount;
206206

207+
var receiveLatch = new ReceiveCountdownEvent(maximumConcurrentProcessing);
207208
for (var i = 0; i < maximumConcurrentProcessing; i++)
208209
{
209210
if (stopBatchCancellationSource.IsCancellationRequested)
@@ -216,14 +217,18 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
216217
await localConcurrencyLimiter.WaitAsync(messageReceivingCancellationToken).ConfigureAwait(false);
217218

218219
_ = ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(stopBatchCancellationSource,
219-
localConcurrencyLimiter, messageProcessingCancellationTokenSource.Token);
220+
localConcurrencyLimiter, receiveLatch, messageProcessingCancellationTokenSource.Token);
220221
}
222+
223+
// Wait for all receive operations to complete before returning (and thus peeking again)
224+
await receiveLatch.WaitAsync(stopBatchCancellationSource.Token).ConfigureAwait(false);
221225
}
222226

223227
async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(
224228
CancellationTokenSource stopBatchCancellationTokenSource, SemaphoreSlim localConcurrencyLimiter,
225-
CancellationToken messageProcessingCancellationToken)
229+
ReceiveCountdownEvent receiveLatch, CancellationToken messageProcessingCancellationToken)
226230
{
231+
using var latchSignaler = receiveLatch.GetSignaler();
227232
try
228233
{
229234
try
@@ -232,8 +237,8 @@ async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(
232237
// in combination with TransactionScope will apply connection pooling and enlistment synchronous in ctor.
233238
await Task.Yield();
234239

235-
await processStrategy.ProcessMessage(stopBatchCancellationTokenSource,
236-
messageProcessingCancellationToken)
240+
await processStrategy.ProcessMessage(stopBatchCancellationTokenSource, latchSignaler,
241+
messageProcessingCancellationToken)
237242
.ConfigureAwait(false);
238243

239244
messageProcessingCircuitBreaker.Success();

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessStrategy.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ namespace NServiceBus.Transport.Sql.Shared
55
using System.Threading;
66
using System.Threading.Tasks;
77
using Faults;
8-
using NServiceBus.Extensibility;
9-
using NServiceBus.Logging;
8+
using Extensibility;
9+
using Logging;
1010
using Unicast.Queuing;
1111

1212
abstract class ProcessStrategy
@@ -35,7 +35,8 @@ public void Init(TableBasedQueue inputQueue, TableBasedQueue errorQueue, OnMessa
3535
this.criticalError = criticalError;
3636
}
3737

38-
public abstract Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, CancellationToken cancellationToken = default);
38+
public abstract Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
39+
ReceiveCountdownEvent.Signaler receiveCountdownEventSignaler, CancellationToken cancellationToken = default);
3940

4041
protected async Task<bool> TryHandleMessage(Message message, TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken = default)
4142
{

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNativeTransaction.cs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,11 @@ namespace NServiceBus.Transport.Sql.Shared
88

99
using IsolationLevel = System.Data.IsolationLevel;
1010

11-
class ProcessWithNativeTransaction : ProcessStrategy
11+
class ProcessWithNativeTransaction(TransactionOptions transactionOptions, DbConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage, TableBasedQueueCache tableBasedQueueCache, IExceptionClassifier exceptionClassifier, bool transactionForReceiveOnly = false)
12+
: ProcessStrategy(tableBasedQueueCache, exceptionClassifier, failureInfoStorage)
1213
{
13-
public ProcessWithNativeTransaction(TransactionOptions transactionOptions, DbConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage, TableBasedQueueCache tableBasedQueueCache, IExceptionClassifier exceptionClassifier, bool transactionForReceiveOnly = false)
14-
: base(tableBasedQueueCache, exceptionClassifier, failureInfoStorage)
15-
{
16-
this.connectionFactory = connectionFactory;
17-
this.failureInfoStorage = failureInfoStorage;
18-
this.exceptionClassifier = exceptionClassifier;
19-
this.transactionForReceiveOnly = transactionForReceiveOnly;
20-
21-
isolationLevel = IsolationLevelMapper.Map(transactionOptions.IsolationLevel);
22-
}
23-
24-
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, CancellationToken cancellationToken = default)
14+
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
15+
ReceiveCountdownEvent.Signaler receiveCountdownEventSignaler, CancellationToken cancellationToken = default)
2516
{
2617
Message message = null;
2718
var context = new ContextBag();
@@ -32,6 +23,7 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
3223
using (var transaction = connection.BeginTransaction(isolationLevel))
3324
{
3425
var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken).ConfigureAwait(false);
26+
receiveCountdownEventSignaler.Signal();
3527

3628
if (receiveResult == MessageReadResult.NoMessage)
3729
{
@@ -102,11 +94,9 @@ async Task<bool> TryProcess(Message message, TransportTransaction transportTrans
10294
}
10395
}
10496

105-
IsolationLevel isolationLevel;
106-
DbConnectionFactory connectionFactory;
107-
FailureInfoStorage failureInfoStorage;
108-
readonly IExceptionClassifier exceptionClassifier;
109-
bool transactionForReceiveOnly;
97+
IsolationLevel isolationLevel = IsolationLevelMapper.Map(transactionOptions.IsolationLevel);
98+
FailureInfoStorage failureInfoStorage = failureInfoStorage;
99+
readonly IExceptionClassifier exceptionClassifier = exceptionClassifier;
110100
internal static string ReceiveOnlyTransactionMode = "SqlTransport.ReceiveOnlyTransactionMode";
111101
}
112102
}

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithNoTransaction.cs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,11 @@ namespace NServiceBus.Transport.Sql.Shared
66
using System.Threading.Tasks;
77
using Extensibility;
88

9-
class ProcessWithNoTransaction : ProcessStrategy
9+
class ProcessWithNoTransaction(DbConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage, TableBasedQueueCache tableBasedQueueCache, IExceptionClassifier exceptionClassifier)
10+
: ProcessStrategy(tableBasedQueueCache, exceptionClassifier, failureInfoStorage)
1011
{
11-
public ProcessWithNoTransaction(DbConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage, TableBasedQueueCache tableBasedQueueCache, IExceptionClassifier exceptionClassifier)
12-
: base(tableBasedQueueCache, exceptionClassifier, failureInfoStorage)
13-
{
14-
this.connectionFactory = connectionFactory;
15-
this.failureInfoStorage = failureInfoStorage;
16-
this.exceptionClassifier = exceptionClassifier;
17-
}
18-
19-
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, CancellationToken cancellationToken = default)
12+
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
13+
ReceiveCountdownEvent.Signaler receiveCountdownEventSignaler, CancellationToken cancellationToken = default)
2014
{
2115
Message message = null;
2216
var context = new ContextBag();
@@ -27,8 +21,8 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
2721
{
2822
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
2923
{
30-
var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken)
31-
.ConfigureAwait(false);
24+
var receiveResult = await InputQueue.TryReceive(connection, transaction, cancellationToken).ConfigureAwait(false);
25+
receiveCountdownEventSignaler.Signal();
3226

3327
if (receiveResult == MessageReadResult.NoMessage)
3428
{
@@ -81,8 +75,7 @@ await ErrorQueue
8175
}
8276
}
8377

84-
readonly DbConnectionFactory connectionFactory;
85-
readonly FailureInfoStorage failureInfoStorage;
86-
readonly IExceptionClassifier exceptionClassifier;
78+
readonly FailureInfoStorage failureInfoStorage = failureInfoStorage;
79+
readonly IExceptionClassifier exceptionClassifier = exceptionClassifier;
8780
}
8881
}

src/NServiceBus.Transport.Sql.Shared/Receiving/ProcessWithTransactionScope.cs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,11 @@
66
using System.Transactions;
77
using Extensibility;
88

9-
class ProcessWithTransactionScope : ProcessStrategy
9+
class ProcessWithTransactionScope(TransactionOptions transactionOptions, DbConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage, TableBasedQueueCache tableBasedQueueCache, IExceptionClassifier exceptionClassifier)
10+
: ProcessStrategy(tableBasedQueueCache, exceptionClassifier, failureInfoStorage)
1011
{
11-
public ProcessWithTransactionScope(TransactionOptions transactionOptions, DbConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage, TableBasedQueueCache tableBasedQueueCache, IExceptionClassifier exceptionClassifier)
12-
: base(tableBasedQueueCache, exceptionClassifier, failureInfoStorage)
13-
{
14-
this.transactionOptions = transactionOptions;
15-
this.connectionFactory = connectionFactory;
16-
this.failureInfoStorage = failureInfoStorage;
17-
this.exceptionClassifier = exceptionClassifier;
18-
}
19-
20-
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource, CancellationToken cancellationToken = default)
12+
public override async Task ProcessMessage(CancellationTokenSource stopBatchCancellationTokenSource,
13+
ReceiveCountdownEvent.Signaler receiveCountdownEventSignaler, CancellationToken cancellationToken = default)
2114
{
2215
Message message = null;
2316
var context = new ContextBag();
@@ -28,6 +21,7 @@ public override async Task ProcessMessage(CancellationTokenSource stopBatchCance
2821
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
2922
{
3023
var receiveResult = await InputQueue.TryReceive(connection, null, cancellationToken).ConfigureAwait(false);
24+
receiveCountdownEventSignaler.Signal();
3125

3226
if (receiveResult == MessageReadResult.NoMessage)
3327
{
@@ -95,9 +89,7 @@ async Task<bool> TryProcess(Message message, TransportTransaction transportTrans
9589
}
9690
}
9791

98-
TransactionOptions transactionOptions;
99-
DbConnectionFactory connectionFactory;
100-
FailureInfoStorage failureInfoStorage;
101-
readonly IExceptionClassifier exceptionClassifier;
92+
FailureInfoStorage failureInfoStorage = failureInfoStorage;
93+
readonly IExceptionClassifier exceptionClassifier = exceptionClassifier;
10294
}
10395
}

src/NServiceBus.Transport.Sql.Shared/Receiving/QueuePeeker.cs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,8 @@
66
using System.Transactions;
77
using Logging;
88

9-
class QueuePeeker : IPeekMessagesInQueue
9+
class QueuePeeker(DbConnectionFactory connectionFactory, IExceptionClassifier exceptionClassifier, TimeSpan peekDelay) : IPeekMessagesInQueue
1010
{
11-
public QueuePeeker(DbConnectionFactory connectionFactory, IExceptionClassifier exceptionClassifier, TimeSpan peekDelay)
12-
{
13-
this.connectionFactory = connectionFactory;
14-
this.exceptionClassifier = exceptionClassifier;
15-
this.peekDelay = peekDelay;
16-
}
17-
1811
public async Task<int> Peek(TableBasedQueue inputQueue, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, CancellationToken cancellationToken = default)
1912
{
2013
var messageCount = 0;
@@ -50,10 +43,6 @@ public async Task<int> Peek(TableBasedQueue inputQueue, RepeatedFailuresOverTime
5043
return messageCount;
5144
}
5245

53-
readonly DbConnectionFactory connectionFactory;
54-
readonly IExceptionClassifier exceptionClassifier;
55-
readonly TimeSpan peekDelay;
56-
5746
static readonly ILog Logger = LogManager.GetLogger<QueuePeeker>();
5847
}
5948
}

src/NServiceBus.Transport.Sql.Shared/Receiving/QueuePurger.cs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,14 @@
33
using System.Threading;
44
using System.Threading.Tasks;
55

6-
class QueuePurger : IPurgeQueues
6+
class QueuePurger(DbConnectionFactory connectionFactory) : IPurgeQueues
77
{
8-
public QueuePurger(DbConnectionFactory connectionFactory)
9-
{
10-
this.connectionFactory = connectionFactory;
11-
}
12-
138
public virtual async Task<int> Purge(TableBasedQueue queue, CancellationToken cancellationToken = default)
149
{
1510
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
1611
{
1712
return await queue.Purge(connection, cancellationToken).ConfigureAwait(false);
1813
}
1914
}
20-
21-
DbConnectionFactory connectionFactory;
2215
}
2316
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
namespace NServiceBus.Transport.Sql.Shared;
2+
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
class ReceiveCountdownEvent
8+
{
9+
int count;
10+
readonly TaskCompletionSource completionSource;
11+
12+
public ReceiveCountdownEvent(int count)
13+
{
14+
this.count = count;
15+
completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
16+
17+
if (count <= 0)
18+
{
19+
completionSource.SetResult();
20+
}
21+
}
22+
23+
public async Task WaitAsync(CancellationToken cancellationToken = default)
24+
{
25+
var registration = cancellationToken.Register(static state => ((TaskCompletionSource)state).TrySetResult(), completionSource);
26+
await using var _ = registration.ConfigureAwait(false);
27+
await completionSource.Task.ConfigureAwait(false);
28+
}
29+
30+
public Signaler GetSignaler() => new(this);
31+
32+
void Signal()
33+
{
34+
if (Interlocked.Decrement(ref count) == 0)
35+
{
36+
_ = completionSource.TrySetResult();
37+
}
38+
}
39+
40+
public struct Signaler(ReceiveCountdownEvent parent) : IDisposable
41+
{
42+
bool signalled;
43+
44+
public void Signal()
45+
{
46+
if (signalled)
47+
{
48+
return;
49+
}
50+
51+
parent.Signal();
52+
signalled = true;
53+
}
54+
55+
public void Dispose()
56+
{
57+
if (signalled)
58+
{
59+
return;
60+
}
61+
62+
parent.Signal();
63+
signalled = true;
64+
}
65+
}
66+
}

0 commit comments

Comments
 (0)