Skip to content

Commit 12e5c6a

Browse files
mauroservientiramonsmitstmasternakSzymonPobiega
committed
Simplify the latch and signaler usage
Co-Authored-By: Ramon Smits <[email protected]> Co-Authored-By: Tomasz Masternak <[email protected]> Co-Authored-By: Szymon Pobiega <[email protected]>
1 parent 1b6cf4e commit 12e5c6a

File tree

2 files changed

+10
-9
lines changed

2 files changed

+10
-9
lines changed

src/NServiceBus.Transport.Sql.Shared/Receiving/AsyncCountdownLatch.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,15 @@ public AsyncCountdownLatch(int count)
2020
}
2121
}
2222

23-
public Task WaitAsync(CancellationToken cancellationToken = default) => completionSource.Task;
23+
#pragma warning disable PS0003
24+
public Task WaitAsync(CancellationToken cancellationToken)
25+
#pragma warning restore PS0003
26+
{
27+
_ = cancellationToken.Register(completionSource.SetResult);
28+
return completionSource.Task;
29+
}
2430

25-
public Signaler GetSignaler() => new Signaler(this);
31+
public Signaler GetSignaler() => new(this);
2632

2733
void Signal()
2834
{

src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,11 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
204204
? 1
205205
: messageCount;
206206

207-
bool shouldWaitForReceiveTasks = true;
208207
var receiveLatch = new AsyncCountdownLatch(maximumConcurrentProcessing);
209208
for (var i = 0; i < maximumConcurrentProcessing; i++)
210209
{
211210
if (stopBatchCancellationSource.IsCancellationRequested)
212211
{
213-
shouldWaitForReceiveTasks = false;
214212
break;
215213
}
216214

@@ -222,11 +220,8 @@ async Task ReceiveMessages(CancellationToken messageReceivingCancellationToken)
222220
localConcurrencyLimiter, receiveLatch, messageProcessingCancellationTokenSource.Token);
223221
}
224222

225-
if (shouldWaitForReceiveTasks)
226-
{
227-
// Wait for all receive operations to complete before returning (and thus peeking again)
228-
await receiveLatch.WaitAsync(CancellationToken.None).ConfigureAwait(false);
229-
}
223+
// Wait for all receive operations to complete before returning (and thus peeking again)
224+
await receiveLatch.WaitAsync(stopBatchCancellationSource.Token).ConfigureAwait(false);
230225
}
231226

232227
async Task ProcessMessagesSwallowExceptionsAndReleaseConcurrencyLimiter(

0 commit comments

Comments
 (0)