Skip to content

Commit 6678c50

Browse files
committed
Fixes ProcessQueue method so it blocks until queue has been processed
1 parent 27129f3 commit 6678c50

File tree

1 file changed

+54
-26
lines changed

1 file changed

+54
-26
lines changed

src/Exceptionless/Queue/DefaultEventQueue.cs

Lines changed: 54 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@ public class DefaultEventQueue : IEventQueue {
1616
private readonly IObjectStorage _storage;
1717
private readonly IJsonSerializer _serializer;
1818
private Timer _queueTimer;
19-
private bool _processingQueue;
19+
private Task _processingQueueTask;
20+
private readonly object sync = new object();
2021
private readonly TimeSpan _processQueueInterval = TimeSpan.FromSeconds(10);
2122
private DateTime? _suspendProcessingUntil;
2223
private DateTime? _discardQueuedItemsUntil;
2324

24-
public DefaultEventQueue(ExceptionlessConfiguration config, IExceptionlessLog log, ISubmissionClient client, IObjectStorage objectStorage, IJsonSerializer serializer): this(config, log, client, objectStorage, serializer, null, null) {}
25+
public DefaultEventQueue(ExceptionlessConfiguration config, IExceptionlessLog log, ISubmissionClient client, IObjectStorage objectStorage, IJsonSerializer serializer) : this(config, log, client, objectStorage, serializer, null, null) { }
2526

2627
public DefaultEventQueue(ExceptionlessConfiguration config, IExceptionlessLog log, ISubmissionClient client, IObjectStorage objectStorage, IJsonSerializer serializer, TimeSpan? processQueueInterval, TimeSpan? queueStartDelay) {
2728
_log = log;
@@ -45,20 +46,30 @@ public void Enqueue(Event ev) {
4546
}
4647

4748
public Task ProcessAsync() {
48-
return Task.Factory.StartNew(Process);
49+
return Task.Factory.StartNew(
50+
() => {
51+
Process().ConfigureAwait(false).GetAwaiter().GetResult();
52+
});
4953
}
5054

51-
public void Process() {
55+
public Task Process() {
5256
if (!_config.Enabled) {
5357
_log.Info(typeof(DefaultEventQueue), "Configuration is disabled. The queue will not be processed.");
54-
return;
58+
return Task.FromResult(false);
5559
}
5660

57-
if (_processingQueue)
58-
return;
61+
TaskCompletionSource<bool> tcs;
62+
lock (sync) {
63+
if (_processingQueueTask != null) {
64+
return _processingQueueTask;
65+
}
66+
else {
67+
tcs = new TaskCompletionSource<bool>();
68+
_processingQueueTask = tcs.Task;
69+
}
70+
}
5971

60-
_processingQueue = true;
61-
72+
Task resultTask;
6273
try {
6374
_log.Trace(typeof(DefaultEventQueue), "Processing queue...");
6475
_storage.CleanupQueueFiles(_config.GetQueueName(), _config.QueueMaxAge, _config.QueueMaxAttempts);
@@ -75,43 +86,52 @@ public void Process() {
7586
var response = _client.PostEvents(events, _config, _serializer);
7687
if (response.Success) {
7788
_log.FormattedInfo(typeof(DefaultEventQueue), "Sent {0} events to \"{1}\".", batch.Count, _config.ServerUrl);
78-
} else if (response.ServiceUnavailable) {
89+
}
90+
else if (response.ServiceUnavailable) {
7991
// You are currently over your rate limit or the servers are under stress.
8092
_log.Error(typeof(DefaultEventQueue), "Server returned service unavailable.");
8193
SuspendProcessing();
8294
deleteBatch = false;
83-
} else if (response.PaymentRequired) {
95+
}
96+
else if (response.PaymentRequired) {
8497
// If the organization over the rate limit then discard the event.
8598
_log.Warn(typeof(DefaultEventQueue), "Too many events have been submitted, please upgrade your plan.");
8699
SuspendProcessing(discardFutureQueuedItems: true, clearQueue: true);
87-
} else if (response.UnableToAuthenticate) {
100+
}
101+
else if (response.UnableToAuthenticate) {
88102
// The api key was suspended or could not be authorized.
89103
_log.Error(typeof(DefaultEventQueue), "Unable to authenticate, please check your configuration. The event will not be submitted.");
90104
SuspendProcessing(TimeSpan.FromMinutes(15));
91-
} else if (response.NotFound || response.BadRequest) {
105+
}
106+
else if (response.NotFound || response.BadRequest) {
92107
// The service end point could not be found.
93108
_log.FormattedError(typeof(DefaultEventQueue), "Error while trying to submit data: {0}", response.Message);
94109
SuspendProcessing(TimeSpan.FromHours(4));
95-
} else if (response.RequestEntityTooLarge) {
110+
}
111+
else if (response.RequestEntityTooLarge) {
96112
if (batchSize > 1) {
97113
_log.Error(typeof(DefaultEventQueue), "Event submission discarded for being too large. The event will be retried with a smaller batch size.");
98114
batchSize = Math.Max(1, (int)Math.Round(batchSize / 1.5d, 0));
99115
deleteBatch = false;
100-
} else {
116+
}
117+
else {
101118
_log.Error(typeof(DefaultEventQueue), "Event submission discarded for being too large. The event will not be submitted.");
102119
}
103-
} else if (!response.Success) {
120+
}
121+
else if (!response.Success) {
104122
_log.Error(typeof(DefaultEventQueue), String.Concat("An error occurred while submitting events: ", response.Message));
105123
SuspendProcessing();
106124
deleteBatch = false;
107125
}
108126

109127
OnEventsPosted(new EventsPostedEventArgs { Events = events, Response = response });
110-
} catch (AggregateException ex) {
128+
}
129+
catch (AggregateException ex) {
111130
_log.Error(typeof(DefaultEventQueue), ex, String.Concat("An error occurred while submitting events: ", ex.Flatten().Message));
112131
SuspendProcessing();
113132
deleteBatch = false;
114-
} catch (Exception ex) {
133+
}
134+
catch (Exception ex) {
115135
_log.Error(typeof(DefaultEventQueue), ex, String.Concat("An error occurred while submitting events: ", ex.Message));
116136
SuspendProcessing();
117137
deleteBatch = false;
@@ -127,20 +147,26 @@ public void Process() {
127147

128148
batch = _storage.GetEventBatch(_config.GetQueueName(), _serializer, batchSize, maxCreatedDate);
129149
}
130-
} catch (Exception ex) {
150+
}
151+
catch (Exception ex) {
131152
_log.Error(typeof(DefaultEventQueue), ex, String.Concat("An error occurred while processing the queue: ", ex.Message));
132153
SuspendProcessing();
133-
} finally {
134-
_processingQueue = false;
135154
}
155+
finally {
156+
tcs.SetResult(true);
157+
lock (sync) {
158+
_processingQueueTask = null;
159+
resultTask = tcs.Task;
160+
}
161+
}
162+
return resultTask;
136163
}
137164

138165
private void OnProcessQueue(object state) {
139166
if (IsQueueProcessingSuspended)
140167
return;
141-
142-
if (!_processingQueue)
143-
Process();
168+
169+
Process();
144170
}
145171

146172
public void SuspendProcessing(TimeSpan? duration = null, bool discardFutureQueuedItems = false, bool clearQueue = false) {
@@ -162,7 +188,8 @@ public void SuspendProcessing(TimeSpan? duration = null, bool discardFutureQueue
162188
#pragma warning disable 4014
163189
_storage.CleanupQueueFiles(_config.GetQueueName(), TimeSpan.Zero);
164190
#pragma warning restore 4014
165-
} catch (Exception) { }
191+
}
192+
catch (Exception) { }
166193
}
167194

168195
public event EventHandler<EventsPostedEventArgs> EventsPosted;
@@ -171,7 +198,8 @@ protected virtual void OnEventsPosted(EventsPostedEventArgs e) {
171198
try {
172199
if (EventsPosted != null)
173200
EventsPosted.Invoke(this, e);
174-
} catch (Exception ex) {
201+
}
202+
catch (Exception ex) {
175203
_log.Error(typeof(DefaultEventQueue), ex, "Error while calling OnEventsPosted event handlers.");
176204
}
177205
}

0 commit comments

Comments
 (0)