Skip to content

Commit 41ba5bf

Browse files
authored
Merge pull request #762 from danielmarbach/bring-back-tcs
Using TaskCompletionSource for WorkPool instead of SemaphoreSlim
2 parents 87193c4 + 3ea8c45 commit 41ba5bf

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,16 @@ class WorkPool
4343
readonly ConcurrentQueue<Work> _workQueue;
4444
readonly CancellationTokenSource _tokenSource;
4545
readonly ModelBase _model;
46-
readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
46+
CancellationTokenRegistration _tokenRegistration;
47+
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
4748
private Task _task;
4849

4950
public WorkPool(ModelBase model)
5051
{
5152
_model = model;
5253
_workQueue = new ConcurrentQueue<Work>();
5354
_tokenSource = new CancellationTokenSource();
55+
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
5456
}
5557

5658
public void Start()
@@ -61,7 +63,7 @@ public void Start()
6163
public void Enqueue(Work work)
6264
{
6365
_workQueue.Enqueue(work);
64-
_semaphore.Release();
66+
_syncSource.TrySetResult(true);
6567
}
6668

6769
async Task Loop()
@@ -70,7 +72,8 @@ async Task Loop()
7072
{
7173
try
7274
{
73-
await _semaphore.WaitAsync(_tokenSource.Token).ConfigureAwait(false);
75+
await _syncSource.Task.ConfigureAwait(false);
76+
_syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
7477
}
7578
catch (TaskCanceledException)
7679
{
@@ -87,6 +90,7 @@ async Task Loop()
8790
public void Stop()
8891
{
8992
_tokenSource.Cancel();
93+
_tokenRegistration.Dispose();
9094
}
9195
}
9296
}

0 commit comments

Comments
 (0)