Skip to content

Commit f3edfd2

Browse files
committed
Batch ingestion not correctly handling OperationCancelledException which can cause the ingestion to never used incoming context tasks and hang.
1 parent b882944 commit f3edfd2

File tree

2 files changed

+104
-72
lines changed

2 files changed

+104
-72
lines changed

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,27 @@ public AuditIngestion(
5959

6060
errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);
6161

62-
watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);
63-
64-
ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
62+
watchdog = new Watchdog(
63+
"audit message ingestion",
64+
EnsureStarted,
65+
EnsureStopped,
66+
ingestionState.ReportError,
67+
ingestionState.Clear,
68+
settings.TimeToRestartAuditIngestionAfterFailure,
69+
logger
70+
);
6571
}
6672

67-
public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());
73+
public async Task StartAsync(CancellationToken cancellationToken)
74+
{
75+
stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
76+
ingestionWorker = Task.Run(() => Loop(stopSource.Token), stopSource.Token);
77+
await watchdog.Start(() => applicationLifetime.StopApplication());
78+
}
6879

6980
public async Task StopAsync(CancellationToken cancellationToken)
7081
{
82+
await stopSource.CancelAsync();
7183
await watchdog.Stop();
7284
channel.Writer.Complete();
7385
await ingestionWorker;
@@ -202,55 +214,60 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
202214
await taskCompletionSource.Task;
203215
}
204216

205-
async Task Loop()
217+
async Task Loop(CancellationToken cancellationToken)
206218
{
207-
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
208-
209-
while (await channel.Reader.WaitToReadAsync())
219+
try
210220
{
211-
// will only enter here if there is something to read.
212-
try
221+
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
222+
223+
while (await channel.Reader.WaitToReadAsync(cancellationToken))
213224
{
214-
// as long as there is something to read this will fetch up to MaximumConcurrency items
215-
while (channel.Reader.TryRead(out var context))
225+
// will only enter here if there is something to read.
226+
try
216227
{
217-
contexts.Add(context);
228+
// as long as there is something to read this will fetch up to MaximumConcurrency items
229+
while (channel.Reader.TryRead(out var context))
230+
{
231+
contexts.Add(context);
232+
}
233+
234+
batchSizeMeter.Mark(contexts.Count);
235+
using (batchDurationMeter.Measure())
236+
{
237+
await auditIngestor.Ingest(contexts);
238+
}
218239
}
219-
220-
batchSizeMeter.Mark(contexts.Count);
221-
using (batchDurationMeter.Measure())
240+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
222241
{
223-
await auditIngestor.Ingest(contexts);
242+
throw; // Catch again in outer catch
224243
}
225-
}
226-
catch (OperationCanceledException)
227-
{
228-
//Do nothing as we are shutting down
229-
continue;
230-
}
231-
catch (Exception e) // show must go on
232-
{
233-
if (logger.IsInfoEnabled)
244+
catch (Exception e) // show must go on
234245
{
235246
logger.Info("Ingesting messages failed", e);
236-
}
237247

238-
// signal all message handling tasks to terminate
239-
foreach (var context in contexts)
248+
// signal all message handling tasks to terminate
249+
foreach (var context in contexts)
250+
{
251+
context.GetTaskCompletionSource().TrySetException(e);
252+
}
253+
}
254+
finally
240255
{
241-
context.GetTaskCompletionSource().TrySetException(e);
256+
contexts.Clear();
242257
}
243258
}
244-
finally
245-
{
246-
contexts.Clear();
247-
}
259+
// will fall out here when writer is completed
260+
}
261+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
262+
{
263+
// Cancellation
248264
}
249-
// will fall out here when writer is completed
250265
}
251266

252267
TransportInfrastructure transportInfrastructure;
253268
IMessageReceiver queueIngestor;
269+
Task ingestionWorker;
270+
CancellationTokenSource stopSource;
254271

255272
readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1);
256273
readonly string inputEndpoint;
@@ -265,7 +282,6 @@ async Task Loop()
265282
readonly Meter batchDurationMeter;
266283
readonly Counter receivedMeter;
267284
readonly Watchdog watchdog;
268-
readonly Task ingestionWorker;
269285
readonly IHostApplicationLifetime applicationLifetime;
270286

271287
static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();

src/ServiceControl/Operations/ErrorIngestion.cs

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,23 @@ public ErrorIngestion(
5959

6060
errorHandlingPolicy = new ErrorIngestionFaultPolicy(dataStore, settings.LoggingSettings, OnCriticalError);
6161

62-
watchdog = new Watchdog("failed message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartErrorIngestionAfterFailure, Logger);
63-
64-
ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
62+
watchdog = new Watchdog(
63+
"failed message ingestion",
64+
EnsureStarted,
65+
EnsureStopped,
66+
ingestionState.ReportError,
67+
ingestionState.Clear,
68+
settings.TimeToRestartErrorIngestionAfterFailure,
69+
Logger
70+
);
6571
}
6672

67-
public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());
73+
public async Task StartAsync(CancellationToken cancellationToken)
74+
{
75+
stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
76+
ingestionWorker = Task.Run(() => Loop(stopSource.Token), stopSource.Token);
77+
await watchdog.Start(() => applicationLifetime.StopApplication());
78+
}
6879

6980
public async Task StopAsync(CancellationToken cancellationToken)
7081
{
@@ -77,51 +88,54 @@ public async Task StopAsync(CancellationToken cancellationToken)
7788
}
7889
}
7990

80-
async Task Loop()
91+
async Task Loop(CancellationToken cancellationToken)
8192
{
82-
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
83-
84-
while (await channel.Reader.WaitToReadAsync())
93+
try
8594
{
86-
// will only enter here if there is something to read.
87-
try
95+
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
96+
97+
while (await channel.Reader.WaitToReadAsync(cancellationToken))
8898
{
89-
// as long as there is something to read this will fetch up to MaximumConcurrency items
90-
while (channel.Reader.TryRead(out var context))
99+
// will only enter here if there is something to read.
100+
try
91101
{
92-
contexts.Add(context);
102+
// as long as there is something to read this will fetch up to MaximumConcurrency items
103+
while (channel.Reader.TryRead(out var context))
104+
{
105+
contexts.Add(context);
106+
}
107+
108+
batchSizeMeter.Mark(contexts.Count);
109+
using (batchDurationMeter.Measure())
110+
{
111+
await ingestor.Ingest(contexts);
112+
}
93113
}
94-
95-
batchSizeMeter.Mark(contexts.Count);
96-
using (batchDurationMeter.Measure())
114+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
97115
{
98-
await ingestor.Ingest(contexts);
116+
throw; // Catch again in outer catch
99117
}
100-
}
101-
catch (OperationCanceledException)
102-
{
103-
//Do nothing as we are shutting down
104-
continue;
105-
}
106-
catch (Exception e) // show must go on
107-
{
108-
if (Logger.IsInfoEnabled)
118+
catch (Exception e) // show must go on
109119
{
110120
Logger.Info("Ingesting messages failed", e);
111-
}
112121

113-
// signal all message handling tasks to terminate
114-
foreach (var context in contexts)
122+
// signal all message handling tasks to terminate
123+
foreach (var context in contexts)
124+
{
125+
context.GetTaskCompletionSource().TrySetException(e);
126+
}
127+
}
128+
finally
115129
{
116-
context.GetTaskCompletionSource().TrySetException(e);
130+
contexts.Clear();
117131
}
118132
}
119-
finally
120-
{
121-
contexts.Clear();
122-
}
133+
// will fall out here when writer is completed
134+
}
135+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
136+
{
137+
// Cancellation
123138
}
124-
// will fall out here when writer is completed
125139
}
126140

127141
async Task EnsureStarted(CancellationToken cancellationToken = default)
@@ -230,19 +244,21 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
230244
ErrorIngestionFaultPolicy errorHandlingPolicy;
231245
TransportInfrastructure transportInfrastructure;
232246
IMessageReceiver messageReceiver;
247+
Task ingestionWorker;
248+
CancellationTokenSource stopSource;
233249

234250
readonly Settings settings;
235251
readonly ITransportCustomization transportCustomization;
236252
readonly TransportSettings transportSettings;
237253
readonly Watchdog watchdog;
238254
readonly Channel<MessageContext> channel;
239-
readonly Task ingestionWorker;
240255
readonly Meter batchDurationMeter;
241256
readonly Meter batchSizeMeter;
242257
readonly Counter receivedMeter;
243258
readonly ErrorIngestor ingestor;
244259
readonly IIngestionUnitOfWorkFactory unitOfWorkFactory;
245260
readonly IHostApplicationLifetime applicationLifetime;
261+
246262
static readonly ILog Logger = LogManager.GetLogger<ErrorIngestion>();
247263
}
248264
}

0 commit comments

Comments
 (0)