Skip to content

Commit 7f951c0

Browse files
committed
Using an ActionBlock<T> instead of a custom Consumer
1 parent 5f30497 commit 7f951c0

File tree

9 files changed

+319
-99
lines changed

9 files changed

+319
-99
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Microsoft.VisualStudio.TestTools.UnitTesting;
4+
5+
namespace Serilog.Sinks.Async.UnitTests
6+
{
7+
public static class Assert
8+
{
9+
public static async Task<TException> ThrowsAsync<TException>(Func<Task> action) where TException : Exception
10+
{
11+
try
12+
{
13+
await action();
14+
}
15+
catch (TException ex)
16+
{
17+
return ex;
18+
}
19+
catch (Exception ex)
20+
{
21+
throw new AssertFailedException(string.Format("Was expecting an exception of type {0}, but threw {1}",
22+
typeof (TException), ex.GetType()));
23+
}
24+
25+
throw new AssertFailedException(string.Format("Was expecting an exception of type {0}", typeof (TException)));
26+
}
27+
28+
public static void AreEqual(int expected, int actual)
29+
{
30+
Microsoft.VisualStudio.TestTools.UnitTesting.Assert.AreEqual(expected, actual);
31+
}
32+
33+
public static void IsTrue(bool actual)
34+
{
35+
Microsoft.VisualStudio.TestTools.UnitTesting.Assert.IsTrue(actual);
36+
}
37+
}
38+
}

src/Serilog.Sinks.Async.UnitTests/BufferedQueueSpec.cs

Lines changed: 38 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Threading.Tasks;
45
using Microsoft.VisualStudio.TestTools.UnitTesting;
56
using Moq;
@@ -8,59 +9,52 @@ namespace Serilog.Sinks.Async.UnitTests
89
{
910
public class BufferedQueueSpec
1011
{
12+
private static List<TestMessage> CreateMessages(int count)
13+
{
14+
var messages = new List<TestMessage>();
15+
16+
Loop.For(() => { messages.Add(new TestMessage()); }, count);
17+
18+
return messages;
19+
}
20+
21+
// ReSharper disable once MemberCanBePrivate.Global
1122
public interface IRanAction<in TMessage>
1223
{
1324
void Ran(TMessage message);
1425
}
1526

1627
[TestClass]
17-
public class GivenAContext
28+
public class GivenAnAction
1829
{
30+
private const int BufferSize = 5;
1931
private BufferedQueue<TestMessage> _queue;
2032
private Mock<IRanAction<TestMessage>> _ranner;
2133

2234
[TestInitialize]
2335
public void Initialize()
2436
{
2537
_ranner = new Mock<IRanAction<TestMessage>>();
26-
_queue = new BufferedQueue<TestMessage>(5);
38+
_queue = new BufferedQueue<TestMessage>(BufferSize, _ranner.Object.Ran);
2739
}
2840

29-
[TestMethod, TestCategory("Unit")]
30-
public async Task WhenProduceBatchOfOne_ThenReturnsProduced()
41+
[TestCleanup]
42+
public void Cleanup()
3143
{
32-
await _queue.ProduceAsync(new TestMessage());
33-
34-
Assert.AreEqual(1, _queue.Count);
35-
}
36-
37-
[TestMethod, TestCategory("Unit")]
38-
public void WhenProduceBatch_ThenReturnsProduced()
39-
{
40-
var messages = new List<TestMessage>
44+
if (_queue != null)
4145
{
42-
new TestMessage(),
43-
new TestMessage(),
44-
new TestMessage()
45-
};
46-
47-
messages.ForEach(async msg => await _queue.ProduceAsync(msg));
48-
49-
Assert.AreEqual(3, _queue.Count);
46+
_queue.Complete();
47+
}
5048
}
5149

5250
[TestMethod, TestCategory("Unit")]
53-
public async Task WhenConsumeAsyncAndNoMessages_ThenConsumesNone()
51+
public void WhenProduceNoMessages_ThenConsumesNone()
5452
{
55-
var consumer = _queue.ConsumeAsync(msg => _ranner.Object.Ran(msg));
56-
_queue.Complete();
57-
await consumer;
58-
5953
_ranner.Verify(r => r.Ran(It.IsAny<TestMessage>()), Times.Never);
6054
}
6155

6256
[TestMethod, TestCategory("Unit")]
63-
public async Task WhenConsumeAsyncAndActionThrows_ThenConsumesAllProduced()
57+
public async Task WhenProduceAndActionThrows_ThenConsumesAllProduced()
6458
{
6559
var messages = new List<TestMessage>
6660
{
@@ -72,81 +66,68 @@ public async Task WhenConsumeAsyncAndActionThrows_ThenConsumesAllProduced()
7266
_ranner.Setup(r => r.Ran(It.IsAny<TestMessage>()))
7367
.Throws(exception);
7468

75-
messages.ForEach(async msg => await _queue.ProduceAsync(msg));
76-
var consumer = _queue.ConsumeAsync(msg => _ranner.Object.Ran(msg));
69+
await Task.WhenAll(messages.Select(msg => _queue.ProduceAsync(msg)));
7770
_queue.Complete();
7871

79-
await Task.WhenAll(consumer, _queue.IsComplete);
72+
await _queue.IsComplete;
8073

8174
messages.ForEach(msg => { _ranner.Verify(r => r.Ran(msg), Times.Once); });
8275
}
8376

8477
[TestMethod, TestCategory("Unit")]
85-
public async Task WhenConsumeAsyncWithProducedBatchOfOne_ThenConsumesAllProduced()
78+
public async Task WhenProduceBatchOfOne_ThenConsumesAllProduced()
8679
{
8780
var message = new TestMessage();
8881

8982
await _queue.ProduceAsync(message);
90-
var consumer = _queue.ConsumeAsync(msg => _ranner.Object.Ran(msg));
9183
_queue.Complete();
9284

93-
await Task.WhenAll(consumer, _queue.IsComplete);
85+
await _queue.IsComplete;
9486

9587
_ranner.Verify(r => r.Ran(message), Times.Once);
9688
}
9789

9890
[TestMethod, TestCategory("Unit")]
99-
public async Task WhenConsumeAsyncWithProducedBatchSmallerThanBuffer_ThenConsumesAllProduced()
91+
public async Task WhenProducedBatchSmallerThanBuffer_ThenConsumesAllProduced()
10092
{
101-
var messages = CreateMessages(3);
93+
var messages = CreateMessages(BufferSize - 1);
10294

103-
messages.ForEach(async msg => await _queue.ProduceAsync(msg));
104-
var consumer = _queue.ConsumeAsync(msg => _ranner.Object.Ran(msg));
95+
await Task.WhenAll(messages.Select(msg => _queue.ProduceAsync(msg)));
10596
_queue.Complete();
10697

107-
await Task.WhenAll(consumer, _queue.IsComplete);
98+
await _queue.IsComplete;
10899

109100
messages.ForEach(msg => { _ranner.Verify(r => r.Ran(msg), Times.Once); });
110101
}
111102

112103
[TestMethod, TestCategory("Unit")]
113-
public async Task WhenConsumeAsyncWithProducedBatchSizeOfBuffer_ThenConsumesAllProduced()
104+
public async Task WhenProducedBatchSizeOfBuffer_ThenConsumesAllProduced()
114105
{
115-
var messages = CreateMessages(_queue.Size);
106+
var messages = CreateMessages(BufferSize);
116107

117-
messages.ForEach(async msg => await _queue.ProduceAsync(msg));
118-
var consumer = _queue.ConsumeAsync(msg => _ranner.Object.Ran(msg));
108+
await Task.WhenAll(messages.Select(msg => _queue.ProduceAsync(msg)));
119109
_queue.Complete();
120110

121-
await Task.WhenAll(consumer, _queue.IsComplete);
111+
await _queue.IsComplete;
122112

123113
messages.ForEach(msg => { _ranner.Verify(r => r.Ran(msg), Times.Once); });
124114
}
125115

126116
[TestMethod, TestCategory("Unit")]
127-
public async Task WhenConsumeAsyncWithProducedBatchLargerThanBuffer_ThenConsumesAllProduced()
117+
public async Task WhenProducedBatchLargerThanBuffer_ThenConsumesAllProduced()
128118
{
129-
var messages = CreateMessages(_queue.Size + 1);
119+
var messages = CreateMessages(BufferSize + 1);
130120

131-
messages.ForEach(async msg => await _queue.ProduceAsync(msg));
132-
var consumer = _queue.ConsumeAsync(msg => _ranner.Object.Ran(msg));
121+
await Task.WhenAll(messages.Select(msg => _queue.ProduceAsync(msg)));
133122
_queue.Complete();
134123

135-
await Task.WhenAll(consumer, _queue.IsComplete);
124+
await _queue.IsComplete;
136125

137126
messages.ForEach(msg => { _ranner.Verify(r => r.Ran(msg), Times.Once); });
138127
}
139-
140-
private static List<TestMessage> CreateMessages(int count)
141-
{
142-
var messages = new List<TestMessage>();
143-
144-
Loop.For(counter => { messages.Add(new TestMessage()); }, count);
145-
146-
return messages;
147-
}
148128
}
149129

130+
// ReSharper disable once MemberCanBePrivate.Global
150131
public class TestMessage
151132
{
152133
}

src/Serilog.Sinks.Async.UnitTests/Serilog.Sinks.Async.UnitTests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
<Compile Include="..\GlobalAssemblyInfo.cs">
7272
<Link>Properties\GlobalAssemblyInfo.cs</Link>
7373
</Compile>
74+
<Compile Include="AssertExtensions.cs" />
7475
<Compile Include="BufferedQueueSinkSpec.cs" />
7576
<Compile Include="BufferedQueueSpec.cs" />
7677
<Compile Include="Properties\AssemblyInfo.cs" />

src/Serilog.Sinks.Async/BufferedQueue.cs

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Threading;
32
using System.Threading.Tasks;
43
using System.Threading.Tasks.Dataflow;
54

@@ -14,26 +13,40 @@ namespace Serilog.Sinks.Async
1413
public class BufferedQueue<TMessage>
1514
{
1615
private const int DefaultQueueSize = 50;
17-
16+
private readonly ActionBlock<TMessage> _executor;
1817
private readonly BufferBlock<TMessage> _queue;
1918

20-
public BufferedQueue()
21-
: this(0)
19+
public BufferedQueue(int size)
2220
{
21+
Size = ((size > 0) ? size : DefaultQueueSize);
22+
_queue = new BufferBlock<TMessage>(new DataflowBlockOptions
23+
{
24+
BoundedCapacity = Size
25+
});
2326
}
2427

25-
public BufferedQueue(int size)
28+
public BufferedQueue(int size, Action<TMessage> action)
2629
{
30+
if (action == null)
31+
{
32+
throw new ArgumentNullException("action");
33+
}
34+
2735
Size = ((size > 0) ? size : DefaultQueueSize);
2836
_queue = new BufferBlock<TMessage>(new DataflowBlockOptions
2937
{
3038
BoundedCapacity = Size
3139
});
40+
_executor = new ActionBlock<TMessage>(message => { ExecuteAction(action, message); });
41+
_queue.LinkTo(_executor, new DataflowLinkOptions
42+
{
43+
PropagateCompletion = true
44+
});
3245
}
3346

3447
public Task IsComplete
3548
{
36-
get { return _queue.Completion; }
49+
get { return _executor != null ? _executor.Completion : _queue.Completion; }
3750
}
3851

3952
public int Count
@@ -48,25 +61,15 @@ public async Task ProduceAsync(TMessage message)
4861
await _queue.SendAsync(message);
4962
}
5063

51-
public async Task ConsumeAsync(Action<TMessage> action)
64+
private static void ExecuteAction(Action<TMessage> action, TMessage message)
5265
{
53-
await ConsumeAsync(action, CancellationToken.None);
54-
}
55-
56-
public async Task ConsumeAsync(Action<TMessage> action, CancellationToken cancellation)
57-
{
58-
while (await _queue.OutputAvailableAsync(cancellation))
66+
try
5967
{
60-
var message = await _queue.ReceiveAsync(cancellation);
61-
62-
try
63-
{
64-
action(message);
65-
}
66-
catch (Exception)
67-
{
68-
//Log and Ignore exception and continue
69-
}
68+
action(message);
69+
}
70+
catch (Exception)
71+
{
72+
//Log and Ignore exception and continue
7073
}
7174
}
7275

src/Serilog.Sinks.Async/BufferedQueueSink.cs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Threading.Tasks;
32
using Serilog.Core;
43
using Serilog.Events;
54

@@ -44,25 +43,15 @@ private void EnsureConsumerStarted()
4443
}
4544
ConsumerStarted = true;
4645

47-
_queue = new BufferedQueue<LogEvent>(_bufferSize);
48-
49-
StartConsumer(logEvent => _sink.Emit(logEvent));
50-
}
51-
52-
public void StartConsumer(Action<LogEvent> action)
53-
{
54-
Task.Run(async () =>
46+
_queue = new BufferedQueue<LogEvent>(_bufferSize, logEvent =>
5547
{
56-
while (true)
48+
try
49+
{
50+
_sink.Emit(logEvent);
51+
}
52+
catch (Exception)
5753
{
58-
try
59-
{
60-
await _queue.ConsumeAsync(action);
61-
}
62-
catch (Exception)
63-
{
64-
//Log and Ignore exception and continue
65-
}
54+
//Log and Ignore exception and continue
6655
}
6756
});
6857
}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System.Reflection;
2+
using System.Runtime.CompilerServices;
23
using System.Runtime.InteropServices;
34

45
[assembly: AssemblyTitle("Serilog.Sinks.Async")]
56
[assembly: AssemblyDescription("An async Serilog sink")]
6-
[assembly: Guid("02106765-de7b-48c8-aa6e-79ebb435a6cd")]
7+
[assembly: Guid("02106765-de7b-48c8-aa6e-79ebb435a6cd")]
8+
[assembly: InternalsVisibleTo("Serilog.Sinks.Async.UnitTests")]

0 commit comments

Comments
 (0)