Skip to content

Commit dd2e03e

Browse files
authored
Merge pull request #21 from cocowalla/buffer-strategy
Fulfills #20 - add option to block when the queue is full, instead of dropping events
2 parents 1a6689f + 3e5a186 commit dd2e03e

File tree

5 files changed

+177
-66
lines changed

5 files changed

+177
-66
lines changed

src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
using System;
2+
using System.ComponentModel;
23
using Serilog.Configuration;
3-
44
using Serilog.Sinks.Async;
55

66
namespace Serilog
@@ -16,18 +16,40 @@ public static class LoggerConfigurationAsyncExtensions
1616
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
1717
/// <param name="configure">An action that configures the wrapped sink.</param>
1818
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
19-
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
19+
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
2020
/// dropped until room is made in the queue.</param>
2121
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
22+
[EditorBrowsable(EditorBrowsableState.Never)]
2223
public static LoggerConfiguration Async(
2324
this LoggerSinkConfiguration loggerSinkConfiguration,
2425
Action<LoggerSinkConfiguration> configure,
25-
int bufferSize = 10000)
26+
int bufferSize)
27+
{
28+
return loggerSinkConfiguration.Async(configure, bufferSize, false);
29+
}
30+
31+
/// <summary>
32+
/// Configure a sink to be invoked asynchronously, on a background worker thread.
33+
/// </summary>
34+
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
35+
/// <param name="configure">An action that configures the wrapped sink.</param>
36+
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
37+
/// the thread is unable to process events quickly enough and the queue is filled, depending on
38+
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
39+
/// room is made in the queue.</param>
40+
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
41+
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
42+
public static LoggerConfiguration Async(
43+
this LoggerSinkConfiguration loggerSinkConfiguration,
44+
Action<LoggerSinkConfiguration> configure,
45+
int bufferSize = 10000,
46+
bool blockWhenFull = false)
2647
{
2748
return LoggerSinkConfiguration.Wrap(
2849
loggerSinkConfiguration,
29-
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize),
50+
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull),
3051
configure);
3152
}
53+
3254
}
3355
}

src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,55 +12,63 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
1212
{
1313
readonly ILogEventSink _pipeline;
1414
readonly int _bufferCapacity;
15-
volatile bool _disposed;
16-
readonly CancellationTokenSource _cancel = new CancellationTokenSource();
15+
readonly bool _blockWhenFull;
1716
readonly BlockingCollection<LogEvent> _queue;
1817
readonly Task _worker;
1918

20-
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity)
19+
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
2120
{
2221
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
2322
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
2423
_pipeline = pipeline;
2524
_bufferCapacity = bufferCapacity;
25+
_blockWhenFull = blockWhenFull;
2626
_queue = new BlockingCollection<LogEvent>(_bufferCapacity);
2727
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
2828
}
2929

3030
public void Emit(LogEvent logEvent)
3131
{
32-
// The disposed check is racy, but only ensures we don't prevent flush from
33-
// completing by pushing more events.
34-
if (!_disposed && !_queue.TryAdd(logEvent))
35-
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
32+
if (this._queue.IsAddingCompleted)
33+
return;
34+
35+
try
36+
{
37+
if (_blockWhenFull)
38+
{
39+
_queue.Add(logEvent);
40+
}
41+
else
42+
{
43+
if (!_queue.TryAdd(logEvent))
44+
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
45+
}
46+
}
47+
catch (InvalidOperationException)
48+
{
49+
// Thrown in the event of a race condition when we try to add another event after
50+
// CompleteAdding has been called
51+
}
3652
}
3753

3854
public void Dispose()
3955
{
40-
_disposed = true;
41-
_cancel.Cancel();
42-
_worker.Wait();
56+
// Prevent any more events from being added
57+
_queue.CompleteAdding();
58+
59+
// Allow queued events to be flushed
60+
_worker.Wait();
61+
4362
(_pipeline as IDisposable)?.Dispose();
44-
// _cancel not disposed, because it will make _cancel.Cancel() non-idempotent
4563
}
4664

4765
void Pump()
4866
{
4967
try
5068
{
51-
try
52-
{
53-
while (true)
54-
{
55-
var next = _queue.Take(_cancel.Token);
56-
_pipeline.Emit(next);
57-
}
58-
}
59-
catch (OperationCanceledException)
69+
foreach (var next in _queue.GetConsumingEnumerable())
6070
{
61-
LogEvent next;
62-
while (_queue.TryTake(out next))
63-
_pipeline.Emit(next);
71+
_pipeline.Emit(next);
6472
}
6573
}
6674
catch (Exception ex)

test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs

Lines changed: 114 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,164 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics;
34
using System.Linq;
45
using System.Threading.Tasks;
56
using Serilog.Core;
67
using Serilog.Events;
78
using Serilog.Parsing;
8-
using Serilog.Sinks.Async.Tests;
99
using Serilog.Sinks.Async.Tests.Support;
1010
using Xunit;
1111

1212
namespace Serilog.Sinks.Async.Tests
1313
{
14-
public class BackgroundWorkerSinkSpec : IDisposable
14+
public class BackgroundWorkerSinkSpec
1515
{
16+
readonly Logger _logger;
1617
readonly MemorySink _innerSink;
17-
readonly BackgroundWorkerSink _sink;
1818

1919
public BackgroundWorkerSinkSpec()
2020
{
2121
_innerSink = new MemorySink();
22-
var logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
23-
_sink = new BackgroundWorkerSink(logger, 10000);
24-
}
25-
26-
public void Dispose()
27-
{
28-
_sink.Dispose();
22+
_logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
2923
}
3024

3125
[Fact]
3226
public void WhenCtorWithNullSink_ThenThrows()
3327
{
34-
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000));
28+
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000, false));
3529
}
3630

3731
[Fact]
3832
public async Task WhenEmitSingle_ThenRelaysToInnerSink()
3933
{
40-
var logEvent = CreateEvent();
41-
_sink.Emit(logEvent);
34+
using (var sink = this.CreateSinkWithDefaultOptions())
35+
{
36+
var logEvent = CreateEvent();
37+
38+
sink.Emit(logEvent);
4239

43-
await Task.Delay(TimeSpan.FromSeconds(3));
40+
await Task.Delay(TimeSpan.FromSeconds(3));
4441

45-
Assert.Equal(1, _innerSink.Events.Count);
42+
Assert.Equal(1, _innerSink.Events.Count);
43+
}
4644
}
4745

4846
[Fact]
4947
public async Task WhenInnerEmitThrows_ThenContinuesRelaysToInnerSink()
5048
{
51-
_innerSink.ThrowAfterCollecting = true;
52-
53-
var events = new List<LogEvent>
49+
using (var sink = this.CreateSinkWithDefaultOptions())
5450
{
55-
CreateEvent(),
56-
CreateEvent(),
57-
CreateEvent()
58-
};
59-
events.ForEach(e => _sink.Emit(e));
51+
_innerSink.ThrowAfterCollecting = true;
52+
53+
var events = new List<LogEvent>
54+
{
55+
CreateEvent(),
56+
CreateEvent(),
57+
CreateEvent()
58+
};
59+
events.ForEach(e => sink.Emit(e));
6060

61-
await Task.Delay(TimeSpan.FromSeconds(3));
61+
await Task.Delay(TimeSpan.FromSeconds(3));
6262

63-
Assert.Equal(3, _innerSink.Events.Count);
63+
Assert.Equal(3, _innerSink.Events.Count);
64+
}
6465
}
6566

6667
[Fact]
6768
public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
6869
{
69-
var events = new List<LogEvent>
70+
using (var sink = this.CreateSinkWithDefaultOptions())
7071
{
71-
CreateEvent(),
72-
CreateEvent(),
73-
CreateEvent()
74-
};
72+
var events = new List<LogEvent>
73+
{
74+
CreateEvent(),
75+
CreateEvent(),
76+
CreateEvent()
77+
};
78+
events.ForEach(e => { sink.Emit(e); });
79+
80+
await Task.Delay(TimeSpan.FromSeconds(3));
81+
82+
Assert.Equal(3, _innerSink.Events.Count);
83+
}
84+
}
7585

76-
events.ForEach(e => { _sink.Emit(e); });
86+
[Fact]
87+
public async Task WhenQueueFull_ThenDropsEvents()
88+
{
89+
using (var sink = new BackgroundWorkerSink(_logger, 1, false))
90+
{
91+
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
92+
// after the first event is popped
93+
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);
94+
95+
var events = new List<LogEvent>
96+
{
97+
CreateEvent(),
98+
CreateEvent(),
99+
CreateEvent(),
100+
CreateEvent(),
101+
CreateEvent()
102+
};
103+
events.ForEach(e =>
104+
{
105+
var sw = Stopwatch.StartNew();
106+
sink.Emit(e);
107+
sw.Stop();
108+
109+
Assert.True(sw.ElapsedMilliseconds < 200, "Should not block the caller when the queue is full");
110+
});
111+
112+
// If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take
113+
// at least 15 seconds to process
114+
await Task.Delay(TimeSpan.FromSeconds(2));
115+
116+
// Events should be dropped
117+
Assert.Equal(2, _innerSink.Events.Count);
118+
}
119+
}
77120

78-
await Task.Delay(TimeSpan.FromSeconds(3));
121+
[Fact]
122+
public async Task WhenQueueFull_ThenBlocks()
123+
{
124+
using (var sink = new BackgroundWorkerSink(_logger, 1, true))
125+
{
126+
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
127+
// after the first event is popped
128+
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);
129+
130+
var events = new List<LogEvent>
131+
{
132+
CreateEvent(),
133+
CreateEvent(),
134+
CreateEvent()
135+
};
136+
137+
int i = 0;
138+
events.ForEach(e =>
139+
{
140+
var sw = Stopwatch.StartNew();
141+
sink.Emit(e);
142+
sw.Stop();
143+
144+
// Emit should return immediately the first time, since the queue is not yet full. On
145+
// subsequent calls, the queue should be full, so we should be blocked
146+
if (i > 0)
147+
{
148+
Assert.True(sw.ElapsedMilliseconds > 200, "Should block the caller when the queue is full");
149+
}
150+
});
151+
152+
await Task.Delay(TimeSpan.FromSeconds(2));
153+
154+
// No events should be dropped
155+
Assert.Equal(3, _innerSink.Events.Count);
156+
}
157+
}
79158

80-
Assert.Equal(3, _innerSink.Events.Count);
159+
private BackgroundWorkerSink CreateSinkWithDefaultOptions()
160+
{
161+
return new BackgroundWorkerSink(_logger, 10000, false);
81162
}
82163

83164
private static LogEvent CreateEvent()

test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
using System;
2-
using System.Threading;
3-
using Serilog.Core;
4-
using Serilog.Events;
5-
using Serilog.Parsing;
6-
using Serilog.Sinks.Async.Tests.Support;
1+
using Serilog.Sinks.Async.Tests.Support;
72
using Xunit;
83

94
namespace Serilog.Sinks.Async.Tests

test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@
22
using Serilog.Core;
33
using System.Collections.Concurrent;
44
using System;
5+
using System.Threading.Tasks;
56

67
namespace Serilog.Sinks.Async.Tests.Support
78
{
89
public class MemorySink : ILogEventSink
910
{
1011
public ConcurrentBag<LogEvent> Events { get; } = new ConcurrentBag<LogEvent>();
1112
public bool ThrowAfterCollecting { get; set; }
13+
public TimeSpan? DelayEmit { get; set; }
1214

1315
public void Emit(LogEvent logEvent)
1416
{
17+
if (DelayEmit.HasValue)
18+
Task.Delay(DelayEmit.Value).Wait();
19+
1520
Events.Add(logEvent);
1621

1722
if (ThrowAfterCollecting)

0 commit comments

Comments
 (0)