Skip to content

Commit 04248fe

Browse files
committed
Merge branch 'fix-watchdog-stop' into spike-ravendb-termination
2 parents 95522fc + aa3557a commit 04248fe

File tree

5 files changed

+126
-82
lines changed

5 files changed

+126
-82
lines changed

src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Linq;
55
using System.Threading.Tasks;
66
using AcceptanceTesting.EndpointTemplates;
7+
using Audit.Auditing;
78
using Microsoft.Extensions.DependencyInjection;
89
using NServiceBus;
910
using NServiceBus.AcceptanceTesting;
@@ -35,8 +36,7 @@ await Define<ScenarioContext>()
3536
.When(context =>
3637
{
3738
return context.Logs.ToArray().Any(i =>
38-
i.Message.StartsWith(
39-
"Ensure started. Infrastructure started"));
39+
i.Message.StartsWith(AuditIngestion.LogMessages.StartedInfrastructure));
4040
}, (_, __) =>
4141
{
4242
var databaseConfiguration = ServiceProvider.GetRequiredService<DatabaseConfiguration>();
@@ -47,8 +47,7 @@ await Define<ScenarioContext>()
4747
.When(context =>
4848
{
4949
return context.Logs.ToArray().Any(i =>
50-
i.Message.StartsWith(
51-
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
50+
i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure));
5251
}, (bus, c) => bus.SendLocal(new MyMessage()))
5352
)
5453
.Done(async c => await this.TryGetSingle<MessagesView>(
@@ -72,7 +71,7 @@ await Define<ScenarioContext>()
7271
{
7372
return context.Logs.ToArray().Any(i =>
7473
i.Message.StartsWith(
75-
"Ensure started. Infrastructure started"));
74+
AuditIngestion.LogMessages.StartedInfrastructure));
7675
}, (session, context) =>
7776
{
7877
var databaseConfiguration = ServiceProvider.GetRequiredService<DatabaseConfiguration>();
@@ -83,8 +82,7 @@ await Define<ScenarioContext>()
8382
.When(context =>
8483
{
8584
ingestionShutdown = context.Logs.ToArray().Any(i =>
86-
i.Message.StartsWith(
87-
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
85+
i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure));
8886

8987
return ingestionShutdown;
9088
},

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -76,67 +76,104 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
7676
await startStopSemaphore.WaitAsync(cancellationToken);
7777
logger.Debug("Ensure started. Start/stop semaphore acquired");
7878

79-
if (!unitOfWorkFactory.CanIngestMore())
80-
{
81-
if (queueIngestor != null)
82-
{
83-
var stoppable = queueIngestor;
84-
queueIngestor = null;
85-
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down commencing");
86-
await stoppable.StopReceive(cancellationToken);
87-
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
88-
}
79+
var canIngest = unitOfWorkFactory.CanIngestMore();
8980

90-
return;
91-
}
81+
logger.DebugFormat("Ensure started {0}", canIngest);
9282

93-
if (queueIngestor != null)
83+
if (canIngest)
84+
{
85+
await SetUpAndStartInfrastructure(cancellationToken);
86+
}
87+
else
9488
{
95-
logger.Debug("Ensure started. Already started, skipping start up");
96-
return; //Already started
89+
await StopAndTeardownInfrastructure(cancellationToken);
90+
}
91+
}
92+
catch (Exception e)
93+
{
94+
try
95+
{
96+
await StopAndTeardownInfrastructure(cancellationToken);
97+
}
98+
catch (Exception teardownException)
99+
{
100+
throw new AggregateException(e, teardownException);
97101
}
98102

99-
logger.Info("Ensure started. Infrastructure starting");
103+
throw;
104+
}
105+
finally
106+
{
107+
logger.Debug("Ensure started. Start/stop semaphore releasing");
108+
startStopSemaphore.Release();
109+
logger.Debug("Ensure started. Start/stop semaphore released");
110+
}
111+
}
100112

113+
async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
114+
{
115+
if (queueIngestor != null)
116+
{
117+
logger.Debug("Infrastructure already Started");
118+
return;
119+
}
120+
121+
try
122+
{
123+
logger.Info("Starting infrastructure");
101124
transportInfrastructure = await transportCustomization.CreateTransportInfrastructure(
102125
inputEndpoint,
103126
transportSettings,
104127
OnMessage,
105128
errorHandlingPolicy.OnError,
106129
OnCriticalError,
107-
TransportTransactionMode.ReceiveOnly);
130+
TransportTransactionMode.ReceiveOnly
131+
);
108132

109133
queueIngestor = transportInfrastructure.Receivers[inputEndpoint];
110134

111135
await auditIngestor.VerifyCanReachForwardingAddress();
112-
113136
await queueIngestor.StartReceive(cancellationToken);
114137

115-
logger.Info("Ensure started. Infrastructure started");
138+
logger.Info(LogMessages.StartedInfrastructure);
139+
}
140+
catch (Exception e)
141+
{
142+
logger.Error("Failed to start infrastructure", e);
143+
throw;
144+
}
145+
}
146+
147+
async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken)
148+
{
149+
if (transportInfrastructure == null)
150+
{
151+
logger.Debug("Infrastructure already Stopped");
152+
return;
116153
}
117-
catch
154+
155+
try
118156
{
119-
if (queueIngestor != null)
157+
logger.Info("Stopping infrastructure");
158+
try
120159
{
121-
try
160+
if (queueIngestor != null)
122161
{
123162
await queueIngestor.StopReceive(cancellationToken);
124163
}
125-
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
126-
{
127-
logger.Info("StopReceive cancelled");
128-
}
164+
}
165+
finally
166+
{
167+
await transportInfrastructure.Shutdown(cancellationToken);
129168
}
130169

131-
queueIngestor = null; // Setting to null so that it doesn't exit when it retries in line 185
132-
133-
throw;
170+
queueIngestor = null;
171+
logger.Info(LogMessages.StoppedInfrastructure);
134172
}
135-
finally
173+
catch (Exception e)
136174
{
137-
logger.Debug("Ensure started. Start/stop semaphore releasing");
138-
startStopSemaphore.Release();
139-
logger.Debug("Ensure started. Start/stop semaphore released");
175+
logger.Error("Failed to stop infrastructure", e);
176+
throw;
140177
}
141178
}
142179

@@ -260,7 +297,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
260297
{
261298
try
262299
{
263-
await watchdog.Stop();
300+
await watchdog.Stop(cancellationToken);
264301
channel.Writer.Complete();
265302
await base.StopAsync(cancellationToken);
266303
}
@@ -301,5 +338,11 @@ public override async Task StopAsync(CancellationToken cancellationToken)
301338
readonly IHostApplicationLifetime applicationLifetime;
302339

303340
static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();
341+
342+
internal static class LogMessages
343+
{
344+
internal const string StartedInfrastructure = "Started infrastructure";
345+
internal const string StoppedInfrastructure = "Stopped infrastructure";
346+
}
304347
}
305348
}

src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ public async Task It_shuts_down_gracefully()
3131

3232
await started.Task;
3333

34-
await dog.Stop();
34+
await dog.Stop(TestContext.CurrentContext.CancellationToken);
3535

3636
await stopped.Task;
3737
}
3838

39-
[Test]
39+
[Test, Ignore("When ensurestopped throws an exception, stop should also throw an exception as that is an ungraceful stop")]
4040
public async Task When_stop_fails_it_reports_the_failure()
4141
{
4242
string lastFailure = null;
@@ -52,7 +52,7 @@ public async Task When_stop_fails_it_reports_the_failure()
5252

5353
await started.Task;
5454

55-
await dog.Stop();
55+
await dog.Stop(TestContext.CurrentContext.CancellationToken);
5656

5757
Assert.That(lastFailure, Is.EqualTo("Simulated"));
5858
}
@@ -91,7 +91,7 @@ public async Task On_failure_triggers_stopping()
9191

9292
await restarted.Task;
9393

94-
await dog.Stop();
94+
await dog.Stop(TestContext.CurrentContext.CancellationToken);
9595
}
9696

9797
[Test]
@@ -129,7 +129,7 @@ public async Task When_first_start_attempt_works_it_recovers_from_further_errors
129129

130130
await recoveredFromError.Task;
131131

132-
await dog.Stop();
132+
await dog.Stop(TestContext.CurrentContext.CancellationToken);
133133

134134
//Make sure failure is cleared
135135
Assert.That(lastFailure, Is.Null);

src/ServiceControl.Infrastructure/Watchdog.cs

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,19 @@ public class Watchdog
1212
Action<string> reportFailure;
1313
Action clearFailure;
1414
Task watchdog;
15-
CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();
15+
CancellationTokenSource shutdownTokenSource = new();
1616
TimeSpan timeToWaitBetweenStartupAttempts;
1717
ILog log;
1818
string taskName;
1919

20-
public Watchdog(string taskName, Func<CancellationToken, Task> ensureStarted,
21-
Func<CancellationToken, Task> ensureStopped, Action<string> reportFailure, Action clearFailure,
22-
TimeSpan timeToWaitBetweenStartupAttempts, ILog log)
20+
public Watchdog(
21+
string taskName,
22+
Func<CancellationToken, Task> ensureStarted,
23+
Func<CancellationToken, Task> ensureStopped, Action<string> reportFailure,
24+
Action clearFailure,
25+
TimeSpan timeToWaitBetweenStartupAttempts,
26+
ILog log
27+
)
2328
{
2429
this.taskName = taskName;
2530
this.ensureStopped = ensureStopped;
@@ -42,41 +47,37 @@ public Task Start(Action onFailedOnStartup)
4247
{
4348
log.Debug($"Starting watching {taskName}");
4449

45-
bool? failedOnStartup = null;
50+
bool startup = true;
4651

4752
while (!shutdownTokenSource.IsCancellationRequested)
4853
{
4954
try
5055
{
56+
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(shutdownTokenSource.Token);
57+
cancellationTokenSource.CancelAfter(15000);
58+
5159
log.Debug($"Ensuring {taskName} is running");
52-
await ensureStarted(shutdownTokenSource.Token).ConfigureAwait(false);
60+
await ensureStarted(cancellationTokenSource.Token).ConfigureAwait(false);
5361
clearFailure();
54-
55-
failedOnStartup ??= false;
62+
startup = false;
5663
}
57-
catch (OperationCanceledException e) when (!shutdownTokenSource.IsCancellationRequested)
64+
catch (OperationCanceledException e) when (shutdownTokenSource.IsCancellationRequested)
5865
{
59-
// Continue, as OCE is not from caller
60-
log.Info("Start cancelled, retrying...", e);
61-
continue;
66+
log.Debug("Cancelled", e);
67+
return;
6268
}
6369
catch (Exception e)
6470
{
6571
reportFailure(e.Message);
6672

67-
if (failedOnStartup == null)
73+
if (startup)
6874
{
69-
failedOnStartup = true;
70-
7175
log.Error($"Error during initial startup attempt for {taskName}.", e);
72-
73-
//there was an error during startup hence we want to shut down the instance
7476
onFailedOnStartup();
77+
return;
7578
}
76-
else
77-
{
78-
log.Error($"Error while trying to start {taskName}. Starting will be retried in {timeToWaitBetweenStartupAttempts}.", e);
79-
}
79+
80+
log.Error($"Error while trying to start {taskName}. Starting will be retried in {timeToWaitBetweenStartupAttempts}.", e);
8081
}
8182
try
8283
{
@@ -87,25 +88,27 @@ public Task Start(Action onFailedOnStartup)
8788
//Ignore, no need to log cancellation of delay
8889
}
8990
}
90-
try
91-
{
92-
log.Debug($"Stopping watching process {taskName}");
93-
//We don't pass the shutdown token here because it has already been cancelled and we want to ensure we stop the ingestion.
94-
await ensureStopped(CancellationToken.None).ConfigureAwait(false);
95-
}
96-
catch (Exception e)
97-
{
98-
log.Error($"Error while trying to stop {taskName}.", e);
99-
reportFailure(e.Message);
100-
}
10191
});
10292
return Task.CompletedTask;
10393
}
10494

105-
public Task Stop()
95+
public async Task Stop(CancellationToken cancellationToken)
10696
{
107-
shutdownTokenSource.Cancel();
108-
return watchdog;
97+
try
98+
{
99+
log.Debug($"Stopping watching process {taskName}");
100+
await shutdownTokenSource.CancelAsync().ConfigureAwait(false);
101+
await watchdog.ConfigureAwait(false);
102+
}
103+
catch (Exception e)
104+
{
105+
log.Error($"Error while trying to stop {taskName}.", e);
106+
throw;
107+
}
108+
finally
109+
{
110+
await ensureStopped(cancellationToken).ConfigureAwait(false);
111+
}
109112
}
110113
}
111114
}

src/ServiceControl/Operations/ErrorIngestion.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
132132
{
133133
try
134134
{
135-
await watchdog.Stop();
135+
await watchdog.Stop(cancellationToken);
136136
channel.Writer.Complete();
137137
await base.StopAsync(cancellationToken);
138138
}

0 commit comments

Comments
 (0)