11using System ;
2- using System . Collections . Generic ;
3- using System . Linq ;
4- using System . Runtime . InteropServices ;
5- using System . Text ;
62using System . Threading ;
73using System . Threading . Tasks ;
84using BitFaster . Caching . Buffers ;
95using FluentAssertions ;
106using Xunit ;
11-
7+ using Xunit . Abstractions ;
8+
129namespace BitFaster . Caching . UnitTests . Buffers
1310{
1411 public class MpscBoundedBufferTests
1512 {
16- private readonly MpscBoundedBuffer < string > buffer = new MpscBoundedBuffer < string > ( 10 ) ;
13+ private readonly ITestOutputHelper testOutputHelper ;
14+ private static readonly TimeSpan Timeout = TimeSpan . FromSeconds ( 30 ) ;
15+ private readonly MpscBoundedBuffer < string > buffer = new MpscBoundedBuffer < string > ( 10 ) ;
16+
17+ public MpscBoundedBufferTests ( ITestOutputHelper testOutputHelper )
18+ {
19+ this . testOutputHelper = testOutputHelper ;
20+ }
1721
1822 [ Fact ]
1923 public void WhenSizeIsLessThan1CtorThrows ( )
@@ -173,9 +177,11 @@ await Threaded.Run(4, () =>
173177 } ) ;
174178 }
175179
176- [ Fact ( Timeout = 5000 ) ]
180+ [ Fact ]
177181 public async Task WhileBufferIsFilledItemsCanBeTaken ( )
178182 {
183+ this . testOutputHelper . WriteLine ( $ "ProcessorCount={ Environment . ProcessorCount } .") ;
184+
179185 var buffer = new MpscBoundedBuffer < string > ( 1024 ) ;
180186
181187 var fill = Threaded . Run ( 4 , ( ) =>
@@ -184,34 +190,42 @@ public async Task WhileBufferIsFilledItemsCanBeTaken()
184190 int count = 0 ;
185191 while ( count < 256 )
186192 {
187- while ( true )
188- {
189- if ( buffer . TryAdd ( "hello" ) == BufferStatus . Success )
190- {
191- break ;
192- }
193- spin . SpinOnce ( ) ;
193+ while ( true )
194+ {
195+ if ( buffer . TryAdd ( "hello" ) == BufferStatus . Success )
196+ {
197+ break ;
198+ }
199+ spin . SpinOnce ( ) ;
194200 }
195201 count ++ ;
196202 }
197203 } ) ;
198204
199- int taken = 0 ;
200-
201- while ( taken < 1024 )
202- {
203- if ( buffer . TryTake ( out var _ ) == BufferStatus . Success )
204- {
205- taken ++ ;
205+ var take = Task . Run ( ( ) =>
206+ {
207+ int taken = 0 ;
208+
209+ while ( taken < 1024 )
210+ {
211+ var spin = new SpinWait ( ) ;
212+ if ( buffer . TryTake ( out var _ ) == BufferStatus . Success )
213+ {
214+ taken ++ ;
215+ }
216+ spin . SpinOnce ( ) ;
206217 }
207- }
218+ } ) ;
208219
209- await fill ;
220+ await fill . TimeoutAfter ( Timeout , $ "fill timed out") ;
221+ await take . TimeoutAfter ( Timeout , "take timed out" ) ;
210222 }
211223
212- [ Fact ( Timeout = 5000 ) ]
224+ [ Fact ]
213225 public async Task WhileBufferIsFilledBufferCanBeDrained ( )
214226 {
227+ this . testOutputHelper . WriteLine ( $ "ProcessorCount={ Environment . ProcessorCount } .") ;
228+
215229 var buffer = new MpscBoundedBuffer < string > ( 1024 ) ;
216230
217231 var fill = Threaded . Run ( 4 , ( ) =>
@@ -232,15 +246,19 @@ public async Task WhileBufferIsFilledBufferCanBeDrained()
232246 }
233247 } ) ;
234248
235- int drained = 0 ;
236- var drainBuffer = new ArraySegment < string > ( new string [ 1024 ] ) ;
237-
238- while ( drained < 1024 )
239- {
240- drained += buffer . DrainTo ( drainBuffer ) ;
241- }
249+ var drain = Task . Run ( ( ) =>
250+ {
251+ int drained = 0 ;
252+ var drainBuffer = new ArraySegment < string > ( new string [ 1024 ] ) ;
253+
254+ while ( drained < 1024 )
255+ {
256+ drained += buffer . DrainTo ( drainBuffer ) ;
257+ }
258+ } ) ;
242259
243- await fill ;
260+ await fill . TimeoutAfter ( Timeout , "fill timed out" ) ;
261+ await drain . TimeoutAfter ( Timeout , "drain timed out" ) ;
244262 }
245263 }
246264}
0 commit comments