1313// limitations under the License.
1414
1515using System ;
16- using System . Collections . Concurrent ;
1716using System . Collections . Generic ;
1817using System . IO ;
1918using System . Linq ;
2019using System . Net ;
2120using System . Net . Http ;
22- using System . Threading ;
2321using System . Threading . Tasks ;
24- using Serilog . Core ;
2522using Serilog . Debugging ;
2623using Serilog . Events ;
2724using Serilog . Formatting ;
25+ using Serilog . Sinks . PeriodicBatching ;
2826
2927namespace Serilog . Sinks . Splunk
3028{
3129 /// <summary>
3230 /// A sink to log to the Event Collector available in Splunk 6.3
3331 /// </summary>
34- public class EventCollectorSink : ILogEventSink , IDisposable
32+ public class EventCollectorSink : PeriodicBatchingSink
3533 {
3634 private readonly string _splunkHost ;
3735 private readonly string _uriPath ;
38- private readonly int _batchSizeLimitLimit ;
3936 private readonly ITextFormatter _jsonFormatter ;
40- private readonly ConcurrentQueue < LogEvent > _queue ;
4137 private readonly EventCollectorClient _httpClient ;
4238
39+
4340 /// <summary>
4441 /// Taken from Splunk.Logging.Common
4542 /// </summary>
@@ -115,6 +112,7 @@ public EventCollectorSink(
115112 messageHandler )
116113 {
117114 }
115+
118116 /// <summary>
119117 /// Creates a new instance of the sink with Customfields
120118 /// </summary>
@@ -152,12 +150,11 @@ public EventCollectorSink(
152150 uriPath ,
153151 batchIntervalInSeconds ,
154152 batchSizeLimit ,
155- new SplunkJsonFormatter ( renderTemplate , formatProvider , source , sourceType , host , index , fields ) ,
153+ new SplunkJsonFormatter ( renderTemplate , formatProvider , source , sourceType , host , index , fields ) ,
156154 messageHandler )
157155 {
158156 }
159157
160-
161158 /// <summary>
162159 /// Creates a new instance of the sink
163160 /// </summary>
@@ -176,67 +173,25 @@ public EventCollectorSink(
176173 int batchSizeLimit ,
177174 ITextFormatter jsonFormatter ,
178175 HttpMessageHandler messageHandler = null )
176+ : base ( batchSizeLimit , TimeSpan . FromSeconds ( batchIntervalInSeconds ) )
179177 {
180178 _uriPath = uriPath ;
181179 _splunkHost = splunkHost ;
182- _queue = new ConcurrentQueue < LogEvent > ( ) ;
183180 _jsonFormatter = jsonFormatter ;
184- _batchSizeLimitLimit = batchSizeLimit ;
185181
186- var batchInterval = TimeSpan . FromSeconds ( batchIntervalInSeconds ) ;
187182 _httpClient = messageHandler != null
188183 ? new EventCollectorClient ( eventCollectorToken , messageHandler )
189184 : new EventCollectorClient ( eventCollectorToken ) ;
190-
191- var cancellationToken = new CancellationToken ( ) ;
192-
193- RepeatAction . OnInterval (
194- batchInterval ,
195- async ( ) => await ProcessQueue ( ) ,
196- cancellationToken ) ;
197185 }
198186
199187 /// <summary>
200- /// Emits the provided log event from a sink
188+ /// Emit a batch of log events, running asynchronously.
201189 /// </summary>
202- /// <param name="logEvent"></param>
203- public void Emit ( LogEvent logEvent )
204- {
205- if ( logEvent == null ) throw new ArgumentNullException ( nameof ( logEvent ) ) ;
206-
207- _queue . Enqueue ( logEvent ) ;
208- }
209-
210- private async Task ProcessQueue ( )
211- {
212- try
213- {
214- do
215- {
216- var count = 0 ;
217- var events = new Queue < LogEvent > ( ) ;
218- LogEvent next ;
219-
220- while ( count < _batchSizeLimitLimit && _queue . TryDequeue ( out next ) )
221- {
222- count ++ ;
223- events . Enqueue ( next ) ;
224- }
225-
226- if ( events . Count == 0 )
227- return ;
228-
229- await Send ( events ) ;
230-
231- } while ( true ) ;
232- }
233- catch ( Exception ex )
234- {
235- SelfLog . WriteLine ( "Exception while emitting batch from {0}: {1}" , this , ex ) ;
236- }
237- }
238-
239- private async Task Send ( IEnumerable < LogEvent > events )
190+ /// <param name="events">The events to emit.</param>
191+ /// <remarks>
192+ /// Override either <see cref="PeriodicBatchingSink.EmitBatch" /> or <see cref="PeriodicBatchingSink.EmitBatchAsync" />, not both.
193+ /// </remarks>
194+ protected override async Task EmitBatchAsync ( IEnumerable < LogEvent > events )
240195 {
241196 var allEvents = new StringWriter ( ) ;
242197
@@ -248,56 +203,22 @@ private async Task Send(IEnumerable<LogEvent> events)
248203 var request = new EventCollectorRequest ( _splunkHost , allEvents . ToString ( ) , _uriPath ) ;
249204 var response = await _httpClient . SendAsync ( request ) . ConfigureAwait ( false ) ;
250205
251- if ( response . IsSuccessStatusCode )
252- {
253- //Do Nothing?
254- }
255- else
206+ if ( ! response . IsSuccessStatusCode )
256207 {
257208 //Application Errors sent via HTTP Event Collector
258209 if ( HttpEventCollectorApplicationErrors . Any ( x => x == response . StatusCode ) )
259210 {
211+ // By not throwing an exception here the PeriodicBatchingSink will assume the batch succeeded and not send it again.
260212 SelfLog . WriteLine (
261213 "A status code of {0} was received when attempting to send to {1}. The event has been discarded and will not be placed back in the queue." ,
262214 response . StatusCode . ToString ( ) , _splunkHost ) ;
263215 }
264216 else
265217 {
266- //Put the item back in the queue & retry on next go
267- SelfLog . WriteLine (
268- "A status code of {0} was received when attempting to send to {1}. The event has been placed back in the queue" ,
269- response . StatusCode . ToString ( ) , _splunkHost ) ;
270-
271- foreach ( var logEvent in events )
272- {
273- _queue . Enqueue ( logEvent ) ;
274- }
218+ // EnsureSuccessStatusCode will throw an exception and the PeriodicBatchingSink will catch/log the exception and retry the batch.
219+ response . EnsureSuccessStatusCode ( ) ;
275220 }
276221 }
277222 }
278-
279- /// <inheritdoc/>
280- public void Dispose ( )
281- {
282- Dispose ( true ) ;
283- }
284-
285- /// <inheritdoc/>
286- protected virtual void Dispose ( bool disposing )
287- {
288- if ( ! disposing ) return ;
289-
290- var remainingEvents = new List < LogEvent > ( ) ;
291-
292- while ( ! _queue . IsEmpty )
293- {
294- LogEvent next ;
295- _queue . TryDequeue ( out next ) ;
296- remainingEvents . Add ( next ) ;
297- }
298-
299- Send ( remainingEvents ) . Wait ( ) ;
300- _httpClient . Dispose ( ) ;
301- }
302223 }
303224}
0 commit comments