This repository was archived by the owner on Apr 21, 2021. It is now read-only.
File tree Expand file tree Collapse file tree 1 file changed +11
-5
lines changed Expand file tree Collapse file tree 1 file changed +11
-5
lines changed Original file line number Diff line number Diff line change @@ -17,10 +17,10 @@ internal class AsyncConcurrentQueue<T>
1717 private readonly ConcurrentQueue < T > queue = new ConcurrentQueue < T > ( ) ;
1818
1919 /// <summary>
20- /// Keeps any pending Dequeue task to wake up once data arrives
20+ /// Wake up any pending dequeue task
2121 /// </summary>
2222 private TaskCompletionSource < bool > dequeueTask ;
23-
23+ private SemaphoreSlim @dequeueTaskLock = new SemaphoreSlim ( 1 ) ;
2424 private CancellationToken taskCancellationToken ;
2525
2626 internal AsyncConcurrentQueue ( CancellationToken taskCancellationToken )
@@ -36,8 +36,11 @@ internal void Enqueue(T value)
3636 {
3737 queue . Enqueue ( value ) ;
3838
39- //wake up the dequeue task with result
40- dequeueTask ? . TrySetResult ( true ) ;
39+ //signal
40+ dequeueTaskLock . Wait ( ) ;
41+ dequeueTask . TrySetResult ( true ) ;
42+ dequeueTaskLock . Release ( ) ;
43+
4144 }
4245
4346 /// <summary>
@@ -54,10 +57,13 @@ internal async Task<T> DequeueAsync()
5457 return result ;
5558 }
5659
60+ await dequeueTaskLock . WaitAsync ( ) ;
5761 dequeueTask = new TaskCompletionSource < bool > ( ) ;
62+ dequeueTaskLock . Release ( ) ;
63+
5864 taskCancellationToken . Register ( ( ) => dequeueTask . TrySetCanceled ( ) ) ;
5965 await dequeueTask . Task ;
60-
66+
6167 queue . TryDequeue ( out result ) ;
6268 return result ;
6369 }
You can’t perform that action at this time.
0 commit comments