@@ -84,44 +84,73 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
84
84
}
85
85
86
86
[ Fact ]
87
- public async Task WhenQueueFull_ThenDropsEvents ( )
87
+ public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock ( )
88
88
{
89
- using ( var sink = new BackgroundWorkerSink ( _logger , 1 , false ) )
89
+ var batchTiming = Stopwatch . StartNew ( ) ;
90
+ using ( var sink = new BackgroundWorkerSink ( _logger , 1 , blockWhenFull : false /*default*/ ) )
90
91
{
91
- // Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
92
- // after the first event is popped
93
- _innerSink . DelayEmit = TimeSpan . FromMilliseconds ( 300 ) ;
94
-
95
- var events = new List < LogEvent >
96
- {
97
- CreateEvent ( ) ,
98
- CreateEvent ( ) ,
99
- CreateEvent ( ) ,
100
- CreateEvent ( ) ,
101
- CreateEvent ( )
102
- } ;
103
- events . ForEach ( e =>
92
+ // Cause a delay when emmitting to the inner sink, allowing us to easily fill the queue to capacity
93
+ // while the first event is being propagated
94
+ var acceptInterval = TimeSpan . FromMilliseconds ( 500 ) ;
95
+ _innerSink . DelayEmit = acceptInterval ;
96
+ var tenSecondsWorth = 10_000 / acceptInterval . TotalMilliseconds + 1 ;
97
+ for ( int i = 0 ; i < tenSecondsWorth ; i ++ )
104
98
{
105
- var sw = Stopwatch . StartNew ( ) ;
106
- sink . Emit ( e ) ;
107
- sw . Stop ( ) ;
99
+ var emissionTiming = Stopwatch . StartNew ( ) ;
100
+ sink . Emit ( CreateEvent ( ) ) ;
101
+ emissionTiming . Stop ( ) ;
108
102
109
- Assert . True ( sw . ElapsedMilliseconds < 200 , "Should not block the caller when the queue is full" ) ;
110
- } ) ;
103
+ // Should not block the caller when the queue is full
104
+ Assert . InRange ( emissionTiming . ElapsedMilliseconds , 0 , 200 ) ;
105
+ }
111
106
112
- // If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take
113
- // at least 15 seconds to process
114
- await Task . Delay ( TimeSpan . FromSeconds ( 2 ) ) ;
107
+ // Allow at least one to propagate
108
+ await Task . Delay ( TimeSpan . FromSeconds ( 1 ) ) . ConfigureAwait ( false ) ;
109
+ }
110
+ // Sanity check the overall timing
111
+ batchTiming . Stop ( ) ;
112
+ // Need to add a significant fudge factor as AppVeyor build can result in `await` taking quite some time
113
+ Assert . InRange ( batchTiming . ElapsedMilliseconds , 950 , 2050 ) ;
114
+ }
115
+
116
+ [ Fact ]
117
+ public async Task GivenDefaultConfig_WhenRequestsOverCapacity_ThenDropsEventsAndRecovers ( )
118
+ {
119
+ using ( var sink = new BackgroundWorkerSink ( _logger , 1 , blockWhenFull : false /*default*/ ) )
120
+ {
121
+ var acceptInterval = TimeSpan . FromMilliseconds ( 200 ) ;
122
+ _innerSink . DelayEmit = acceptInterval ;
115
123
116
- // Events should be dropped
117
- Assert . Equal ( 2 , _innerSink . Events . Count ) ;
124
+ for ( int i = 0 ; i < 2 ; i ++ )
125
+ {
126
+ sink . Emit ( CreateEvent ( ) ) ;
127
+ sink . Emit ( CreateEvent ( ) ) ;
128
+ await Task . Delay ( acceptInterval ) ;
129
+ sink . Emit ( CreateEvent ( ) ) ;
130
+ }
131
+ // Wait for the buffer and propagation to complete
132
+ await Task . Delay ( TimeSpan . FromSeconds ( 1 ) ) ;
133
+ // Now verify things are back to normal; emit an event...
134
+ var finalEvent = CreateEvent ( ) ;
135
+ sink . Emit ( finalEvent ) ;
136
+ // ... give adequate time for it to be guaranteed to have percolated through
137
+ await Task . Delay ( TimeSpan . FromSeconds ( 1 ) ) ;
138
+
139
+ // At least one of the preceding events should not have made it through
140
+ var propagatedExcludingFinal =
141
+ from e in _innerSink . Events
142
+ where ! Object . ReferenceEquals ( finalEvent , e )
143
+ select e ;
144
+ Assert . InRange ( 2 , 2 * 3 / 2 - 1 , propagatedExcludingFinal . Count ( ) ) ;
145
+ // Final event should have made it through
146
+ Assert . Contains ( _innerSink . Events , x => Object . ReferenceEquals ( finalEvent , x ) ) ;
118
147
}
119
148
}
120
149
121
150
[ Fact ]
122
- public async Task WhenQueueFull_ThenBlocks ( )
151
+ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks ( )
123
152
{
124
- using ( var sink = new BackgroundWorkerSink ( _logger , 1 , true ) )
153
+ using ( var sink = new BackgroundWorkerSink ( _logger , 1 , blockWhenFull : true ) )
125
154
{
126
155
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
127
156
// after the first event is popped
0 commit comments