Skip to content

Commit 3fbb3e8

Browse files
committed
Refactored to use BackgroundService
1 parent f3edfd2 commit 3fbb3e8

File tree

2 files changed

+36
-50
lines changed

2 files changed

+36
-50
lines changed

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
using ServiceControl.Infrastructure.Metrics;
1818
using Transports;
1919

20-
class AuditIngestion : IHostedService
20+
class AuditIngestion : BackgroundService
2121
{
2222
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
2323

@@ -70,26 +70,6 @@ public AuditIngestion(
7070
);
7171
}
7272

73-
public async Task StartAsync(CancellationToken cancellationToken)
74-
{
75-
stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
76-
ingestionWorker = Task.Run(() => Loop(stopSource.Token), stopSource.Token);
77-
await watchdog.Start(() => applicationLifetime.StopApplication());
78-
}
79-
80-
public async Task StopAsync(CancellationToken cancellationToken)
81-
{
82-
await stopSource.CancelAsync();
83-
await watchdog.Stop();
84-
channel.Writer.Complete();
85-
await ingestionWorker;
86-
87-
if (transportInfrastructure != null)
88-
{
89-
await transportInfrastructure.Shutdown(cancellationToken);
90-
}
91-
}
92-
9373
Task OnCriticalError(string failure, Exception exception)
9474
{
9575
logger.Fatal($"OnCriticalError. '{failure}'", exception);
@@ -214,13 +194,16 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
214194
await taskCompletionSource.Task;
215195
}
216196

217-
async Task Loop(CancellationToken cancellationToken)
197+
198+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
218199
{
200+
await watchdog.Start(() => applicationLifetime.StopApplication());
201+
219202
try
220203
{
221204
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
222205

223-
while (await channel.Reader.WaitToReadAsync(cancellationToken))
206+
while (await channel.Reader.WaitToReadAsync(stoppingToken))
224207
{
225208
// will only enter here if there is something to read.
226209
try
@@ -237,7 +220,7 @@ async Task Loop(CancellationToken cancellationToken)
237220
await auditIngestor.Ingest(contexts);
238221
}
239222
}
240-
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
223+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
241224
{
242225
throw; // Catch again in outer catch
243226
}
@@ -258,16 +241,26 @@ async Task Loop(CancellationToken cancellationToken)
258241
}
259242
// will fall out here when writer is completed
260243
}
261-
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
244+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
262245
{
263246
// Cancellation
264247
}
248+
finally
249+
{
250+
await watchdog.Stop();
251+
channel.Writer.Complete();
252+
253+
if (transportInfrastructure != null)
254+
{
255+
// stoppingToken is cancelled, invoke so transport infrastructure will run teardown
256+
// No need to await, as this will throw OperationCancelledException
257+
_ = transportInfrastructure.Shutdown(stoppingToken);
258+
}
259+
}
265260
}
266261

267262
TransportInfrastructure transportInfrastructure;
268263
IMessageReceiver queueIngestor;
269-
Task ingestionWorker;
270-
CancellationTokenSource stopSource;
271264

272265
readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1);
273266
readonly string inputEndpoint;

src/ServiceControl/Operations/ErrorIngestion.cs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
using ServiceBus.Management.Infrastructure.Settings;
1818
using Transports;
1919

20-
class ErrorIngestion : IHostedService
20+
class ErrorIngestion : BackgroundService
2121
{
2222
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
2323

@@ -70,31 +70,15 @@ public ErrorIngestion(
7070
);
7171
}
7272

73-
public async Task StartAsync(CancellationToken cancellationToken)
73+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
7474
{
75-
stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
76-
ingestionWorker = Task.Run(() => Loop(stopSource.Token), stopSource.Token);
7775
await watchdog.Start(() => applicationLifetime.StopApplication());
78-
}
79-
80-
public async Task StopAsync(CancellationToken cancellationToken)
81-
{
82-
await watchdog.Stop();
83-
channel.Writer.Complete();
84-
await ingestionWorker;
85-
if (transportInfrastructure != null)
86-
{
87-
await transportInfrastructure.Shutdown(cancellationToken);
88-
}
89-
}
9076

91-
async Task Loop(CancellationToken cancellationToken)
92-
{
9377
try
9478
{
9579
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
9680

97-
while (await channel.Reader.WaitToReadAsync(cancellationToken))
81+
while (await channel.Reader.WaitToReadAsync(stoppingToken))
9882
{
9983
// will only enter here if there is something to read.
10084
try
@@ -111,7 +95,7 @@ async Task Loop(CancellationToken cancellationToken)
11195
await ingestor.Ingest(contexts);
11296
}
11397
}
114-
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
98+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
11599
{
116100
throw; // Catch again in outer catch
117101
}
@@ -132,10 +116,21 @@ async Task Loop(CancellationToken cancellationToken)
132116
}
133117
// will fall out here when writer is completed
134118
}
135-
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
119+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
136120
{
137121
// Cancellation
138122
}
123+
finally
124+
{
125+
await watchdog.Stop();
126+
channel.Writer.Complete();
127+
if (transportInfrastructure != null)
128+
{
129+
// stoppingToken is cancelled, invoke so transport infrastructure will run teardown
130+
// No need to await, as this will throw OperationCancelledException
131+
_ = transportInfrastructure.Shutdown(stoppingToken);
132+
}
133+
}
139134
}
140135

141136
async Task EnsureStarted(CancellationToken cancellationToken = default)
@@ -244,8 +239,6 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
244239
ErrorIngestionFaultPolicy errorHandlingPolicy;
245240
TransportInfrastructure transportInfrastructure;
246241
IMessageReceiver messageReceiver;
247-
Task ingestionWorker;
248-
CancellationTokenSource stopSource;
249242

250243
readonly Settings settings;
251244
readonly ITransportCustomization transportCustomization;

0 commit comments

Comments
 (0)