Skip to content

Commit 2ee0f26

Browse files
committed
Add option to block when the queue is full, instead of dropping events
Also, use `GetConsumingEnumerable` to enumerate the queue, instead of a `while` loop
1 parent 1a6689f commit 2ee0f26

File tree

5 files changed

+110
-33
lines changed

5 files changed

+110
-33
lines changed

src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,20 @@ public static class LoggerConfigurationAsyncExtensions
1616
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
1717
/// <param name="configure">An action that configures the wrapped sink.</param>
1818
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
19-
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
20-
/// dropped until room is made in the queue.</param>
19+
/// the thread is unable to process events quickly enough and the queue is filled, depending on
20+
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
21+
/// room is made in the queue.</param>
22+
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
2123
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
2224
public static LoggerConfiguration Async(
2325
this LoggerSinkConfiguration loggerSinkConfiguration,
2426
Action<LoggerSinkConfiguration> configure,
25-
int bufferSize = 10000)
27+
int bufferSize = 10000,
28+
bool blockWhenFull = false)
2629
{
2730
return LoggerSinkConfiguration.Wrap(
2831
loggerSinkConfiguration,
29-
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize),
32+
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull),
3033
configure);
3134
}
3235
}

src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,18 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
1212
{
1313
readonly ILogEventSink _pipeline;
1414
readonly int _bufferCapacity;
15+
readonly bool _blockWhenFull;
1516
volatile bool _disposed;
16-
readonly CancellationTokenSource _cancel = new CancellationTokenSource();
1717
readonly BlockingCollection<LogEvent> _queue;
1818
readonly Task _worker;
1919

20-
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity)
20+
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
2121
{
2222
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
2323
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
2424
_pipeline = pipeline;
2525
_bufferCapacity = bufferCapacity;
26+
_blockWhenFull = blockWhenFull;
2627
_queue = new BlockingCollection<LogEvent>(_bufferCapacity);
2728
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
2829
}
@@ -31,36 +32,35 @@ public void Emit(LogEvent logEvent)
3132
{
3233
// The disposed check is racy, but only ensures we don't prevent flush from
3334
// completing by pushing more events.
34-
if (!_disposed && !_queue.TryAdd(logEvent))
35-
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
35+
if (_disposed)
36+
return;
37+
38+
if (!this._blockWhenFull)
39+
{
40+
if (!_queue.TryAdd(logEvent))
41+
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
42+
}
43+
else
44+
{
45+
this._queue.Add(logEvent);
46+
}
3647
}
3748

3849
public void Dispose()
3950
{
4051
_disposed = true;
41-
_cancel.Cancel();
52+
_queue.CompleteAdding();
4253
_worker.Wait();
4354
(_pipeline as IDisposable)?.Dispose();
44-
// _cancel not disposed, because it will make _cancel.Cancel() non-idempotent
4555
}
4656

4757
void Pump()
4858
{
4959
try
5060
{
51-
try
52-
{
53-
while (true)
54-
{
55-
var next = _queue.Take(_cancel.Token);
56-
_pipeline.Emit(next);
57-
}
58-
}
59-
catch (OperationCanceledException)
61+
foreach (var next in _queue.GetConsumingEnumerable())
6062
{
61-
LogEvent next;
62-
while (_queue.TryTake(out next))
63-
_pipeline.Emit(next);
63+
_pipeline.Emit(next);
6464
}
6565
}
6666
catch (Exception ex)

test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,28 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics;
34
using System.Linq;
45
using System.Threading.Tasks;
56
using Serilog.Core;
7+
using Serilog.Debugging;
68
using Serilog.Events;
79
using Serilog.Parsing;
8-
using Serilog.Sinks.Async.Tests;
910
using Serilog.Sinks.Async.Tests.Support;
1011
using Xunit;
1112

1213
namespace Serilog.Sinks.Async.Tests
1314
{
1415
public class BackgroundWorkerSinkSpec : IDisposable
1516
{
17+
readonly Logger _logger;
1618
readonly MemorySink _innerSink;
17-
readonly BackgroundWorkerSink _sink;
19+
BackgroundWorkerSink _sink;
1820

1921
public BackgroundWorkerSinkSpec()
2022
{
2123
_innerSink = new MemorySink();
22-
var logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
23-
_sink = new BackgroundWorkerSink(logger, 10000);
24+
_logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
25+
_sink = new BackgroundWorkerSink(_logger, 10000, false);
2426
}
2527

2628
public void Dispose()
@@ -31,14 +33,15 @@ public void Dispose()
3133
[Fact]
3234
public void WhenCtorWithNullSink_ThenThrows()
3335
{
34-
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000));
36+
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000, false));
3537
}
3638

3739
[Fact]
3840
public async Task WhenEmitSingle_ThenRelaysToInnerSink()
3941
{
4042
var logEvent = CreateEvent();
4143
_sink.Emit(logEvent);
44+
_sink.Dispose();
4245

4346
await Task.Delay(TimeSpan.FromSeconds(3));
4447

@@ -80,6 +83,77 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
8083
Assert.Equal(3, _innerSink.Events.Count);
8184
}
8285

86+
[Fact]
87+
public async Task WhenQueueFull_ThenDropsEvents()
88+
{
89+
_sink = new BackgroundWorkerSink(_logger, 1, false);
90+
91+
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
92+
// after the first event is popped
93+
_innerSink.DelayEmit = true;
94+
95+
var events = new List<LogEvent>
96+
{
97+
CreateEvent(),
98+
CreateEvent(),
99+
CreateEvent(),
100+
CreateEvent(),
101+
CreateEvent()
102+
};
103+
events.ForEach(e =>
104+
{
105+
var sw = Stopwatch.StartNew();
106+
_sink.Emit(e);
107+
sw.Stop();
108+
109+
Assert.True(sw.ElapsedMilliseconds < 2000, "Should not block the caller when the queue is full");
110+
});
111+
112+
// If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take
113+
// at least 15 seconds to process
114+
await Task.Delay(TimeSpan.FromSeconds(18));
115+
116+
// Events should be dropped
117+
Assert.Equal(2, _innerSink.Events.Count);
118+
}
119+
120+
[Fact]
121+
public async Task WhenQueueFull_ThenBlocks()
122+
{
123+
_sink = new BackgroundWorkerSink(_logger, 1, true);
124+
125+
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
126+
// after the first event is popped
127+
_innerSink.DelayEmit = true;
128+
129+
var events = new List<LogEvent>
130+
{
131+
CreateEvent(),
132+
CreateEvent(),
133+
CreateEvent()
134+
};
135+
136+
int i = 0;
137+
events.ForEach(e =>
138+
{
139+
var sw = Stopwatch.StartNew();
140+
_sink.Emit(e);
141+
sw.Stop();
142+
143+
// Emit should return immediately the first time, since the queue is not yet full. On
144+
// subsequent calls, the queue should be full, so we should be blocked
145+
if (i > 0)
146+
{
147+
Assert.True(sw.ElapsedMilliseconds > 2000, "Should block the caller when the queue is full");
148+
}
149+
});
150+
151+
await Task.Delay(TimeSpan.FromSeconds(12));
152+
153+
// No events should be dropped
154+
Assert.Equal(3, _innerSink.Events.Count);
155+
}
156+
83157
private static LogEvent CreateEvent()
84158
{
85159
return new LogEvent(DateTimeOffset.MaxValue, LogEventLevel.Error, null,

test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
using System;
2-
using System.Threading;
3-
using Serilog.Core;
4-
using Serilog.Events;
5-
using Serilog.Parsing;
6-
using Serilog.Sinks.Async.Tests.Support;
1+
using Serilog.Sinks.Async.Tests.Support;
72
using Xunit;
83

94
namespace Serilog.Sinks.Async.Tests

test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@
22
using Serilog.Core;
33
using System.Collections.Concurrent;
44
using System;
5+
using System.Threading.Tasks;
56

67
namespace Serilog.Sinks.Async.Tests.Support
78
{
89
public class MemorySink : ILogEventSink
910
{
1011
public ConcurrentBag<LogEvent> Events { get; } = new ConcurrentBag<LogEvent>();
1112
public bool ThrowAfterCollecting { get; set; }
13+
public bool DelayEmit { get; set; }
1214

1315
public void Emit(LogEvent logEvent)
1416
{
17+
if (DelayEmit)
18+
Task.Delay(TimeSpan.FromSeconds(3)).Wait();
19+
1520
Events.Add(logEvent);
1621

1722
if (ThrowAfterCollecting)

0 commit comments

Comments
 (0)