Skip to content

Commit 2e42383

Browse files
authored
Merge pull request #9 from nblumhardt/f-captiveworker
Captive thread and BlockingCollection (WIP)
2 parents 7f951c0 + 6cd5ff7 commit 2e42383

File tree

11 files changed

+254
-6
lines changed

11 files changed

+254
-6
lines changed

src/Serilog.Sinks.Async.PerformanceTests/Benchmarks.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public class Benchmarks
1010
public void Benchmark()
1111
{
1212
BenchmarkRunner.Run<ThroughputBenchmark>();
13+
BenchmarkRunner.Run<LatencyBenchmark>();
1314
}
1415
}
1516
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
using System;
2+
using BenchmarkDotNet.Attributes;
3+
using Serilog.Core;
4+
using Serilog.Events;
5+
using Serilog.Parsing;
6+
7+
namespace Serilog.Sinks.Async.PerformanceTests
8+
{
9+
public class LatencyBenchmark
10+
{
11+
private const int Count = 10000;
12+
13+
private readonly LogEvent _evt = new LogEvent(DateTimeOffset.Now, LogEventLevel.Information, null,
14+
new MessageTemplate(new[] {new TextToken("Hello")}), new LogEventProperty[0]);
15+
16+
private Logger _syncLogger, _asyncLogger, _async2Logger;
17+
18+
[Setup]
19+
public void Reset()
20+
{
21+
_syncLogger?.Dispose();
22+
_asyncLogger?.Dispose();
23+
_async2Logger?.Dispose();
24+
25+
_syncLogger = new LoggerConfiguration()
26+
.WriteTo.Sink(new SilentSink())
27+
.CreateLogger();
28+
29+
_asyncLogger = new LoggerConfiguration()
30+
.WriteTo.Async(a => a.Sink(new SilentSink()))
31+
.CreateLogger();
32+
33+
_async2Logger = new LoggerConfiguration()
34+
.WriteTo.Async2(a => a.Sink(new SilentSink()))
35+
.CreateLogger();
36+
}
37+
38+
[Benchmark(Baseline = true)]
39+
public void Sync()
40+
{
41+
for (var i = 0; i < Count; ++i)
42+
{
43+
_syncLogger.Write(_evt);
44+
}
45+
}
46+
47+
[Benchmark]
48+
public void Async()
49+
{
50+
for (var i = 0; i < Count; ++i)
51+
{
52+
_asyncLogger.Write(_evt);
53+
}
54+
}
55+
56+
[Benchmark]
57+
public void Async2()
58+
{
59+
for (var i = 0; i < Count; ++i)
60+
{
61+
_async2Logger.Write(_evt);
62+
}
63+
}
64+
}
65+
}

src/Serilog.Sinks.Async.PerformanceTests/Serilog.Sinks.Async.PerformanceTests.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@
8585
<Compile Include="Benchmarks.cs" />
8686
<Compile Include="Properties\AssemblyInfo.cs" />
8787
<Compile Include="SignallingSink.cs" />
88+
<Compile Include="LatencyBenchmark.cs" />
89+
<Compile Include="SilentSink.cs" />
8890
<Compile Include="ThroughputBenchmark.cs" />
8991
</ItemGroup>
9092
<ItemGroup>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using Serilog.Core;
2+
using Serilog.Events;
3+
4+
namespace Serilog.Sinks.Async.PerformanceTests
5+
{
6+
public class SilentSink : ILogEventSink
7+
{
8+
public void Emit(LogEvent logEvent)
9+
{
10+
}
11+
}
12+
}

src/Serilog.Sinks.Async.PerformanceTests/ThroughputBenchmark.cs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using BenchmarkDotNet.Attributes;
3+
using Serilog.Core;
34
using Serilog.Events;
45
using Serilog.Parsing;
56

@@ -13,11 +14,21 @@ public class ThroughputBenchmark
1314
new MessageTemplate(new[] {new TextToken("Hello")}), new LogEventProperty[0]);
1415

1516
private readonly SignallingSink _signal;
16-
private readonly ILogger _syncLogger, _asyncLogger;
17+
private Logger _syncLogger, _asyncLogger, _async2Logger;
1718

1819
public ThroughputBenchmark()
1920
{
2021
_signal = new SignallingSink(Count);
22+
}
23+
24+
[Setup]
25+
public void Reset()
26+
{
27+
_syncLogger?.Dispose();
28+
_asyncLogger?.Dispose();
29+
_async2Logger?.Dispose();
30+
31+
_signal.Reset();
2132

2233
_syncLogger = new LoggerConfiguration()
2334
.WriteTo.Sink(_signal)
@@ -26,12 +37,10 @@ public ThroughputBenchmark()
2637
_asyncLogger = new LoggerConfiguration()
2738
.WriteTo.Async(a => a.Sink(_signal))
2839
.CreateLogger();
29-
}
3040

31-
[Setup]
32-
public void Reset()
33-
{
34-
_signal.Reset();
41+
_async2Logger = new LoggerConfiguration()
42+
.WriteTo.Async2(a => a.Sink(_signal))
43+
.CreateLogger();
3544
}
3645

3746
[Benchmark(Baseline = true)]
@@ -56,5 +65,16 @@ public void Async()
5665

5766
_signal.Wait();
5867
}
68+
69+
[Benchmark]
70+
public void Async2()
71+
{
72+
for (var i = 0; i < Count; ++i)
73+
{
74+
_async2Logger.Write(_evt);
75+
}
76+
77+
_signal.Wait();
78+
}
5979
}
6080
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Threading;
3+
using Microsoft.VisualStudio.TestTools.UnitTesting;
4+
using Serilog.Core;
5+
using Serilog.Events;
6+
using Serilog.Parsing;
7+
8+
namespace Serilog.Sinks.Async.UnitTests
9+
{
10+
[TestClass]
11+
public class AsyncWorkerSinkTests
12+
{
13+
[TestMethod]
14+
public void EventsArePassedToInnerSink()
15+
{
16+
var collector = new CollectingSink();
17+
18+
using (var log = new LoggerConfiguration()
19+
.WriteTo.Async2(w => w.Sink(collector))
20+
.CreateLogger())
21+
{
22+
log.Information("Hello, async world!");
23+
log.Information("Hello again!");
24+
}
25+
26+
Assert.AreEqual(2, collector.Events.Count);
27+
}
28+
29+
[TestMethod]
30+
public void DisposeCompletesWithoutWorkPerformed()
31+
{
32+
var collector = new CollectingSink();
33+
34+
using (new LoggerConfiguration()
35+
.WriteTo.Async2(w => w.Sink(collector))
36+
.CreateLogger())
37+
{
38+
}
39+
40+
Assert.AreEqual(0, collector.Events.Count);
41+
}
42+
}
43+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System.Collections.Concurrent;
2+
using Serilog.Core;
3+
using Serilog.Events;
4+
5+
namespace Serilog.Sinks.Async.UnitTests
6+
{
7+
public class CollectingSink : ILogEventSink
8+
{
9+
public ConcurrentBag<LogEvent> Events { get; } = new ConcurrentBag<LogEvent>();
10+
11+
public void Emit(LogEvent logEvent)
12+
{
13+
Events.Add(logEvent);
14+
}
15+
}
16+
}

src/Serilog.Sinks.Async.UnitTests/Serilog.Sinks.Async.UnitTests.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,10 @@
7272
<Link>Properties\GlobalAssemblyInfo.cs</Link>
7373
</Compile>
7474
<Compile Include="AssertExtensions.cs" />
75+
<Compile Include="AsyncWorkerSinkTests.cs" />
7576
<Compile Include="BufferedQueueSinkSpec.cs" />
7677
<Compile Include="BufferedQueueSpec.cs" />
78+
<Compile Include="CollectingSink.cs" />
7779
<Compile Include="Properties\AssemblyInfo.cs" />
7880
</ItemGroup>
7981
<ItemGroup>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Threading;
4+
using Serilog.Core;
5+
using Serilog.Debugging;
6+
using Serilog.Events;
7+
8+
namespace Serilog.Sinks.Async
9+
{
10+
sealed class AsyncWorkerSink : ILogEventSink, IDisposable
11+
{
12+
readonly Logger _pipeline;
13+
readonly int _bufferCapacity;
14+
volatile bool _disposed;
15+
readonly CancellationTokenSource _cancel = new CancellationTokenSource();
16+
readonly BlockingCollection<LogEvent> _queue;
17+
readonly Thread _worker;
18+
19+
public AsyncWorkerSink(Logger pipeline, int bufferCapacity)
20+
{
21+
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
22+
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
23+
_pipeline = pipeline;
24+
_bufferCapacity = bufferCapacity;
25+
_queue = new BlockingCollection<LogEvent>(_bufferCapacity);
26+
_worker = new Thread(Pump) { IsBackground = true };
27+
_worker.Start();
28+
}
29+
30+
public void Emit(LogEvent logEvent)
31+
{
32+
// The disposed check is racy, but only ensures we don't prevent flush from
33+
// completing by pushing more events.
34+
if (!_disposed && !_queue.TryAdd(logEvent))
35+
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(AsyncWorkerSink), _bufferCapacity);
36+
}
37+
38+
public void Dispose()
39+
{
40+
_disposed = true;
41+
_cancel.Cancel();
42+
_worker.Join();
43+
_pipeline.Dispose();
44+
// _cancel not disposed, because it will make _cancel.Cancel() non-idempotent
45+
}
46+
47+
void Pump()
48+
{
49+
try
50+
{
51+
try
52+
{
53+
while (true)
54+
{
55+
var next = _queue.Take(_cancel.Token);
56+
_pipeline.Write(next);
57+
}
58+
}
59+
catch (OperationCanceledException)
60+
{
61+
LogEvent next;
62+
while (_queue.TryTake(out next))
63+
_pipeline.Write(next);
64+
}
65+
}
66+
catch (Exception ex)
67+
{
68+
SelfLog.WriteLine("{0} fatal error in worker thread: {1}", typeof(AsyncWorkerSink), ex);
69+
}
70+
}
71+
}
72+
}

src/Serilog.Sinks.Async/LoggerConfigurationExtensions.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using Serilog.Configuration;
3+
using Serilog.Events;
34

45
namespace Serilog.Sinks.Async
56
{
@@ -16,5 +17,18 @@ public static LoggerConfiguration Async(this LoggerSinkConfiguration configurati
1617

1718
return configuration.Sink(wrapper);
1819
}
20+
21+
public static LoggerConfiguration Async2(this LoggerSinkConfiguration configuration,
22+
Action<LoggerSinkConfiguration> sinkConfiguration, int bufferSize = 10000)
23+
{
24+
var sublogger = new LoggerConfiguration();
25+
sublogger.MinimumLevel.Is(LevelAlias.Minimum);
26+
27+
sinkConfiguration(sublogger.WriteTo);
28+
29+
var wrapper = new AsyncWorkerSink(sublogger.CreateLogger(), bufferSize);
30+
31+
return configuration.Sink(wrapper);
32+
}
1933
}
2034
}

0 commit comments

Comments
 (0)