Skip to content

Commit 17e0779

Browse files
committed
Complete IAsyncLogEventSinkMonitor impl
1 parent 6546958 commit 17e0779

File tree

8 files changed

+96
-86
lines changed

8 files changed

+96
-86
lines changed

README.md

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ Log.Logger = new LoggerConfiguration()
1919
.WriteTo.Async(a => a.File("logs/myapp.log"))
2020
// Other logger configuration
2121
.CreateLogger()
22-
22+
2323
Log.Information("This will be written to disk on the worker thread");
2424

25-
// At application shutdown
25+
// At application shutdown (results in monitors getting StopMonitoring calls)
2626
Log.CloseAndFlush();
2727
```
2828

@@ -32,32 +32,40 @@ Because the memory buffer may contain events that have not yet been written to t
3232

3333
### Buffering & Dropping
3434

35-
The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink. One can determine whether events have been dropped via `Serilog.Async.IAsyncLogEventSinkState.DroppedMessagesCount` (see Sink State Inspection interface below).
35+
The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink. One can determine whether events have been dropped via `Serilog.Async.IAsyncLogEventSinkInspector.DroppedMessagesCount` (see Sink State Inspection interface below).
3636

3737
```csharp
38-
// Reduce the buffer to 500 events
39-
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
38+
// Reduce the buffer to 500 events
39+
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
4040
```
4141

42-
### Health Monitoring via the Sink State Inspection interface
42+
### Health Monitoring via the Monitor and Inspector interfaces
4343

44-
The `Async` wrapper is primarily intended to allow one to achieve minimal logging latency at all times, even when writing to sinks that may momentarily block during the course of their processing (e.g., a `File` Sink might block for a low number of ms while flushing). The dropping behavior is an important failsafe; it avoids having an unbounded buffering behaviour should logging frequency overwhelm the sink, or the sink ingestion throughput degrade to a major degree.
44+
The `Async` wrapper is primarily intended to allow one to achieve minimal logging latency at all times, even when writing to sinks that may momentarily block during the course of their processing (e.g., a `File` Sink might block for a low number of ms while flushing). The dropping behavior is an important failsafe; it avoids having an unbounded buffering behaviour should logging throughput overwhelm the sink, or the sink ingestion throughput degrade.
4545

46-
In practice, this configuration (assuming one provisions an adequate `bufferSize`) achieves an efficient and resilient logging configuration that can handle load safely. The key risk is of course that events get be dropped when the buffer threshold gets breached. The `inspector` allows one to arrange for your Application's health monitoring mechanism to actively validate that the buffer allocation is not being exceeded in practice.
46+
In practice, this configuration (assuming one provisions an adequate `bufferSize`) achieves an efficient and resilient logging configuration that can safely handle load without impacting processing throughput. The risk is of course that events get be dropped if the buffer threshold gets breached. The inspection interface, `IAsyncLogEventSinkInspector` (obtained by providing an `IAsyncLogEventSinkMonitor` when configuring the `Async` Sink), enables a health monitoring mechanism to actively validate that the buffer allocation is not being exceeded in practice.
4747

4848
```csharp
49-
// Example check: log message to an out of band alarm channel if logging is showing signs of getting overwhelmed
50-
void PeriodicMonitorCheck(IQueueState inspector)
51-
{
52-
var usagePct = inspector.Count * 100 / inspector.BoundedCapacity;
53-
if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usagePct, inspector.BoundedCapacity);
54-
}
55-
56-
// Allow a backlog of up to 10,000 items to be maintained (dropping extras if full)
57-
.WriteTo.Async(a => a.File("logs/myapp.log"), inspector: out Async.IAsyncLogEventSinkState inspector) ...
58-
59-
// Wire the inspector through to application health monitoring and/or metrics in order to periodically emit a metric, raise an alarm, etc.
60-
... HealthMonitor.RegisterAsyncLogSink(inspector);
49+
// Example check: log message to an out of band alarm channel if logging is showing signs of getting overwhelmed
50+
void ExecuteAsyncBufferCheck(IAsyncLogEventSinkInspector inspector)
51+
{
52+
var usagePct = inspector.Count * 100 / inspector.BoundedCapacity;
53+
if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usagePct, inspector.BoundedCapacity);
54+
}
55+
56+
class MonitorConfiguration : IAsyncLogEventSinkMonitor
57+
{
58+
public void StartMonitoring(IAsyncLogEventSinkInspector inspector) =>
59+
HealthMonitor.AddPeriodicCheck(() => ExecuteAsyncBufferCheck(inspector));
60+
61+
public void StopMonitoring(IAsyncLogEventSinkInspector inspector)
62+
{ /* reverse of StartMonitoring */ }
63+
}
64+
65+
// Provide monitor so we can wire the health check to the inspector
66+
var monitor = new MonitorConfiguration();
67+
// Use default config (drop events if >10,000 backlog)
68+
.WriteTo.Async(a => a.File("logs/myapp.log"), monitor: monitor) ...
6169
```
6270

6371
### Blocking
@@ -67,9 +75,9 @@ Warning: For the same reason one typically does not want exceptions from logging
6775
When the buffer size limit is reached, the default behavior is to drop any further attempted writes until the queue abates, reporting each such failure to the `Serilog.Debugging.SelfLog`. To replace this with a blocking behaviour, set `blockWhenFull` to `true`.
6876

6977
```csharp
70-
// Wait for any queued event to be accepted by the `File` log before allowing the calling thread
71-
// to resume its application work after a logging call when there are 10,000 LogEvents waiting
72-
.WriteTo.Async(a => a.File("logs/myapp.log"), blockWhenFull: true)
78+
// Wait for any queued event to be accepted by the `File` log before allowing the calling thread to resume its
79+
// application work after a logging call when there are 10,000 LogEvents awaiting ingestion by the pipeline
80+
.WriteTo.Async(a => a.File("logs/myapp.log"), blockWhenFull: true)
7381
```
7482

7583
### XML `<appSettings>` and JSON configuration

src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs

Lines changed: 3 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -59,59 +59,19 @@ public static LoggerConfiguration Async(
5959
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
6060
/// room is made in the queue.</param>
6161
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
62-
/// <param name="monitor">Monitor to supply buffer information to. If the monitor implements <see cref="IDisposable"/>, <c>Dispose()</c> will be called to advise of the Sink being <c>Dispose()</c>d.</param>
62+
/// <param name="monitor">Monitor to supply buffer information to.</param>
6363
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
6464
public static LoggerConfiguration Async(
6565
this LoggerSinkConfiguration loggerSinkConfiguration,
6666
Action<LoggerSinkConfiguration> configure,
6767
IAsyncLogEventSinkMonitor monitor,
68-
int bufferSize,
69-
bool blockWhenFull)
70-
{
71-
return LoggerSinkConfiguration.Wrap(
72-
loggerSinkConfiguration,
73-
wrappedSink =>
74-
{
75-
var sink = new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitor);
76-
monitor?.MonitorState(sink);
77-
return sink;
78-
},
79-
configure);
80-
}
81-
82-
/// <summary>
83-
/// Configure a sink to be invoked asynchronously, on a background worker thread.
84-
/// Provides an <paramref name="inspector"/> that can be used to check the live state of the buffer for health monitoring purposes.
85-
/// </summary>
86-
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
87-
/// <param name="configure">An action that configures the wrapped sink.</param>
88-
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
89-
/// the thread is unable to process events quickly enough and the queue is filled, depending on
90-
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
91-
/// room is made in the queue.</param>
92-
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
93-
/// <param name="inspector">Provides a way to inspect the state of the queue for health monitoring purposes.</param>
94-
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
95-
public static LoggerConfiguration Async(
96-
this LoggerSinkConfiguration loggerSinkConfiguration,
97-
Action<LoggerSinkConfiguration> configure,
98-
out IAsyncLogEventSinkState inspector,
9968
int bufferSize = 10000,
10069
bool blockWhenFull = false)
10170
{
102-
// Cannot assign directly to the out param from within the lambda, so we need a temp
103-
IAsyncLogEventSinkState stateLens = null;
104-
var result = LoggerSinkConfiguration.Wrap(
71+
return LoggerSinkConfiguration.Wrap(
10572
loggerSinkConfiguration,
106-
wrappedSink =>
107-
{
108-
var sink = new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, null);
109-
stateLens = sink;
110-
return sink;
111-
},
73+
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitor),
11274
configure);
113-
inspector = stateLens;
114-
return result;
11575
}
11676
}
11777
}

src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace Serilog.Sinks.Async
1010
{
11-
sealed class BackgroundWorkerSink : ILogEventSink, IAsyncLogEventSinkState, IDisposable
11+
sealed class BackgroundWorkerSink : ILogEventSink, IAsyncLogEventSinkInspector, IDisposable
1212
{
1313
readonly ILogEventSink _pipeline;
1414
readonly bool _blockWhenFull;
@@ -26,6 +26,7 @@ public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blo
2626
_queue = new BlockingCollection<LogEvent>(bufferCapacity);
2727
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
2828
_monitor = monitor;
29+
monitor?.StartMonitoring(this);
2930
}
3031

3132
public void Emit(LogEvent logEvent)
@@ -65,7 +66,7 @@ public void Dispose()
6566

6667
(_pipeline as IDisposable)?.Dispose();
6768

68-
(_monitor as IDisposable)?.Dispose();
69+
_monitor?.StopMonitoring(this);
6970
}
7071

7172
void Pump()
@@ -83,10 +84,10 @@ void Pump()
8384
}
8485
}
8586

86-
int IAsyncLogEventSinkState.BufferSize => _queue.BoundedCapacity;
87+
int IAsyncLogEventSinkInspector.BufferSize => _queue.BoundedCapacity;
8788

88-
int IAsyncLogEventSinkState.Count => _queue.Count;
89+
int IAsyncLogEventSinkInspector.Count => _queue.Count;
8990

90-
long IAsyncLogEventSinkState.DroppedMessagesCount => _droppedMessages;
91+
long IAsyncLogEventSinkInspector.DroppedMessagesCount => _droppedMessages;
9192
}
92-
}
93+
}

src/Serilog.Sinks.Async/Sinks/Async/IAsyncLogEventSinkState.cs renamed to src/Serilog.Sinks.Async/Sinks/Async/IAsyncLogEventSinkInspector.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
namespace Serilog.Sinks.Async
22
{
33
/// <summary>
4-
/// Provides a way to monitor the state of Async wrapper's ingestion queue.
4+
/// Provides a way to inspect the state of Async wrapper's ingestion queue.
55
/// </summary>
6-
public interface IAsyncLogEventSinkState
6+
public interface IAsyncLogEventSinkInspector
77
{
88
/// <summary>
99
/// Configured maximum number of items permitted to be held in the buffer awaiting ingestion.
@@ -18,7 +18,7 @@ public interface IAsyncLogEventSinkState
1818
int Count { get; }
1919

2020
/// <summary>
21-
/// Accumulated number of messages dropped due to breach of <see cref="BufferSize"/> limit.
21+
/// Accumulated number of messages dropped due to breaches of <see cref="BufferSize"/> limit.
2222
/// </summary>
2323
long DroppedMessagesCount { get; }
2424
}
Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
namespace Serilog.Sinks.Async
22
{
33
/// <summary>
4-
/// Defines a mechanism for the Async Sink to provide buffer metadata to facilitate integration into system health checking.
4+
/// Defines a mechanism for the Async Sink to afford Health Checks a buffer metadata inspection mechanism.
55
/// </summary>
6-
/// <remarks>If the instance implements <see cref="System.IDisposable"/>, it will be <c>Dispose()</c>d at then time the Sink is.</remarks>
76
public interface IAsyncLogEventSinkMonitor
87
{
98
/// <summary>
10-
/// Invoked by Sink to supply the buffer state hook to the monitor.
9+
/// Invoked by Sink to supply the inspector to the monitor.
1110
/// </summary>
12-
/// <param name="state">The Async Sink's state information interface.</param>
13-
void MonitorState(IAsyncLogEventSinkState state);
11+
/// <param name="inspector">The Async Sink's inspector.</param>
12+
void StartMonitoring(IAsyncLogEventSinkInspector inspector);
13+
14+
/// <summary>
15+
/// Invoked by Sink to indicate that it is being Disposed.
16+
/// </summary>
17+
/// <param name="inspector">The Async Sink's inspector.</param>
18+
void StopMonitoring(IAsyncLogEventSinkInspector inspector);
1419
}
1520
}

test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Diagnostics;
44
using System.Linq;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Serilog.Core;
78
using Serilog.Events;
@@ -106,7 +107,7 @@ public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_DoesNotBlock()
106107

107108
// Allow at least one to propagate
108109
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
109-
Assert.NotEqual(0, ((IAsyncLogEventSinkState)sink).DroppedMessagesCount);
110+
Assert.NotEqual(0, ((IAsyncLogEventSinkInspector)sink).DroppedMessagesCount);
110111
}
111112
// Sanity check the overall timing
112113
batchTiming.Stop();
@@ -145,7 +146,7 @@ from e in _innerSink.Events
145146
Assert.InRange(2, 2 * 3 / 2 - 1, propagatedExcludingFinal.Count());
146147
// Final event should have made it through
147148
Assert.Contains(_innerSink.Events, x => Object.ReferenceEquals(finalEvent, x));
148-
Assert.NotEqual(0, ((IAsyncLogEventSinkState)sink).DroppedMessagesCount);
149+
Assert.NotEqual(0, ((IAsyncLogEventSinkInspector)sink).DroppedMessagesCount);
149150
}
150151
}
151152

@@ -184,20 +185,23 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks()
184185

185186
// No events should be dropped
186187
Assert.Equal(3, _innerSink.Events.Count);
187-
Assert.Equal(0, ((IAsyncLogEventSinkState)sink).DroppedMessagesCount);
188+
Assert.Equal(0, ((IAsyncLogEventSinkInspector)sink).DroppedMessagesCount);
188189
}
189190
}
190191

191192
[Fact]
192-
public void InspectorOutParameterAffordsHealthMonitoringHook()
193+
public void MonitorParameterAffordsSinkInspectorSuitableForHealthChecking()
193194
{
194195
var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) };
195196
// 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously
196197
var bufferSize = 2;
198+
var monitor = new DummyMonitor();
197199
using (var logger = new LoggerConfiguration()
198-
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, inspector: out IAsyncLogEventSinkState inspector)
200+
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, monitor: monitor)
199201
.CreateLogger())
200202
{
203+
// Construction of BackgroundWorkerSink triggers StartMonitoring
204+
var inspector = monitor.Inspector;
201205
Assert.Equal(bufferSize, inspector.BufferSize);
202206
Assert.Equal(0, inspector.Count);
203207
Assert.Equal(0, inspector.DroppedMessagesCount);
@@ -222,6 +226,8 @@ public void InspectorOutParameterAffordsHealthMonitoringHook()
222226
// Because messages wait 2 seconds, the only real way to get one into the buffer is with a debugger breakpoint or a sleep
223227
Assert.InRange(collector.Events.Count, 0, 3);
224228
}
229+
// Dispose should trigger a StopMonitoring call
230+
Assert.Null(monitor.Inspector);
225231
}
226232

227233
private BackgroundWorkerSink CreateSinkWithDefaultOptions()

test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,21 @@ public void DisposeCompletesWithoutWorkPerformed()
3434

3535
Assert.Empty(collector.Events);
3636
}
37+
38+
[Fact]
39+
public void CtorAndDisposeInformMonitor()
40+
{
41+
var collector = new MemorySink();
42+
var monitor = new DummyMonitor();
43+
44+
using (new LoggerConfiguration()
45+
.WriteTo.Async(w => w.Sink(collector), monitor: monitor)
46+
.CreateLogger())
47+
{
48+
Assert.NotNull(monitor.Inspector);
49+
}
50+
51+
Assert.Null(monitor.Inspector);
52+
}
3753
}
38-
}
54+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
namespace Serilog.Sinks.Async.Tests.Support
2+
{
3+
class DummyMonitor : IAsyncLogEventSinkMonitor
4+
{
5+
IAsyncLogEventSinkInspector inspector;
6+
public IAsyncLogEventSinkInspector Inspector => inspector;
7+
8+
void IAsyncLogEventSinkMonitor.StartMonitoring(IAsyncLogEventSinkInspector inspector) =>
9+
this.inspector = inspector;
10+
11+
void IAsyncLogEventSinkMonitor.StopMonitoring(IAsyncLogEventSinkInspector inspector) =>
12+
System.Threading.Interlocked.CompareExchange(ref this.inspector, null, inspector);
13+
}
14+
}

0 commit comments

Comments
 (0)