Skip to content

Commit 319b44e

Browse files
authored
Merge pull request #148 from MihaMarkic/processqueue
Fixes ProcessQueue method so it blocks until queue has been processed
2 parents 27129f3 + d14b90f commit 319b44e

File tree

1 file changed

+23
-12
lines changed

1 file changed

+23
-12
lines changed

src/Exceptionless/Queue/DefaultEventQueue.cs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ public class DefaultEventQueue : IEventQueue {
1616
private readonly IObjectStorage _storage;
1717
private readonly IJsonSerializer _serializer;
1818
private Timer _queueTimer;
19-
private bool _processingQueue;
19+
private Task _processingQueueTask;
20+
private readonly object _sync = new object();
2021
private readonly TimeSpan _processQueueInterval = TimeSpan.FromSeconds(10);
2122
private DateTime? _suspendProcessingUntil;
2223
private DateTime? _discardQueuedItemsUntil;
@@ -45,20 +46,26 @@ public void Enqueue(Event ev) {
4546
}
4647

4748
public Task ProcessAsync() {
48-
return Task.Factory.StartNew(Process);
49+
return Task.Run(Process);
4950
}
5051

51-
public void Process() {
52+
public Task Process() {
5253
if (!_config.Enabled) {
5354
_log.Info(typeof(DefaultEventQueue), "Configuration is disabled. The queue will not be processed.");
54-
return;
55+
return Task.FromResult(false);
5556
}
5657

57-
if (_processingQueue)
58-
return;
58+
TaskCompletionSource<bool> tcs;
59+
lock (_sync) {
60+
if (_processingQueueTask != null) {
61+
return _processingQueueTask;
62+
} else {
63+
tcs = new TaskCompletionSource<bool>();
64+
_processingQueueTask = tcs.Task;
65+
}
66+
}
5967

60-
_processingQueue = true;
61-
68+
Task resultTask;
6269
try {
6370
_log.Trace(typeof(DefaultEventQueue), "Processing queue...");
6471
_storage.CleanupQueueFiles(_config.GetQueueName(), _config.QueueMaxAge, _config.QueueMaxAttempts);
@@ -131,16 +138,20 @@ public void Process() {
131138
_log.Error(typeof(DefaultEventQueue), ex, String.Concat("An error occurred while processing the queue: ", ex.Message));
132139
SuspendProcessing();
133140
} finally {
134-
_processingQueue = false;
141+
tcs.SetResult(true);
142+
lock (_sync) {
143+
_processingQueueTask = null;
144+
resultTask = tcs.Task;
145+
}
135146
}
147+
return resultTask;
136148
}
137149

138150
private void OnProcessQueue(object state) {
139151
if (IsQueueProcessingSuspended)
140152
return;
141-
142-
if (!_processingQueue)
143-
Process();
153+
154+
Process();
144155
}
145156

146157
public void SuspendProcessing(TimeSpan? duration = null, bool discardFutureQueuedItems = false, bool clearQueue = false) {

0 commit comments

Comments
 (0)