@@ -26,47 +26,50 @@ public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reade
2626 if ( maxConcurrency == 1 )
2727 return reader . ReadAllAsync ( receiver , cancellationToken , true ) . AsTask ( ) ;
2828
29- #pragma warning disable IDE0079 // Remove unnecessary suppression
30- #pragma warning disable CA2000 // Dispose objects before losing scope
31- var tokenSource = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken ) ;
32- #pragma warning restore CA2000 // Dispose objects before losing scope
33- #pragma warning restore IDE0079 // Remove unnecessary suppression
34- CancellationToken token = tokenSource . Token ;
35- var readers = new Task < long > [ maxConcurrency ] ;
36- for ( int r = 0 ; r < maxConcurrency ; ++ r )
37- readers [ r ] = Read ( ) ;
29+ // Leverage async to simplify this function and make it easy to dispose the token source.
30+ return ReadAllConcurrentlyAsyncCore ( reader , maxConcurrency , receiver , cancellationToken ) ;
3831
39- return Task
40- . WhenAll ( readers )
41- . ContinueWith ( t =>
42- {
43- tokenSource . Dispose ( ) ;
44- #pragma warning disable IDE0079 // Remove unnecessary suppression
45- #pragma warning disable CA1849 // Call async methods when in an async method
46- return t . IsFaulted
32+ static async Task < long > ReadAllConcurrentlyAsyncCore (
33+ ChannelReader < T > reader , int maxConcurrency , Func < T , ValueTask > receiver , CancellationToken cancellationToken )
34+ {
35+ using var tokenSource = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken ) ;
36+ CancellationToken token = tokenSource . Token ;
37+ var readers = new Task < long > [ maxConcurrency ] ;
38+ var scheduler = TaskScheduler . Current ;
39+ for ( int r = 0 ; r < maxConcurrency ; ++ r )
40+ readers [ r ] = Read ( ) ;
41+
42+ // This produces the most accurate/reliable exception and cancellation results.
43+ return await Task
44+ . WhenAll ( readers )
45+ . ContinueWith (
46+ t => t . IsFaulted
4747 ? Task . FromException < long > ( t . Exception ! )
4848 : t . IsCanceled
4949 ? Task . FromCanceled < long > ( token )
50- : Task . FromResult ( t . Result . Sum ( ) ) ;
51- #pragma warning restore CA1849 // Call async methods when in an async method
52- #pragma warning restore IDE0079 // Remove unnecessary suppression
53- } ,
54- CancellationToken . None ,
55- TaskContinuationOptions . ExecuteSynchronously ,
56- TaskScheduler . Current )
57- . Unwrap ( ) ;
50+ : SumAsync ( t ) ,
51+ CancellationToken . None ,
52+ TaskContinuationOptions . ExecuteSynchronously ,
53+ scheduler )
54+ . Unwrap ( )
55+ . ConfigureAwait ( false ) ;
5856
59- async Task < long > Read ( )
60- {
61- try
57+ async Task < long > Read ( )
6258 {
63- return await reader . ReadUntilCancelledAsync ( token , ( T item , long _ ) => receiver ( item ) , true ) . ConfigureAwait ( false ) ;
64- }
65- catch
66- {
67- await tokenSource . CancelAsync ( ) . ConfigureAwait ( false ) ;
68- throw ;
59+ try
60+ {
61+ return await reader . ReadUntilCancelledAsync ( token , ( T item , long _ ) => receiver ( item ) , true ) . ConfigureAwait ( false ) ;
62+ }
63+ catch ( Exception ex )
64+ {
65+ Debug . WriteLine ( ex . ToString ( ) ) ;
66+ await tokenSource . CancelAsync ( ) . ConfigureAwait ( false ) ;
67+ throw ;
68+ }
6969 }
70+
71+ static async Task < long > SumAsync ( Task < long [ ] > counts )
72+ => ( await counts . ConfigureAwait ( false ) ) . Sum ( ) ;
7073 }
7174 }
7275
0 commit comments