@@ -11,145 +11,183 @@ namespace Open.ChannelExtensions.Tests
1111{
1212 public static class BatchTests
1313 {
14- [ Fact ]
14+ [ Fact ]
1515 public static async Task SimpleBatch2Test ( )
1616 {
17- var c = Channel . CreateUnbounded < int > ( new UnboundedChannelOptions { SingleReader = false , SingleWriter = false } ) ;
18- #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
19- Task . Run ( async ( ) =>
20- {
21- await Task . Delay ( 1000 ) ;
22- c . Writer . TryWrite ( 1 ) ;
23- c . Writer . TryWrite ( 2 ) ;
24- c . Writer . TryWrite ( 3 ) ;
25- c . Writer . TryWrite ( 4 ) ;
26- c . Writer . TryWrite ( 5 ) ;
27- c . Writer . TryWrite ( 6 ) ;
28- c . Writer . Complete ( ) ;
29- } ) ;
30- #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
17+ var c = Channel . CreateUnbounded < int > ( new UnboundedChannelOptions { SingleReader = false , SingleWriter = false } ) ;
18+ _ = Task . Run ( async ( ) =>
19+ {
20+ await Task . Delay ( 1000 ) ;
21+ c . Writer . TryWrite ( 1 ) ;
22+ c . Writer . TryWrite ( 2 ) ;
23+ c . Writer . TryWrite ( 3 ) ;
24+ c . Writer . TryWrite ( 4 ) ;
25+ c . Writer . TryWrite ( 5 ) ;
26+ c . Writer . TryWrite ( 6 ) ;
27+ c . Writer . Complete ( ) ;
28+ } ) ;
3129
30+ await c . Reader
31+ . Batch ( 2 )
32+ . ReadAllAsync ( async ( batch , i ) =>
33+ {
34+ switch ( i )
35+ {
36+ case 0 :
37+ Assert . Equal ( 1 , batch [ 0 ] ) ;
38+ Assert . Equal ( 2 , batch [ 1 ] ) ;
39+ break ;
40+ case 1 :
41+ Assert . Equal ( 3 , batch [ 0 ] ) ;
42+ Assert . Equal ( 4 , batch [ 1 ] ) ;
43+ break ;
44+ case 2 :
45+ Assert . Equal ( 5 , batch [ 0 ] ) ;
46+ Assert . Equal ( 6 , batch [ 1 ] ) ;
47+ break ;
48+ default :
49+ throw new Exception ( "Shouldn't arrive here." ) ;
50+ }
51+ await Task . Delay ( 500 ) ;
52+ } ) ;
53+
54+ }
55+
56+ [ Fact ]
57+ public static async Task Batch2TestWithDelay ( )
58+ {
59+ var c = Channel . CreateUnbounded < int > ( new UnboundedChannelOptions { SingleReader = false , SingleWriter = false } ) ;
60+ _ = Task . Run ( async ( ) =>
61+ {
62+ await Task . Delay ( 1000 ) ;
63+ c . Writer . TryWrite ( 1 ) ;
64+ c . Writer . TryWrite ( 2 ) ;
65+ c . Writer . TryWrite ( 3 ) ;
66+ c . Writer . TryWrite ( 4 ) ;
67+ c . Writer . TryWrite ( 5 ) ;
68+ c . Writer . TryWrite ( 6 ) ;
69+ } ) ;
3270
71+ using var tokenSource = new CancellationTokenSource ( ) ;
72+ var token = tokenSource . Token ;
3373 await c . Reader
34- . Batch ( 2 )
35- . ReadAllAsync ( async ( batch , i ) =>
36- {
37- switch ( i )
74+ . Batch ( 2 )
75+ . ReadAllAsync ( async ( batch , i ) =>
76+ {
77+ switch ( i )
3878 {
39- case 0 :
40- Assert . Equal ( 1 , batch [ 0 ] ) ;
41- Assert . Equal ( 2 , batch [ 1 ] ) ;
42- break ;
43- case 1 :
44- Assert . Equal ( 3 , batch [ 0 ] ) ;
45- Assert . Equal ( 4 , batch [ 1 ] ) ;
46- break ;
47- case 2 :
48- Assert . Equal ( 5 , batch [ 0 ] ) ;
49- Assert . Equal ( 6 , batch [ 1 ] ) ;
50- break ;
51- default :
52- throw new Exception ( "Shouldn't arrive here." ) ;
53- }
54- await Task . Delay ( 500 ) ;
55- } ) ;
79+ case 0 :
80+ Assert . Equal ( 1 , batch [ 0 ] ) ;
81+ Assert . Equal ( 2 , batch [ 1 ] ) ;
82+ break ;
83+ case 1 :
84+ Assert . Equal ( 3 , batch [ 0 ] ) ;
85+ Assert . Equal ( 4 , batch [ 1 ] ) ;
86+ _ = Task . Run ( async ( ) =>
87+ {
88+ await Task . Delay ( 60000 , token ) ;
89+ if ( ! token . IsCancellationRequested ) c . Writer . TryComplete ( new Exception ( "Should have completed successfuly." ) ) ;
90+ } ) ;
91+ break ;
92+ case 2 :
93+ Assert . Equal ( 5 , batch [ 0 ] ) ;
94+ Assert . Equal ( 6 , batch [ 1 ] ) ;
95+ tokenSource . Cancel ( ) ;
96+ c . Writer . Complete ( ) ;
97+ break ;
98+ default :
99+ throw new Exception ( "Shouldn't arrive here." ) ;
100+ }
101+ await Task . Delay ( 500 ) ;
102+ } ) ;
56103
57- }
104+ }
58105
59- [ Fact ]
60- public static async Task Batch2TestWithDelay ( )
61- {
62- var c = Channel . CreateUnbounded < int > ( new UnboundedChannelOptions { SingleReader = false , SingleWriter = false } ) ;
63- #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
64- Task . Run ( async ( ) =>
65- {
66- await Task . Delay ( 1000 ) ;
67- c . Writer . TryWrite ( 1 ) ;
68- c . Writer . TryWrite ( 2 ) ;
69- c . Writer . TryWrite ( 3 ) ;
70- c . Writer . TryWrite ( 4 ) ;
71- c . Writer . TryWrite ( 5 ) ;
72- c . Writer . TryWrite ( 6 ) ;
73- } ) ;
74- #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
75106
76- using var tokenSource = new CancellationTokenSource ( ) ;
77- var token = tokenSource . Token ;
78- await c . Reader
79- . Batch ( 2 )
80- . ReadAllAsync ( async ( batch , i ) =>
81- {
82- switch ( i )
83- {
84- case 0 :
85- Assert . Equal ( 1 , batch [ 0 ] ) ;
86- Assert . Equal ( 2 , batch [ 1 ] ) ;
87- break ;
88- case 1 :
89- Assert . Equal ( 3 , batch [ 0 ] ) ;
90- Assert . Equal ( 4 , batch [ 1 ] ) ;
91- _ = Task . Run ( async ( ) => {
92- await Task . Delay ( 60000 , token ) ;
93- if ( ! token . IsCancellationRequested ) c . Writer . TryComplete ( new Exception ( "Should have completed successfuly." ) ) ;
94- } ) ;
95- break ;
96- case 2 :
97- Assert . Equal ( 5 , batch [ 0 ] ) ;
98- Assert . Equal ( 6 , batch [ 1 ] ) ;
99- tokenSource . Cancel ( ) ;
100- c . Writer . Complete ( ) ;
101- break ;
102- default :
103- throw new Exception ( "Shouldn't arrive here." ) ;
104- }
105- await Task . Delay ( 500 ) ;
106- } ) ;
107+ [ Fact ]
108+ public static async Task ForceBatchTest ( )
109+ {
110+ var c = Channel . CreateUnbounded < int > ( new UnboundedChannelOptions { SingleReader = false , SingleWriter = false } ) ;
111+ _ = Task . Run ( async ( ) =>
112+ {
113+ await Task . Delay ( 1000 ) ;
114+ c . Writer . TryWrite ( 1 ) ;
115+ c . Writer . TryWrite ( 2 ) ;
116+ c . Writer . TryWrite ( 3 ) ;
117+ c . Writer . TryWrite ( 4 ) ;
118+ c . Writer . TryWrite ( 5 ) ;
119+ } ) ;
107120
108- }
121+ using var tokenSource = new CancellationTokenSource ( 10000 ) ;
122+ var reader = c . Reader . Batch ( 3 ) ;
123+ Assert . Equal ( 2 , await reader . ReadAllAsync ( tokenSource . Token , async ( batch , i ) =>
124+ {
125+ switch ( i )
126+ {
127+ case 0 :
128+ Assert . Equal ( 1 , batch [ 0 ] ) ;
129+ Assert . Equal ( 2 , batch [ 1 ] ) ;
130+ Assert . Equal ( 3 , batch [ 2 ] ) ;
131+ await Task . Delay ( 500 ) ;
132+ reader . ForceBatch ( ) ;
133+ break ;
134+ case 1 :
135+ Assert . Equal ( 2 , batch . Count ) ;
136+ Assert . Equal ( 4 , batch [ 0 ] ) ;
137+ Assert . Equal ( 5 , batch [ 1 ] ) ;
138+ c . Writer . Complete ( ) ;
139+ break ;
140+ default :
141+ throw new Exception ( "Shouldn't arrive here." ) ;
142+ }
143+ await Task . Delay ( 500 ) ;
144+ } ) ) ;
109145
146+ }
110147
111- [ Fact ]
112- public static async Task ForceBatchTest ( )
113- {
114- var c = Channel . CreateUnbounded < int > ( new UnboundedChannelOptions { SingleReader = false , SingleWriter = false } ) ;
115- #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
116- Task . Run ( async ( ) =>
117- {
118- await Task . Delay ( 1000 ) ;
119- c . Writer . TryWrite ( 1 ) ;
120- c . Writer . TryWrite ( 2 ) ;
121- c . Writer . TryWrite ( 3 ) ;
122- c . Writer . TryWrite ( 4 ) ;
123- c . Writer . TryWrite ( 5 ) ;
124- } ) ;
125- #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
148+ [ Fact ]
149+ public static async Task ForceBatchTest2 ( )
150+ {
151+ var c = Channel . CreateUnbounded < int > ( new UnboundedChannelOptions { SingleReader = false , SingleWriter = false } ) ;
152+ var reader = c . Reader . Batch ( 3 ) ;
153+ _ = Task . Run ( async ( ) =>
154+ {
155+ await Task . Delay ( 1000 ) ;
156+ c . Writer . TryWrite ( 1 ) ;
157+ c . Writer . TryWrite ( 2 ) ;
158+ c . Writer . TryWrite ( 3 ) ;
159+ c . Writer . TryWrite ( 4 ) ;
160+ c . Writer . TryWrite ( 5 ) ;
161+ Debug . WriteLine ( "Writing Complete." ) ;
126162
127- using var tokenSource = new CancellationTokenSource ( ) ;
128- var token = tokenSource . Token ;
129- var reader = c . Reader . Batch ( 3 ) ;
130- Assert . Equal ( 2 , await reader . ReadAllAsync ( async ( batch , i ) =>
131- {
132- switch ( i )
133- {
134- case 0 :
135- Assert . Equal ( 1 , batch [ 0 ] ) ;
136- Assert . Equal ( 2 , batch [ 1 ] ) ;
137- Assert . Equal ( 3 , batch [ 2 ] ) ;
138- await Task . Delay ( 500 ) ;
139- reader . ForceBatch ( ) ;
140- break ;
141- case 1 :
142- Assert . Equal ( 2 , batch . Count ) ;
143- Assert . Equal ( 4 , batch [ 0 ] ) ;
144- Assert . Equal ( 5 , batch [ 1 ] ) ;
145- c . Writer . Complete ( ) ;
146- break ;
147- default :
148- throw new Exception ( "Shouldn't arrive here." ) ;
149- }
150- await Task . Delay ( 500 ) ;
151- } ) ) ;
163+ await Task . Delay ( 1000 ) ;
164+ Assert . True ( reader . ForceBatch ( ) ) ;
165+ Debug . WriteLine ( "Batch Forced." ) ;
166+ } ) ;
152167
153- }
154- }
168+ using var tokenSource = new CancellationTokenSource ( 6000 ) ;
169+ Assert . Equal ( 2 , await reader . ReadAllAsync ( tokenSource . Token , async ( batch , i ) =>
170+ {
171+ switch ( i )
172+ {
173+ case 0 :
174+ Assert . Equal ( 1 , batch [ 0 ] ) ;
175+ Assert . Equal ( 2 , batch [ 1 ] ) ;
176+ Assert . Equal ( 3 , batch [ 2 ] ) ;
177+ Debug . WriteLine ( "First batch received." ) ;
178+ break ;
179+ case 1 :
180+ Assert . Equal ( 2 , batch . Count ) ;
181+ Assert . Equal ( 4 , batch [ 0 ] ) ;
182+ Assert . Equal ( 5 , batch [ 1 ] ) ;
183+ Debug . WriteLine ( "Second batch received." ) ;
184+ c . Writer . Complete ( ) ;
185+ break ;
186+ default :
187+ throw new Exception ( "Shouldn't arrive here." ) ;
188+ }
189+ await Task . Delay ( 500 ) ;
190+ } ) ) ;
191+ }
192+ }
155193}
0 commit comments