Skip to content

Commit 6cee32a

Browse files
authored
Merge pull request #29 from bartelink/feature/queue-monitoring
Add queue inspector facility
2 parents f22d170 + 17e0779 commit 6cee32a

File tree

8 files changed

+210
-24
lines changed

8 files changed

+210
-24
lines changed

README.md

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,53 @@ Log.Logger = new LoggerConfiguration()
1919
.WriteTo.Async(a => a.File("logs/myapp.log"))
2020
// Other logger configuration
2121
.CreateLogger()
22-
22+
2323
Log.Information("This will be written to disk on the worker thread");
2424

25-
// At application shutdown
25+
// At application shutdown (results in monitors getting StopMonitoring calls)
2626
Log.CloseAndFlush();
2727
```
2828

2929
The wrapped sink (`File` in this case) will be invoked on a worker thread while your application's thread gets on with more important stuff.
3030

3131
Because the memory buffer may contain events that have not yet been written to the target sink, it is important to call `Log.CloseAndFlush()` or `Logger.Dispose()` when the application exits.
3232

33-
### Buffering
33+
### Buffering & Dropping
34+
35+
The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink. One can determine whether events have been dropped via `Serilog.Async.IAsyncLogEventSinkInspector.DroppedMessagesCount` (see Sink State Inspection interface below).
36+
37+
```csharp
38+
// Reduce the buffer to 500 events
39+
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
40+
```
41+
42+
### Health Monitoring via the Monitor and Inspector interfaces
43+
44+
The `Async` wrapper is primarily intended to allow one to achieve minimal logging latency at all times, even when writing to sinks that may momentarily block during the course of their processing (e.g., a `File` Sink might block for a low number of ms while flushing). The dropping behavior is an important failsafe; it avoids having an unbounded buffering behaviour should logging throughput overwhelm the sink, or the sink ingestion throughput degrade.
3445

35-
The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink.
46+
In practice, this configuration (assuming one provisions an adequate `bufferSize`) achieves an efficient and resilient logging configuration that can safely handle load without impacting processing throughput. The risk is of course that events get be dropped if the buffer threshold gets breached. The inspection interface, `IAsyncLogEventSinkInspector` (obtained by providing an `IAsyncLogEventSinkMonitor` when configuring the `Async` Sink), enables a health monitoring mechanism to actively validate that the buffer allocation is not being exceeded in practice.
3647

3748
```csharp
38-
// Reduce the buffer to 500 events
39-
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
49+
// Example check: log message to an out of band alarm channel if logging is showing signs of getting overwhelmed
50+
void ExecuteAsyncBufferCheck(IAsyncLogEventSinkInspector inspector)
51+
{
52+
var usagePct = inspector.Count * 100 / inspector.BoundedCapacity;
53+
if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usagePct, inspector.BoundedCapacity);
54+
}
55+
56+
class MonitorConfiguration : IAsyncLogEventSinkMonitor
57+
{
58+
public void StartMonitoring(IAsyncLogEventSinkInspector inspector) =>
59+
HealthMonitor.AddPeriodicCheck(() => ExecuteAsyncBufferCheck(inspector));
60+
61+
public void StopMonitoring(IAsyncLogEventSinkInspector inspector)
62+
{ /* reverse of StartMonitoring */ }
63+
}
64+
65+
// Provide monitor so we can wire the health check to the inspector
66+
var monitor = new MonitorConfiguration();
67+
// Use default config (drop events if >10,000 backlog)
68+
.WriteTo.Async(a => a.File("logs/myapp.log"), monitor: monitor) ...
4069
```
4170

4271
### Blocking
@@ -46,9 +75,9 @@ Warning: For the same reason one typically does not want exceptions from logging
4675
When the buffer size limit is reached, the default behavior is to drop any further attempted writes until the queue abates, reporting each such failure to the `Serilog.Debugging.SelfLog`. To replace this with a blocking behaviour, set `blockWhenFull` to `true`.
4776

4877
```csharp
49-
// Wait for any queued event to be accepted by the `File` log before allowing the calling thread
50-
// to resume its application work after a logging call when there are 10,000 LogEvents waiting
51-
.WriteTo.Async(a => a.File("logs/myapp.log"), blockWhenFull: true)
78+
// Wait for any queued event to be accepted by the `File` log before allowing the calling thread to resume its
79+
// application work after a logging call when there are 10,000 LogEvents awaiting ingestion by the pipeline
80+
.WriteTo.Async(a => a.File("logs/myapp.log"), blockWhenFull: true)
5281
```
5382

5483
### XML `<appSettings>` and JSON configuration

src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,34 @@ public static LoggerConfiguration Async(
4444
Action<LoggerSinkConfiguration> configure,
4545
int bufferSize = 10000,
4646
bool blockWhenFull = false)
47+
{
48+
return loggerSinkConfiguration.Async(configure, null, bufferSize, blockWhenFull);
49+
}
50+
51+
/// <summary>
52+
/// Configure a sink to be invoked asynchronously, on a background worker thread.
53+
/// Accepts a reference to a <paramref name="monitor"/> that will be supplied the internal state interface for health monitoring purposes.
54+
/// </summary>
55+
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
56+
/// <param name="configure">An action that configures the wrapped sink.</param>
57+
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
58+
/// the thread is unable to process events quickly enough and the queue is filled, depending on
59+
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
60+
/// room is made in the queue.</param>
61+
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
62+
/// <param name="monitor">Monitor to supply buffer information to.</param>
63+
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
64+
public static LoggerConfiguration Async(
65+
this LoggerSinkConfiguration loggerSinkConfiguration,
66+
Action<LoggerSinkConfiguration> configure,
67+
IAsyncLogEventSinkMonitor monitor,
68+
int bufferSize = 10000,
69+
bool blockWhenFull = false)
4770
{
4871
return LoggerSinkConfiguration.Wrap(
4972
loggerSinkConfiguration,
50-
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull),
73+
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitor),
5174
configure);
5275
}
53-
5476
}
55-
}
77+
}

src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,25 @@
88

99
namespace Serilog.Sinks.Async
1010
{
11-
sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
11+
sealed class BackgroundWorkerSink : ILogEventSink, IAsyncLogEventSinkInspector, IDisposable
1212
{
1313
readonly ILogEventSink _pipeline;
1414
readonly bool _blockWhenFull;
1515
readonly BlockingCollection<LogEvent> _queue;
1616
readonly Task _worker;
17+
readonly IAsyncLogEventSinkMonitor _monitor;
1718

18-
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
19+
long _droppedMessages;
20+
21+
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull, IAsyncLogEventSinkMonitor monitor = null)
1922
{
20-
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
2123
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
22-
_pipeline = pipeline;
24+
_pipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
2325
_blockWhenFull = blockWhenFull;
2426
_queue = new BlockingCollection<LogEvent>(bufferCapacity);
2527
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
28+
_monitor = monitor;
29+
monitor?.StartMonitoring(this);
2630
}
2731

2832
public void Emit(LogEvent logEvent)
@@ -39,7 +43,10 @@ public void Emit(LogEvent logEvent)
3943
else
4044
{
4145
if (!_queue.TryAdd(logEvent))
46+
{
47+
Interlocked.Increment(ref _droppedMessages);
4248
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _queue.BoundedCapacity);
49+
}
4350
}
4451
}
4552
catch (InvalidOperationException)
@@ -55,9 +62,11 @@ public void Dispose()
5562
_queue.CompleteAdding();
5663

5764
// Allow queued events to be flushed
58-
_worker.Wait();
59-
65+
_worker.Wait();
66+
6067
(_pipeline as IDisposable)?.Dispose();
68+
69+
_monitor?.StopMonitoring(this);
6170
}
6271

6372
void Pump()
@@ -74,5 +83,11 @@ void Pump()
7483
SelfLog.WriteLine("{0} fatal error in worker thread: {1}", typeof(BackgroundWorkerSink), ex);
7584
}
7685
}
86+
87+
int IAsyncLogEventSinkInspector.BufferSize => _queue.BoundedCapacity;
88+
89+
int IAsyncLogEventSinkInspector.Count => _queue.Count;
90+
91+
long IAsyncLogEventSinkInspector.DroppedMessagesCount => _droppedMessages;
7792
}
78-
}
93+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
namespace Serilog.Sinks.Async
2+
{
3+
/// <summary>
4+
/// Provides a way to inspect the state of Async wrapper's ingestion queue.
5+
/// </summary>
6+
public interface IAsyncLogEventSinkInspector
7+
{
8+
/// <summary>
9+
/// Configured maximum number of items permitted to be held in the buffer awaiting ingestion.
10+
/// </summary>
11+
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception>
12+
int BufferSize { get; }
13+
14+
/// <summary>
15+
/// Current moment-in-time Count of items currently awaiting ingestion.
16+
/// </summary>
17+
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception>
18+
int Count { get; }
19+
20+
/// <summary>
21+
/// Accumulated number of messages dropped due to breaches of <see cref="BufferSize"/> limit.
22+
/// </summary>
23+
long DroppedMessagesCount { get; }
24+
}
25+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
namespace Serilog.Sinks.Async
2+
{
3+
/// <summary>
4+
/// Defines a mechanism for the Async Sink to afford Health Checks a buffer metadata inspection mechanism.
5+
/// </summary>
6+
public interface IAsyncLogEventSinkMonitor
7+
{
8+
/// <summary>
9+
/// Invoked by Sink to supply the inspector to the monitor.
10+
/// </summary>
11+
/// <param name="inspector">The Async Sink's inspector.</param>
12+
void StartMonitoring(IAsyncLogEventSinkInspector inspector);
13+
14+
/// <summary>
15+
/// Invoked by Sink to indicate that it is being Disposed.
16+
/// </summary>
17+
/// <param name="inspector">The Async Sink's inspector.</param>
18+
void StopMonitoring(IAsyncLogEventSinkInspector inspector);
19+
}
20+
}

test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Diagnostics;
44
using System.Linq;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Serilog.Core;
78
using Serilog.Events;
@@ -25,7 +26,7 @@ public BackgroundWorkerSinkSpec()
2526
[Fact]
2627
public void WhenCtorWithNullSink_ThenThrows()
2728
{
28-
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000, false));
29+
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000, false, null));
2930
}
3031

3132
[Fact]
@@ -84,12 +85,12 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
8485
}
8586

8687
[Fact]
87-
public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock()
88+
public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_DoesNotBlock()
8889
{
8990
var batchTiming = Stopwatch.StartNew();
9091
using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/))
9192
{
92-
// Cause a delay when emmitting to the inner sink, allowing us to easily fill the queue to capacity
93+
// Cause a delay when emitting to the inner sink, allowing us to easily fill the queue to capacity
9394
// while the first event is being propagated
9495
var acceptInterval = TimeSpan.FromMilliseconds(500);
9596
_innerSink.DelayEmit = acceptInterval;
@@ -106,6 +107,7 @@ public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock()
106107

107108
// Allow at least one to propagate
108109
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
110+
Assert.NotEqual(0, ((IAsyncLogEventSinkInspector)sink).DroppedMessagesCount);
109111
}
110112
// Sanity check the overall timing
111113
batchTiming.Stop();
@@ -114,7 +116,7 @@ public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock()
114116
}
115117

116118
[Fact]
117-
public async Task GivenDefaultConfig_WhenRequestsOverCapacity_ThenDropsEventsAndRecovers()
119+
public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_ThenDropsEventsAndRecovers()
118120
{
119121
using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/))
120122
{
@@ -144,6 +146,7 @@ from e in _innerSink.Events
144146
Assert.InRange(2, 2 * 3 / 2 - 1, propagatedExcludingFinal.Count());
145147
// Final event should have made it through
146148
Assert.Contains(_innerSink.Events, x => Object.ReferenceEquals(finalEvent, x));
149+
Assert.NotEqual(0, ((IAsyncLogEventSinkInspector)sink).DroppedMessagesCount);
147150
}
148151
}
149152

@@ -182,9 +185,51 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks()
182185

183186
// No events should be dropped
184187
Assert.Equal(3, _innerSink.Events.Count);
188+
Assert.Equal(0, ((IAsyncLogEventSinkInspector)sink).DroppedMessagesCount);
185189
}
186190
}
187191

192+
[Fact]
193+
public void MonitorParameterAffordsSinkInspectorSuitableForHealthChecking()
194+
{
195+
var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) };
196+
// 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously
197+
var bufferSize = 2;
198+
var monitor = new DummyMonitor();
199+
using (var logger = new LoggerConfiguration()
200+
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, monitor: monitor)
201+
.CreateLogger())
202+
{
203+
// Construction of BackgroundWorkerSink triggers StartMonitoring
204+
var inspector = monitor.Inspector;
205+
Assert.Equal(bufferSize, inspector.BufferSize);
206+
Assert.Equal(0, inspector.Count);
207+
Assert.Equal(0, inspector.DroppedMessagesCount);
208+
logger.Information("Something to freeze the processing for 2s");
209+
// Can be taken from queue either instantanously or be awaiting consumer to take
210+
Assert.InRange(inspector.Count, 0, 1);
211+
Assert.Equal(0, inspector.DroppedMessagesCount);
212+
logger.Information("Something that will sit in the queue");
213+
Assert.InRange(inspector.Count, 1, 2);
214+
logger.Information("Something that will probably also sit in the queue (but could get dropped if first message has still not been picked up)");
215+
Assert.InRange(inspector.Count, 1, 2);
216+
logger.Information("Something that will get dropped unless we get preempted for 2s during our execution");
217+
const string droppedMessage = "Something that will definitely get dropped";
218+
logger.Information(droppedMessage);
219+
Assert.InRange(inspector.Count, 1, 2);
220+
// Unless we are put to sleep for a Rip Van Winkle period, either:
221+
// a) the BackgroundWorker will be emitting the item [and incurring the 2s delay we established], leaving a single item in the buffer
222+
// or b) neither will have been picked out of the buffer yet.
223+
Assert.InRange(inspector.Count, 1, 2);
224+
Assert.Equal(bufferSize, inspector.BufferSize);
225+
Assert.DoesNotContain(collector.Events, x => x.MessageTemplate.Text == droppedMessage);
226+
// Because messages wait 2 seconds, the only real way to get one into the buffer is with a debugger breakpoint or a sleep
227+
Assert.InRange(collector.Events.Count, 0, 3);
228+
}
229+
// Dispose should trigger a StopMonitoring call
230+
Assert.Null(monitor.Inspector);
231+
}
232+
188233
private BackgroundWorkerSink CreateSinkWithDefaultOptions()
189234
{
190235
return new BackgroundWorkerSink(_logger, 10000, false);

test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,21 @@ public void DisposeCompletesWithoutWorkPerformed()
3434

3535
Assert.Empty(collector.Events);
3636
}
37+
38+
[Fact]
39+
public void CtorAndDisposeInformMonitor()
40+
{
41+
var collector = new MemorySink();
42+
var monitor = new DummyMonitor();
43+
44+
using (new LoggerConfiguration()
45+
.WriteTo.Async(w => w.Sink(collector), monitor: monitor)
46+
.CreateLogger())
47+
{
48+
Assert.NotNull(monitor.Inspector);
49+
}
50+
51+
Assert.Null(monitor.Inspector);
52+
}
3753
}
38-
}
54+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
namespace Serilog.Sinks.Async.Tests.Support
2+
{
3+
class DummyMonitor : IAsyncLogEventSinkMonitor
4+
{
5+
IAsyncLogEventSinkInspector inspector;
6+
public IAsyncLogEventSinkInspector Inspector => inspector;
7+
8+
void IAsyncLogEventSinkMonitor.StartMonitoring(IAsyncLogEventSinkInspector inspector) =>
9+
this.inspector = inspector;
10+
11+
void IAsyncLogEventSinkMonitor.StopMonitoring(IAsyncLogEventSinkInspector inspector) =>
12+
System.Threading.Interlocked.CompareExchange(ref this.inspector, null, inspector);
13+
}
14+
}

0 commit comments

Comments
 (0)