Skip to content

Commit 9a34941

Browse files
committed
Added locking to the DuplicateCheckerPlugin as there were race conditions
1 parent 91f4cbd commit 9a34941

File tree

2 files changed

+36
-25
lines changed

2 files changed

+36
-25
lines changed

Source/Shared/Plugins/Default/1010_DuplicateCheckerPlugin.cs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace Exceptionless.Plugins.Default {
1010
public class DuplicateCheckerPlugin : IEventPlugin, IDisposable {
1111
private readonly ConcurrentQueue<Tuple<int, DateTimeOffset>> _processed = new ConcurrentQueue<Tuple<int, DateTimeOffset>>();
1212
private readonly ConcurrentQueue<MergedEvent> _mergedEvents = new ConcurrentQueue<MergedEvent>();
13+
private readonly object _lock = new object();
1314
private readonly TimeSpan _interval;
1415
private Timer _timer;
1516

@@ -31,40 +32,47 @@ public void Run(EventPluginContext context) {
3132
int hashCode = context.Event.GetHashCode();
3233
int count = context.Event.Count ?? 1;
3334
context.Log.FormattedTrace(typeof(DuplicateCheckerPlugin), String.Concat("Checking event: ", context.Event.Message, " with hash: ", hashCode));
35+
36+
lock (_lock) {
37+
context.Log.Debug("Got Lock on thread" + Thread.CurrentThread.ManagedThreadId);
3438

35-
// Increment the occurrence count if the event is already queued for submission.
36-
var merged = _mergedEvents.FirstOrDefault(s => s.HashCode == hashCode);
37-
if (merged != null) {
38-
merged.IncrementCount(count);
39-
context.Log.FormattedInfo(typeof(DuplicateCheckerPlugin), String.Concat("Ignoring duplicate event with hash:", hashCode));
40-
context.Cancel = true;
41-
return;
42-
}
39+
// Increment the occurrence count if the event is already queued for submission.
40+
var merged = _mergedEvents.FirstOrDefault(s => s.HashCode == hashCode);
41+
if (merged != null) {
42+
merged.IncrementCount(count);
43+
context.Log.FormattedInfo(typeof(DuplicateCheckerPlugin), String.Concat("Ignoring duplicate event with hash:", hashCode));
44+
context.Cancel = true;
45+
return;
46+
}
47+
48+
DateTimeOffset repeatWindow = DateTimeOffset.UtcNow.Subtract(_interval);
49+
if (_processed.Any(s => s.Item1 == hashCode && s.Item2 >= repeatWindow)) {
50+
context.Log.FormattedInfo(typeof(DuplicateCheckerPlugin), String.Concat("Adding event with hash:", hashCode, " to cache."));
51+
// This event is a duplicate for the first time, lets save it so we can delay it while keeping count
52+
_mergedEvents.Enqueue(new MergedEvent(hashCode, context, count));
53+
context.Cancel = true;
54+
return;
55+
}
4356

44-
DateTimeOffset repeatWindow = DateTimeOffset.UtcNow.Subtract(_interval);
45-
if (_processed.Any(s => s.Item1 == hashCode && s.Item2 >= repeatWindow)) {
46-
context.Log.FormattedInfo(typeof(DuplicateCheckerPlugin), String.Concat("Adding event with hash:", hashCode, " to cache."));
47-
// This event is a duplicate for the first time, lets save it so we can delay it while keeping count
48-
_mergedEvents.Enqueue(new MergedEvent(hashCode, context, count));
49-
context.Cancel = true;
50-
} else {
5157
context.Log.FormattedInfo(typeof(DuplicateCheckerPlugin), String.Concat("Enqueueing event with hash:", hashCode, " to cache."));
5258
_processed.Enqueue(Tuple.Create(hashCode, DateTimeOffset.UtcNow));
59+
60+
Tuple<int, DateTimeOffset> temp;
61+
while (_processed.Count > 50)
62+
_processed.TryDequeue(out temp);
5363
}
54-
55-
Tuple<int, DateTimeOffset> temp;
56-
while (_processed.Count > 50)
57-
_processed.TryDequeue(out temp);
5864
}
5965

6066
private void OnTimer(object state) {
6167
EnqueueMergedEvents();
6268
}
6369

6470
private void EnqueueMergedEvents() {
65-
MergedEvent mergedEvent;
66-
while (_mergedEvents.TryDequeue(out mergedEvent))
67-
mergedEvent.Enqueue();
71+
lock (_lock) {
72+
MergedEvent mergedEvent;
73+
while (_mergedEvents.TryDequeue(out mergedEvent))
74+
mergedEvent.Enqueue();
75+
}
6876
}
6977

7078
public void Dispose() {

Source/Tests/Plugins/PluginTests.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -575,18 +575,21 @@ public void VerifyDeduplicationMultithreaded() {
575575
var errorPlugin = new ErrorPlugin();
576576

577577
List<EventPluginContext> contexts = new List<EventPluginContext>();
578-
using (var duplicateCheckerPlugin = new DuplicateCheckerPlugin(TimeSpan.FromMilliseconds(20))) {
579-
Parallel.For(0, 10, index => {
578+
using (var duplicateCheckerPlugin = new DuplicateCheckerPlugin(TimeSpan.FromMilliseconds(100))) {
579+
var result = Parallel.For(0, 10, index => {
580580
var builder = GetException().ToExceptionless();
581581
var context = new EventPluginContext(client, builder.Target, builder.PluginContextData);
582582
contexts.Add(context);
583583

584584
errorPlugin.Run(context);
585585
duplicateCheckerPlugin.Run(context);
586586
});
587+
588+
while (!result.IsCompleted)
589+
Thread.Sleep(1);
587590
}
588591

589-
Thread.Sleep(50);
592+
Thread.Sleep(150);
590593
Assert.Equal(1, contexts.Count(c => !c.Cancel));
591594
Assert.Equal(9, contexts.Count(c => c.Cancel));
592595
Assert.Equal(9, contexts.Sum(c => c.Event.Count.GetValueOrDefault()));

0 commit comments

Comments
 (0)