Skip to content

Commit 442c6a5

Browse files
committed
Initial cut at streaming
1 parent cf4aacf commit 442c6a5

File tree

3 files changed

+63
-48
lines changed

3 files changed

+63
-48
lines changed

sample/Serilog.Sinks.Splunk.Sample/Program.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Linq;
23
using System.Net;
34
using Splunk.Client;
45

@@ -17,16 +18,21 @@ static void Main(string[] args)
1718
var eco = new ViaEventCollectorWithExtendedOptions();
1819

1920
eco.Configure();
20-
2121
//ec.Configure();
22-
2322
//udp.Configure();
2423
//tcp.Configure();
2524

2625
Log.Information("Simulation running, press any key to exit.");
2726

2827
stub.Run();
2928

29+
var range = Enumerable.Range(0, 10000);
30+
31+
foreach (var i in range)
32+
{
33+
Log.Information("Say hello to {0}", i);
34+
}
35+
3036
Console.ReadLine();
3137
}
3238
}
@@ -39,7 +45,10 @@ public void Configure()
3945

4046
Log.Logger = new LoggerConfiguration()
4147
.WriteTo.LiterateConsole()
42-
.WriteTo.SplunkViaEventCollector("https://mysplunk:8088/services/collector", "685546AE-0278-4786-97C4-5971676D5D70",renderTemplate:false)
48+
.WriteTo.SplunkViaEventCollector("https://mysplunk:8088/services/collector", "685546AE-0278-4786-97C4-5971676D5D70",
49+
renderTemplate:false,
50+
batchSizeLimit:150,
51+
batchIntervalInSeconds:5)
4352
.Enrich.WithThreadId()
4453
.Enrich.WithProperty("Serilog.Sinks.Splunk.Sample", "ViaEventCollector")
4554
.MinimumLevel.Debug()

src/Serilog.Sinks.Splunk/Sinks/Splunk/EventCollectorRequest.cs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,24 @@
44

55
namespace Serilog.Sinks.Splunk
66
{
7-
internal class EventCollectorRequest : HttpRequestMessage
7+
internal class SplunkEvent
88
{
9-
internal EventCollectorRequest(string splunkHost, string jsonPayLoad)
10-
{
9+
private string _payload;
1110

12-
var stringContent = new StringContent(jsonPayLoad, Encoding.UTF8, "application/json");
13-
RequestUri = new Uri(splunkHost);
14-
Content = stringContent;
15-
Method = HttpMethod.Post;
16-
}
17-
18-
internal EventCollectorRequest(
19-
string splunkHost,
20-
string logEvent,
21-
string source,
22-
string sourceType,
23-
string host,
24-
string index)
11+
internal SplunkEvent(string logEvent, string source, string sourceType, string host, string index)
2512
{
13+
_payload = string.Empty;
2614

27-
var jsonPayLoad = @"{""event"":" + logEvent
28-
.Replace("\r\n", string.Empty);
15+
var jsonPayLoad = @"{""event"":" + logEvent
16+
.Replace("\r\n", string.Empty);
2917

3018
if (!string.IsNullOrWhiteSpace(source))
3119
{
3220
jsonPayLoad = jsonPayLoad + @",""source"":""" + source + @"""";
3321
}
3422
if (!string.IsNullOrWhiteSpace(sourceType))
3523
{
36-
jsonPayLoad = jsonPayLoad + @",""sourceType"":""" + sourceType + @"""";
24+
jsonPayLoad = jsonPayLoad + @",""sourceType"":""" + sourceType + @"""";
3725
}
3826
if (!string.IsNullOrWhiteSpace(host))
3927
{
@@ -44,7 +32,20 @@ internal EventCollectorRequest(
4432
jsonPayLoad = jsonPayLoad + @",""index"":""" + index + @"""";
4533
}
4634

47-
jsonPayLoad = jsonPayLoad + "}";
35+
jsonPayLoad = jsonPayLoad + "}";
36+
_payload = jsonPayLoad;
37+
}
38+
39+
public string Payload
40+
{
41+
get { return _payload; }
42+
}
43+
}
44+
45+
internal class EventCollectorRequest : HttpRequestMessage
46+
{
47+
internal EventCollectorRequest(string splunkHost, string jsonPayLoad)
48+
{
4849

4950
var stringContent = new StringContent(jsonPayLoad, Encoding.UTF8, "application/json");
5051
RequestUri = new Uri(splunkHost);

src/Serilog.Sinks.Splunk/Sinks/Splunk/EventCollectorSink.cs

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
// Copyright 2014 Serilog Contributors
23
//
34
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -29,7 +30,7 @@ namespace Serilog.Sinks.Splunk
2930
/// <summary>
3031
/// A sink to log to the Event Collector available in Splunk 6.3
3132
/// </summary>
32-
public class EventCollectorSink : ILogEventSink
33+
public class EventCollectorSink : ILogEventSink, IDisposable
3334
{
3435
private readonly string _splunkHost;
3536
private readonly string _eventCollectorToken;
@@ -40,9 +41,8 @@ public class EventCollectorSink : ILogEventSink
4041
private readonly int _batchSizeLimitLimit;
4142
private readonly SplunkJsonFormatter _jsonFormatter;
4243
private readonly ConcurrentQueue<LogEvent> _queue;
43-
private readonly TimeSpan _batchInterval;
44-
private readonly EventCollectorClient _httpClient;
45-
44+
private readonly EventCollectorClient _httpClient;
45+
4646
/// <summary>
4747
/// Taken from Splunk.Logging.Common
4848
/// </summary>
@@ -76,13 +76,13 @@ public EventCollectorSink(
7676
_queue = new ConcurrentQueue<LogEvent>();
7777
_jsonFormatter = new SplunkJsonFormatter(renderMessage: true, formatProvider: formatProvider, renderTemplate: renderTemplate);
7878
_batchSizeLimitLimit = batchSizeLimit;
79-
_batchInterval = TimeSpan.FromSeconds(batchIntervalInSeconds);
79+
var batchInterval = TimeSpan.FromSeconds(batchIntervalInSeconds);
8080

8181
_httpClient = new EventCollectorClient(_eventCollectorToken);
8282

8383
//TODO: Implement handling similar to the Seq HTTP sink, including dispose flush
8484

85-
RepeatAction.OnInterval(_batchInterval, () => ProcessQueue().Wait(), new CancellationToken());
85+
RepeatAction.OnInterval(batchInterval, () => ProcessQueue().Wait(), new CancellationToken());
8686

8787
}
8888

@@ -153,39 +153,44 @@ private async Task ProcessQueue()
153153
if (events.Count == 0)
154154
return;
155155

156-
//TODO: Add streaming capability for performance.
157-
// - Stream writer needs to move to outer scope.
158-
// - Change Event Collector Request to only take string (or stream) of events and host
159-
// - New object to stream many events as collection
156+
string allEvents = string.Empty;
160157

161158
foreach (var logEvent in events)
162159
{
163160
var sw = new StringWriter();
164-
165161
_jsonFormatter.Format(logEvent, sw);
166-
var evt = sw.ToString();
167162

168-
var request = new EventCollectorRequest(_splunkHost, evt, _source, _sourceType, _host, _index);
169-
var response = await _httpClient.SendAsync(request);
163+
var serialisedEvent = sw.ToString();
164+
165+
var splunkEvent = new SplunkEvent(serialisedEvent, _source, _sourceType, _host, _index);
166+
167+
allEvents = $"{allEvents}{splunkEvent.Payload}";
168+
169+
}
170+
var request = new EventCollectorRequest(_splunkHost, allEvents);
171+
172+
var response = await _httpClient.SendAsync(request);
170173

171-
if (response.IsSuccessStatusCode) { //Do Nothing?
174+
if (response.IsSuccessStatusCode)
175+
{ //Do Nothing?
176+
}
177+
else
178+
{
179+
//Application Errors sent via HTTP Event Collector
180+
if (HttpEventCollectorApplicationErrors.Any(x => x == response.StatusCode))
181+
{
182+
SelfLog.WriteLine("A status code of {0} was received when attempting to send to {1}. The event has been discarded and will not be placed back in the queue.", response.StatusCode.ToString(), _splunkHost);
172183
}
173184
else
174185
{
175-
//Application Errors sent via HTTP Event Collector
176-
if (HttpEventCollectorApplicationErrors.Any(x => x == response.StatusCode))
177-
{
178-
SelfLog.WriteLine("A status code of {0} was received when attempting to send to {1}. The event has been discarded and will not be placed back in the queue.", response.StatusCode.ToString(), _splunkHost);
179-
}
180-
else
181-
{
182-
//Put the item back in the queue & retry on next go
183-
SelfLog.WriteLine("A status code of {0} was received when attempting to send to {1}. The event has been placed back in the queue", response.StatusCode.ToString(), _splunkHost);
186+
//Put the item back in the queue & retry on next go
187+
SelfLog.WriteLine("A status code of {0} was received when attempting to send to {1}. The event has been placed back in the queue", response.StatusCode.ToString(), _splunkHost);
184188

189+
foreach (var logEvent in events)
190+
{
185191
_queue.Enqueue(logEvent);
186192
}
187193
}
188-
189194
}
190195
} while (true);
191196
}

0 commit comments

Comments
 (0)