Skip to content

Commit d3b5309

Browse files
committed
Surface dropped events count. Resolves #13
1 parent dcd24ac commit d3b5309

File tree

4 files changed

+30
-5
lines changed

4 files changed

+30
-5
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ The wrapped sink (`File` in this case) will be invoked on a worker thread while
3030

3131
Because the memory buffer may contain events that have not yet been written to the target sink, it is important to call `Log.CloseAndFlush()` or `Logger.Dispose()` when the application exits.
3232

33-
### Buffering
33+
### Buffering & Dropping
3434

35-
The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink.
35+
The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink. One can determine whether events have been dropped via `IQueueState.DroppedMessagesCount` (see Buffer Inspection interface below).
3636

3737
```csharp
3838
// Reduce the buffer to 500 events

src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ public interface IQueueState
9898
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception>
9999
int Count { get; }
100100

101+
/// <summary>
102+
/// Accumulated number of messages dropped due to attempted submission having breached <see cref="BufferSize"/> limit.
103+
/// </summary>
104+
long DroppedMessagesCount { get; }
105+
101106
/// <summary>
102107
/// Maximum number of items permitted to be held in the buffer awaiting ingestion.
103108
/// </summary>

src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ sealed class BackgroundWorkerSink : ILogEventSink, IQueueState, IDisposable
1515
readonly BlockingCollection<LogEvent> _queue;
1616
readonly Task _worker;
1717

18+
long _droppedMessages;
19+
1820
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
1921
{
2022
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
@@ -38,7 +40,10 @@ public void Emit(LogEvent logEvent)
3840
else
3941
{
4042
if (!_queue.TryAdd(logEvent))
43+
{
44+
Interlocked.Increment(ref _droppedMessages);
4145
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _queue.BoundedCapacity);
46+
}
4247
}
4348
}
4449
catch (InvalidOperationException)
@@ -77,5 +82,7 @@ void Pump()
7782
int IQueueState.Count => _queue.Count;
7883

7984
int IQueueState.BufferSize => _queue.BoundedCapacity;
85+
86+
long IQueueState.DroppedMessagesCount => _droppedMessages;
8087
}
8188
}

test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_DoesNotBlock()
106106

107107
// Allow at least one to propagate
108108
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
109+
Assert.NotEqual(0, ((IQueueState)sink).DroppedMessagesCount);
109110
}
110111
// Sanity check the overall timing
111112
batchTiming.Stop();
@@ -144,6 +145,7 @@ from e in _innerSink.Events
144145
Assert.InRange(2, 2 * 3 / 2 - 1, propagatedExcludingFinal.Count());
145146
// Final event should have made it through
146147
Assert.Contains(_innerSink.Events, x => Object.ReferenceEquals(finalEvent, x));
148+
Assert.NotEqual(0, ((IQueueState)sink).DroppedMessagesCount);
147149
}
148150
}
149151

@@ -182,11 +184,12 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks()
182184

183185
// No events should be dropped
184186
Assert.Equal(3, _innerSink.Events.Count);
187+
Assert.Equal(0, ((IQueueState)sink).DroppedMessagesCount);
185188
}
186189
}
187190

188191
[Fact]
189-
public async Task InspectorOutParameterAffordsHealthMonitoringHook()
192+
public void InspectorOutParameterAffordsHealthMonitoringHook()
190193
{
191194
var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) };
192195
// 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously
@@ -197,17 +200,27 @@ public async Task InspectorOutParameterAffordsHealthMonitoringHook()
197200
{
198201
Assert.Equal(bufferSize, inspector.BufferSize);
199202
Assert.Equal(0, inspector.Count);
203+
Assert.Equal(0, inspector.DroppedMessagesCount);
200204
logger.Information("Something to freeze the processing for 2s");
201-
await Task.Delay(TimeSpan.FromMilliseconds(200));
202205
// Can be taken from queue either instantanously or be awaiting consumer to take
203206
Assert.InRange(inspector.Count, 0, 1);
207+
Assert.Equal(0, inspector.DroppedMessagesCount);
204208
logger.Information("Something that will sit in the queue");
209+
Assert.InRange(inspector.Count, 1, 2);
210+
logger.Information("Something that will probably also sit in the queue (but could get dropped if first message has still not been picked up)");
211+
Assert.InRange(inspector.Count, 1, 2);
212+
logger.Information("Something that will get dropped unless we get preempted for 2s during our execution");
213+
const string droppedMessage = "Something that will definitely get dropped";
214+
logger.Information(droppedMessage);
215+
Assert.InRange(inspector.Count, 1, 2);
205216
// Unless we are put to sleep for a Rip Van Winkle period, either:
206217
// a) the BackgroundWorker will be emitting the item [and incurring the 2s delay we established], leaving a single item in the buffer
207218
// or b) neither will have been picked out of the buffer yet.
208-
await Task.Delay(TimeSpan.FromMilliseconds(200));
209219
Assert.InRange(inspector.Count, 1, 2);
210220
Assert.Equal(bufferSize, inspector.BufferSize);
221+
Assert.DoesNotContain(collector.Events, x => x.MessageTemplate.Text == droppedMessage);
222+
// Because messages wait 2 seconds, the only real way to get one into the buffer is with a debugger breakpoint or a sleep
223+
Assert.InRange(collector.Events.Count, 0, 3);
211224
}
212225
}
213226

0 commit comments

Comments
 (0)