File tree Expand file tree Collapse file tree 2 files changed +49
-10
lines changed
Open.ChannelExtensions.Tests Expand file tree Collapse file tree 2 files changed +49
-10
lines changed Original file line number Diff line number Diff line change 11using System ;
2+ using System . Collections . Generic ;
23using System . Diagnostics ;
34using System . Linq ;
45using System . Threading . Tasks ;
@@ -27,15 +28,41 @@ static async Task Main()
2728 }
2829
2930 {
30- Console . WriteLine ( "Standard Channel operation test..." ) ;
31+ Console . WriteLine ( "Batch + join test 1 ..." ) ;
3132 var sw = Stopwatch . StartNew ( ) ;
32- var total = await Enumerable
33- . Repeat ( ( Func < int , ValueTask < int > > ) Delay , repeat )
34- . Select ( ( t , i ) => t ( i ) )
35- . ToChannelAsync ( singleReader : true )
36- . ReadAll ( Dummy ) ;
33+ var range = Enumerable
34+ . Range ( 0 , 2000000 ) ;
35+
36+ var result = new List < int > ( 2000000 ) ;
37+
38+ var total = await range
39+ . ToChannel ( )
40+ . Batch ( 5000 )
41+ . Join ( )
42+ . ReadAll ( i=> result . Add ( i ) ) ;
43+
3744 sw . Stop ( ) ;
38- Debug . Assert ( total == repeat ) ;
45+ Debug . Assert ( result . SequenceEqual ( range ) ) ;
46+ Console . WriteLine ( sw . Elapsed ) ;
47+ Console . WriteLine ( ) ;
48+ }
49+
50+ {
51+ Console . WriteLine ( "Batch + join test 2..." ) ;
52+ var sw = Stopwatch . StartNew ( ) ;
53+ var range = Enumerable
54+ . Range ( 0 , 2000000 ) ;
55+
56+ var result = new List < int > ( 2000000 ) ;
57+
58+ var total = await range
59+ . ToChannel ( )
60+ . Batch ( 50 )
61+ . Join ( )
62+ . ReadAll ( i => result . Add ( i ) ) ;
63+
64+ sw . Stop ( ) ;
65+ Debug . Assert ( result . SequenceEqual ( range ) ) ;
3966 Console . WriteLine ( sw . Elapsed ) ;
4067 Console . WriteLine ( ) ;
4168 }
Original file line number Diff line number Diff line change @@ -95,8 +95,20 @@ async ValueTask<bool> WaitCore()
9595 }
9696 }
9797
98- public static ChannelReader < T > Join < TList , T > ( this ChannelReader < TList > source )
99- where TList : IEnumerable < T >
100- => new JoiningChannelReader < TList , T > ( source ) ;
98+ public static ChannelReader < T > Join < T > ( this ChannelReader < IEnumerable < T > > source )
99+ => new JoiningChannelReader < IEnumerable < T > , T > ( source ) ;
100+
101+ public static ChannelReader < T > Join < T > ( this ChannelReader < ICollection < T > > source )
102+ => new JoiningChannelReader < ICollection < T > , T > ( source ) ;
103+
104+ public static ChannelReader < T > Join < T > ( this ChannelReader < IList < T > > source )
105+ => new JoiningChannelReader < IList < T > , T > ( source ) ;
106+
107+ public static ChannelReader < T > Join < T > ( this ChannelReader < List < T > > source )
108+ => new JoiningChannelReader < List < T > , T > ( source ) ;
109+
110+ public static ChannelReader < T > Join < T > ( this ChannelReader < T [ ] > source )
111+ => new JoiningChannelReader < T [ ] , T > ( source ) ;
112+
101113 }
102114}
You can’t perform that action at this time.
0 commit comments