Skip to content

Commit 92c6864

Browse files
authored
Batch ingestion did not correctly handle OperationCanceledException, which could leave the incoming message context tasks never completed and cause ingestion to hang. (#4780)
- Refactored to use BackgroundService - Fix `catch (OperationCanceledException)` blocks by adding `when` guards, so that only cancellations requested by the caller are ignored - Override StartAsync and StopAsync to support graceful shutdown and improve intent and readability - Ensure TrySetException is always called on exception - Logging improvements around cancellation to report that tasks were cancelled; this is important when shutting down, to diagnose where teardown "hangs".
1 parent 1d34d8c commit 92c6864

File tree

30 files changed

+211
-146
lines changed

30 files changed

+211
-146
lines changed

src/Particular.LicensingComponent/AuditThroughput/AuditThroughputCollectorHostedService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
4040
}
4141
} while (await timer.WaitForNextTickAsync(cancellationToken));
4242
}
43-
catch (OperationCanceledException)
43+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
4444
{
4545
logger.LogInformation($"Stopping {nameof(AuditThroughputCollectorHostedService)}");
4646
}

src/Particular.LicensingComponent/BrokerThroughput/BrokerThroughputCollectorHostedService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ static ReadOnlyDictionary<string, string> LoadBrokerSettingValues(IEnumerable<Ke
5353
}
5454
} while (await timer.WaitForNextTickAsync(stoppingToken));
5555
}
56-
catch (OperationCanceledException)
56+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
5757
{
5858
logger.LogInformation($"Stopping {nameof(BrokerThroughputCollectorHostedService)}");
5959
}

src/ServiceControl.AcceptanceTesting/ScenarioWithEndpointBehaviorExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,9 @@ public override async Task Stop(CancellationToken cancellationToken = default)
139139
{
140140
await checkTask;
141141
}
142-
catch (OperationCanceledException)
142+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
143143
{
144-
//Swallow
144+
// Even though we are stopping, ONLY swallow the OCE when cancellation was requested via the caller's token, so that ungraceful stop errors are not hidden
145145
}
146146
finally
147147
{

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 80 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
using ServiceControl.Infrastructure;
1717
using Transports;
1818

19-
class AuditIngestion : IHostedService
19+
class AuditIngestion : BackgroundService
2020
{
2121
public AuditIngestion(
2222
Settings settings,
@@ -51,23 +51,15 @@ public AuditIngestion(
5151

5252
errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);
5353

54-
watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);
55-
56-
ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
57-
}
58-
59-
public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());
60-
61-
public async Task StopAsync(CancellationToken cancellationToken)
62-
{
63-
await watchdog.Stop();
64-
channel.Writer.Complete();
65-
await ingestionWorker;
66-
67-
if (transportInfrastructure != null)
68-
{
69-
await transportInfrastructure.Shutdown(cancellationToken);
70-
}
54+
watchdog = new Watchdog(
55+
"audit message ingestion",
56+
EnsureStarted,
57+
EnsureStopped,
58+
ingestionState.ReportError,
59+
ingestionState.Clear,
60+
settings.TimeToRestartAuditIngestionAfterFailure,
61+
logger
62+
);
7163
}
7264

7365
Task OnCriticalError(string failure, Exception exception)
@@ -132,7 +124,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
132124
}
133125
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
134126
{
135-
// ignored
127+
logger.Info("StopReceive cancelled");
136128
}
137129
}
138130

@@ -170,7 +162,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
170162
}
171163
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
172164
{
173-
// ignored
165+
logger.Info("StopReceive cancelled");
174166
}
175167
finally
176168
{
@@ -200,57 +192,92 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
200192
}
201193
}
202194

203-
async Task Loop()
195+
public override async Task StartAsync(CancellationToken cancellationToken)
204196
{
205-
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
197+
await watchdog.Start(() => applicationLifetime.StopApplication());
198+
await base.StartAsync(cancellationToken);
199+
}
206200

207-
while (await channel.Reader.WaitToReadAsync())
201+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
202+
{
203+
try
208204
{
209-
// will only enter here if there is something to read.
210-
try
205+
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
206+
207+
while (await channel.Reader.WaitToReadAsync(stoppingToken))
211208
{
212-
// as long as there is something to read this will fetch up to MaximumConcurrency items
213-
while (channel.Reader.TryRead(out var context))
209+
// will only enter here if there is something to read.
210+
try
214211
{
215-
contexts.Add(context);
212+
// as long as there is something to read this will fetch up to MaximumConcurrency items
213+
while (channel.Reader.TryRead(out var context))
214+
{
215+
contexts.Add(context);
216+
}
217+
218+
auditBatchSize.Record(contexts.Count);
219+
220+
using (new DurationRecorder(auditBatchDuration))
221+
{
222+
await auditIngestor.Ingest(contexts);
223+
}
224+
225+
consecutiveBatchFailuresCounter.Record(0);
216226
}
227+
catch (Exception e)
228+
{
229+
// signal all message handling tasks to terminate
230+
foreach (var context in contexts)
231+
{
232+
_ = context.GetTaskCompletionSource().TrySetException(e);
233+
}
234+
235+
if (e is OperationCanceledException && stoppingToken.IsCancellationRequested)
236+
{
237+
logger.Info("Batch cancelled", e);
238+
break;
239+
}
217240

218-
auditBatchSize.Record(contexts.Count);
241+
logger.Info("Ingesting messages failed", e);
219242

220-
using (new DurationRecorder(auditBatchDuration))
243+
// no need to do interlocked increment since this is running sequential
244+
consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++);
245+
}
246+
finally
221247
{
222-
await auditIngestor.Ingest(contexts);
248+
contexts.Clear();
223249
}
224-
225-
consecutiveBatchFailuresCounter.Record(0);
226250
}
227-
catch (OperationCanceledException)
228-
{
229-
//Do nothing as we are shutting down
230-
continue;
231-
}
232-
catch (Exception e) // show must go on
251+
// will fall out here when writer is completed
252+
}
253+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
254+
{
255+
// ExecuteAsync cancelled
256+
}
257+
}
258+
259+
public override async Task StopAsync(CancellationToken cancellationToken)
260+
{
261+
try
262+
{
263+
await watchdog.Stop();
264+
channel.Writer.Complete();
265+
await base.StopAsync(cancellationToken);
266+
}
267+
finally
268+
{
269+
if (transportInfrastructure != null)
233270
{
234-
if (logger.IsInfoEnabled)
271+
try
235272
{
236-
logger.Info("Ingesting messages failed", e);
273+
await transportInfrastructure.Shutdown(cancellationToken);
237274
}
238-
239-
// signal all message handling tasks to terminate
240-
foreach (var context in contexts)
275+
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
241276
{
242-
context.GetTaskCompletionSource().TrySetException(e);
277+
logger.Info("Shutdown cancelled", e);
243278
}
244-
245-
// no need to do interlocked increment since this is running sequential
246-
consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++);
247-
}
248-
finally
249-
{
250-
contexts.Clear();
251279
}
252280
}
253-
// will fall out here when writer is completed
254281
}
255282

256283
TransportInfrastructure transportInfrastructure;
@@ -273,7 +300,6 @@ async Task Loop()
273300
readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
274301
readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
275302
readonly Watchdog watchdog;
276-
readonly Task ingestionWorker;
277303
readonly IHostApplicationLifetime applicationLifetime;
278304

279305
static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();

src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ await failedAuditStore.ProcessFailedMessages(
4848
Logger.Debug($"Successfully re-imported failed audit message {transportMessage.Id}.");
4949
}
5050
}
51-
catch (OperationCanceledException)
51+
catch (OperationCanceledException e) when (token.IsCancellationRequested)
5252
{
53-
// no-op
53+
Logger.Info("Cancelled", e);
5454
}
5555
catch (Exception e)
5656
{

src/ServiceControl.Audit/Infrastructure/Hosting/Commands/ImportFailedAuditsCommand.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77
using Microsoft.Extensions.DependencyInjection;
88
using Microsoft.Extensions.Hosting;
99
using NServiceBus;
10+
using NServiceBus.Logging;
1011
using Settings;
1112

1213
class ImportFailedAuditsCommand : AbstractCommand
1314
{
15+
readonly ILog logger = LogManager.GetLogger<ImportFailedAuditsCommand>();
16+
1417
public override async Task Execute(HostArguments args, Settings settings)
1518
{
1619
settings.IngestAuditMessages = false;
@@ -37,9 +40,9 @@ public override async Task Execute(HostArguments args, Settings settings)
3740
{
3841
await importer.Run(tokenSource.Token);
3942
}
40-
catch (OperationCanceledException)
43+
catch (OperationCanceledException e) when (tokenSource.IsCancellationRequested)
4144
{
42-
// no op
45+
logger.Info("Cancelled", e);
4346
}
4447
finally
4548
{

src/ServiceControl.Infrastructure.Metrics/MetricsReporter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void Start()
3232
await Task.Delay(interval, tokenSource.Token).ConfigureAwait(false);
3333
}
3434
}
35-
catch (OperationCanceledException)
35+
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
3636
{
3737
//no-op
3838
}

src/ServiceControl.Infrastructure/AsyncTimer.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,17 @@ public TimerJob(Func<CancellationToken, Task<TimerJobExecutionResult>> callback,
4040

4141
//Otherwise execute immediately
4242
}
43-
catch (OperationCanceledException)
43+
catch (OperationCanceledException) when (token.IsCancellationRequested)
4444
{
45-
// no-op
45+
break;
4646
}
4747
catch (Exception ex)
4848
{
4949
errorCallback(ex);
5050
}
5151
}
5252
}
53-
catch (OperationCanceledException)
53+
catch (OperationCanceledException) when (token.IsCancellationRequested)
5454
{
5555
// no-op
5656
}
@@ -64,7 +64,7 @@ public async Task Stop()
6464
return;
6565
}
6666

67-
tokenSource.Cancel();
67+
await tokenSource.CancelAsync().ConfigureAwait(false);
6868
tokenSource.Dispose();
6969

7070
if (task != null)
@@ -73,7 +73,7 @@ public async Task Stop()
7373
{
7474
await task.ConfigureAwait(false);
7575
}
76-
catch (OperationCanceledException)
76+
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
7777
{
7878
//NOOP
7979
}

src/ServiceControl.Infrastructure/ReadOnlyStream.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public override Task CopyToAsync(Stream destination, int bufferSize, Cancellatio
5151

5252
return Task.CompletedTask;
5353
}
54-
catch (OperationCanceledException e)
54+
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
5555
{
5656
return Task.FromCanceled(e.CancellationToken);
5757
}
@@ -113,7 +113,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
113113

114114
return Task.FromResult(result);
115115
}
116-
catch (OperationCanceledException e)
116+
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
117117
{
118118
return Task.FromCanceled<int>(e.CancellationToken);
119119
}
@@ -136,7 +136,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
136136

137137
return new ValueTask<int>(result);
138138
}
139-
catch (OperationCanceledException e)
139+
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
140140
{
141141
return new ValueTask<int>(Task.FromCanceled<int>(e.CancellationToken));
142142
}

src/ServiceControl.Infrastructure/Watchdog.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ public Task Start(Action onFailedOnStartup)
5454

5555
failedOnStartup ??= false;
5656
}
57-
catch (OperationCanceledException)
57+
catch (OperationCanceledException e) when (!shutdownTokenSource.IsCancellationRequested)
5858
{
59-
//Do not Delay
59+
// Continue, as OCE is not from caller
60+
log.Info("Start cancelled, retrying...", e);
6061
continue;
6162
}
6263
catch (Exception e)
@@ -81,9 +82,9 @@ public Task Start(Action onFailedOnStartup)
8182
{
8283
await Task.Delay(timeToWaitBetweenStartupAttempts, shutdownTokenSource.Token).ConfigureAwait(false);
8384
}
84-
catch (OperationCanceledException)
85+
catch (OperationCanceledException) when (shutdownTokenSource.IsCancellationRequested)
8586
{
86-
//Ignore
87+
//Ignore, no need to log cancellation of delay
8788
}
8889
}
8990
try

0 commit comments

Comments
 (0)