1
1
using System ;
2
2
using System . Collections . Concurrent ;
3
3
using System . Collections . Generic ;
4
+ using System . Threading ;
4
5
using System . Threading . Tasks ;
5
- using System . Timers ;
6
6
7
7
namespace Splunk
8
8
{
@@ -18,7 +18,6 @@ public class BatchManager
18
18
{
19
19
readonly ConcurrentBag < object > events ;
20
20
readonly uint batchSizeCount ;
21
- readonly uint batchIntervalInMilliseconds ;
22
21
readonly Timer timer ;
23
22
readonly Action < List < object > > emitAction ;
24
23
@@ -31,57 +30,44 @@ public class BatchManager
31
30
/// <param name="batchSizeCount">Batch size count.</param>
32
31
/// <param name="batchIntervalInMilliseconds">Batch interval in milliseconds.</param>
33
32
/// <param name="emitAction">Emit action to be invoked at Emit process.</param>
34
- public BatchManager ( uint batchSizeCount , uint batchIntervalInMilliseconds , Action < List < object > > emitAction )
33
+ public BatchManager ( uint batchSizeCount , int batchIntervalInMilliseconds , Action < List < object > > emitAction )
35
34
{
36
35
events = new ConcurrentBag < object > ( ) ;
37
36
this . batchSizeCount = batchSizeCount ;
38
- this . batchIntervalInMilliseconds = batchIntervalInMilliseconds ;
39
37
40
38
if ( batchIntervalInMilliseconds > 0 )
41
- {
42
- timer = new Timer ( batchIntervalInMilliseconds ) ;
43
- timer . AutoReset = false ;
44
- timer . Enabled = true ;
45
- timer . Elapsed += TimerTick ;
46
- timer . Start ( ) ;
47
- }
39
+ timer = new Timer ( EmitTimeChek , null , 0 , batchIntervalInMilliseconds ) ;
48
40
49
41
this . emitAction = emitAction ;
50
42
}
51
43
52
- void TimerTick ( object sender , ElapsedEventArgs e )
44
+ void EmitTimeChek ( object state )
53
45
{
54
- Task
55
- . Factory
56
- . StartNew ( ( ) =>
57
- {
58
- if ( events . Count > 0 )
59
- Emit ( ) ;
60
- } ) . ContinueWith ( task =>
61
- {
62
- timer ? . Start ( ) ;
63
- } ) ;
46
+ if ( events . Count > 0 )
47
+ Emit ( ) ;
64
48
}
65
49
66
50
void Emit ( )
67
51
{
68
- bool continueExtraction = true ;
69
- List < object > emitEvents = new List < object > ( ) ;
70
- while ( continueExtraction )
71
- {
72
- if ( events . Count == 0 )
73
- continueExtraction = false ;
74
- else
52
+ Task . Factory . StartNew ( ( ) => {
53
+ bool continueExtraction = true ;
54
+ List < object > emitEvents = new List < object > ( ) ;
55
+ while ( continueExtraction )
75
56
{
76
- events . TryTake ( out object item ) ;
77
- if ( item != null )
78
- emitEvents . Add ( item ) ;
79
- if ( events . Count == 0 || emitEvents . Count >= batchSizeCount )
57
+ if ( events . Count == 0 )
80
58
continueExtraction = false ;
59
+ else
60
+ {
61
+ events . TryTake ( out object item ) ;
62
+ if ( item != null )
63
+ emitEvents . Add ( item ) ;
64
+ if ( events . Count == 0 || emitEvents . Count >= batchSizeCount )
65
+ continueExtraction = false ;
66
+ }
81
67
}
82
- }
83
- if ( emitEvents . Count > 0 )
84
- emitAction ? . Invoke ( emitEvents ) ;
68
+ if ( emitEvents . Count > 0 )
69
+ emitAction ? . Invoke ( emitEvents ) ;
70
+ } ) ;
85
71
}
86
72
87
73
/// <summary>
0 commit comments