55namespace EventHook . Helpers
66{
77 /// <summary>
8- /// A concurrent queue facilitating async dequeue
9- /// Since our consumer is always single threaded no locking is needed
8+ /// A concurrent queue facilitating async dequeue with minimal locking
9+ /// Assumes single/multi-threaded producer and a single- threaded consumer
1010 /// </summary>
1111 /// <typeparam name="T"></typeparam>
1212 internal class AsyncQueue < T >
@@ -25,10 +25,10 @@ internal AsyncQueue(CancellationToken taskCancellationToken)
2525 /// <summary>
2626 /// Keeps any pending Dequeue task to wake up once data arrives
2727 /// </summary>
28- TaskCompletionSource < T > dequeueTask ;
28+ TaskCompletionSource < bool > dequeueTask ;
2929
3030 /// <summary>
31- /// Assumes a single threaded producer!
31+ /// Supports multi- threaded producers
3232 /// </summary>
3333 /// <param name="value"></param>
3434 internal void Enqueue ( T value )
@@ -39,15 +39,12 @@ internal void Enqueue(T value)
3939 if ( dequeueTask != null
4040 && ! dequeueTask . Task . IsCompleted )
4141 {
42- T result ;
43- queue . TryDequeue ( out result ) ;
44- dequeueTask . SetResult ( result ) ;
42+ dequeueTask . SetResult ( true ) ;
4543 }
46-
4744 }
4845
4946 /// <summary>
50- /// Assumes a single threaded consumer!
47+ /// Assumes a single- threaded consumer!
5148 /// </summary>
5249 /// <returns></returns>
5350 internal async Task < T > DequeueAsync ( )
@@ -60,11 +57,12 @@ internal async Task<T> DequeueAsync()
6057 return result ;
6158 }
6259
63- dequeueTask = new TaskCompletionSource < T > ( ) ;
60+ dequeueTask = new TaskCompletionSource < bool > ( ) ;
6461 taskCancellationToken . Register ( ( ) => dequeueTask . TrySetCanceled ( ) ) ;
65- result = await dequeueTask . Task ;
62+ await dequeueTask . Task ;
6663 dequeueTask = null ;
6764
65+ queue . TryDequeue ( out result ) ;
6866 return result ;
6967 }
7068
0 commit comments