@@ -41,7 +41,7 @@ public class EventCollectorSink : ILogEventSink, IDisposable
41
41
private readonly int _batchSizeLimitLimit ;
42
42
private readonly SplunkJsonFormatter _jsonFormatter ;
43
43
private readonly ConcurrentQueue < LogEvent > _queue ;
44
- private readonly EventCollectorClient _httpClient ;
44
+ private readonly EventCollectorClient _httpClient ;
45
45
46
46
/// <summary>
47
47
/// Taken from Splunk.Logging.Common
@@ -69,21 +69,24 @@ public EventCollectorSink(
69
69
int batchSizeLimit = 100 ,
70
70
IFormatProvider formatProvider = null ,
71
71
bool renderTemplate = true
72
- )
72
+ )
73
73
{
74
74
_splunkHost = splunkHost ;
75
75
_eventCollectorToken = eventCollectorToken ;
76
76
_queue = new ConcurrentQueue < LogEvent > ( ) ;
77
77
_jsonFormatter = new SplunkJsonFormatter ( renderMessage : true , formatProvider : formatProvider , renderTemplate : renderTemplate ) ;
78
78
_batchSizeLimitLimit = batchSizeLimit ;
79
+
79
80
var batchInterval = TimeSpan . FromSeconds ( batchIntervalInSeconds ) ;
80
81
81
82
_httpClient = new EventCollectorClient ( _eventCollectorToken ) ;
82
83
83
- //TODO: Implement handling similar to the Seq HTTP sink, including dispose flush
84
-
85
- RepeatAction . OnInterval ( batchInterval , ( ) => ProcessQueue ( ) . Wait ( ) , new CancellationToken ( ) ) ;
84
+ var cancellationToken = new CancellationToken ( ) ;
86
85
86
+ RepeatAction . OnInterval (
87
+ batchInterval ,
88
+ async ( ) => await ProcessQueue ( ) ,
89
+ cancellationToken ) ;
87
90
}
88
91
89
92
/// <summary>
@@ -153,56 +156,84 @@ private async Task ProcessQueue()
153
156
if ( events . Count == 0 )
154
157
return ;
155
158
156
- string allEvents = string . Empty ;
159
+ await Send ( events ) ;
157
160
158
- foreach ( var logEvent in events )
159
- {
160
- var sw = new StringWriter ( ) ;
161
- _jsonFormatter . Format ( logEvent , sw ) ;
161
+ } while ( true ) ;
162
+ }
163
+ catch ( Exception ex )
164
+ {
165
+ SelfLog . WriteLine ( "Exception while emitting batch from {0}: {1}" , this , ex ) ;
166
+ }
167
+ }
162
168
163
- var serialisedEvent = sw . ToString ( ) ;
164
-
165
- var splunkEvent = new SplunkEvent ( serialisedEvent , _source , _sourceType , _host , _index ) ;
169
+ private async Task Send ( IEnumerable < LogEvent > events )
170
+ {
171
+ string allEvents = string . Empty ;
166
172
167
- allEvents = $ "{ allEvents } { splunkEvent . Payload } ";
173
+ foreach ( var logEvent in events )
174
+ {
175
+ var sw = new StringWriter ( ) ;
176
+ _jsonFormatter . Format ( logEvent , sw ) ;
168
177
169
- }
170
- var request = new EventCollectorRequest ( _splunkHost , allEvents ) ;
171
-
172
- var response = await _httpClient . SendAsync ( request ) ;
178
+ var serialisedEvent = sw . ToString ( ) ;
173
179
174
- if ( response . IsSuccessStatusCode )
175
- { //Do Nothing?
176
- }
177
- else
178
- {
179
- //Application Errors sent via HTTP Event Collector
180
- if ( HttpEventCollectorApplicationErrors . Any ( x => x == response . StatusCode ) )
181
- {
182
- SelfLog . WriteLine ( "A status code of {0} was received when attempting to send to {1}. The event has been discarded and will not be placed back in the queue." , response . StatusCode . ToString ( ) , _splunkHost ) ;
183
- }
184
- else
185
- {
186
- //Put the item back in the queue & retry on next go
187
- SelfLog . WriteLine ( "A status code of {0} was received when attempting to send to {1}. The event has been placed back in the queue" , response . StatusCode . ToString ( ) , _splunkHost ) ;
188
-
189
- foreach ( var logEvent in events )
190
- {
191
- _queue . Enqueue ( logEvent ) ;
192
- }
193
- }
194
- }
195
- } while ( true ) ;
180
+ var splunkEvent = new SplunkEvent ( serialisedEvent , _source , _sourceType , _host , _index ) ;
181
+
182
+ allEvents = $ "{ allEvents } { splunkEvent . Payload } ";
196
183
}
197
- catch ( Exception ex )
184
+ var request = new EventCollectorRequest ( _splunkHost , allEvents ) ;
185
+
186
+ var response = await _httpClient . SendAsync ( request ) ;
187
+
188
+ if ( response . IsSuccessStatusCode )
198
189
{
199
- SelfLog . WriteLine ( "Exception while emitting batch from {0}: {1}" , this , ex ) ;
190
+ //Do Nothing?
191
+ }
192
+ else
193
+ {
194
+ //Application Errors sent via HTTP Event Collector
195
+ if ( HttpEventCollectorApplicationErrors . Any ( x => x == response . StatusCode ) )
196
+ {
197
+ SelfLog . WriteLine (
198
+ "A status code of {0} was received when attempting to send to {1}. The event has been discarded and will not be placed back in the queue." ,
199
+ response . StatusCode . ToString ( ) , _splunkHost ) ;
200
+ }
201
+ else
202
+ {
203
+ //Put the item back in the queue & retry on next go
204
+ SelfLog . WriteLine (
205
+ "A status code of {0} was received when attempting to send to {1}. The event has been placed back in the queue" ,
206
+ response . StatusCode . ToString ( ) , _splunkHost ) ;
207
+
208
+ foreach ( var logEvent in events )
209
+ {
210
+ _queue . Enqueue ( logEvent ) ;
211
+ }
212
+ }
200
213
}
201
214
}
202
215
203
216
/// <inheritdoc/>
204
217
public void Dispose ( )
205
218
{
219
+ Dispose ( true ) ;
220
+ }
221
+
222
+ /// <inheritdoc/>
223
+ protected virtual void Dispose ( bool disposing )
224
+ {
225
+ if ( ! disposing ) return ;
226
+
227
+ var remainingEvents = new List < LogEvent > ( ) ;
228
+
229
+ while ( ! _queue . IsEmpty )
230
+ {
231
+ LogEvent next ;
232
+ _queue . TryDequeue ( out next ) ;
233
+ remainingEvents . Add ( next ) ;
234
+ }
235
+
236
+ Send ( remainingEvents ) . Wait ( ) ;
206
237
_httpClient . Dispose ( ) ;
207
238
}
208
239
}
0 commit comments