Skip to content

Commit fca1ba5

Browse files
Fix watchdog re-creation of TransportInfrastructure and cancellation improvements (#4818)
* Watchdog stop passed CancellationToken.None which could block stop till infinity * Ignored watchdog test that seems wrong * Fix leaking transportInfrastructure instance * Thanks who ever wrote a test with dependency on log output message without making this dependency visible :yakshaving: * Improve readability * Add cancel on start to the wathdog for completeness * Remove uneeded defaults * Better logging * Revert "Better logging" This reverts commit 5be3091. * Why 15 seconds and why even the CTS * Refactored tests to ensure stop throws when is stops ungracefully * - Applied changes from audit to error - Removed duplication in EnsureStopped - Removed log events around semaphore handling * Fix failing test * fixup! Fix failing test --------- Co-authored-by: Andreas Öhlund <[email protected]>
1 parent 546b249 commit fca1ba5

File tree

6 files changed

+243
-155
lines changed

6 files changed

+243
-155
lines changed

src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using NServiceBus;
1010
using NServiceBus.AcceptanceTesting;
1111
using NUnit.Framework;
12+
using Operations;
1213
using ServiceBus.Management.Infrastructure.Settings;
1314

1415
[TestFixture]
@@ -33,7 +34,7 @@ await Define<ScenarioContext>()
3334
.WithEndpoint<Sender>(b => b
3435
.When(context =>
3536
{
36-
return context.Logs.ToArray().Any(i => i.Message.StartsWith("Ensure started. Infrastructure started"));
37+
return context.Logs.ToArray().Any(i => i.Message.StartsWith(ErrorIngestion.LogMessages.StartedInfrastructure));
3738
}, (_, __) =>
3839
{
3940
PersisterSettings.MinimumStorageLeftRequiredForIngestion = 100;
@@ -43,8 +44,7 @@ await Define<ScenarioContext>()
4344
.When(context =>
4445
{
4546
return context.Logs.ToArray().Any(i =>
46-
i.Message.StartsWith(
47-
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
47+
i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure));
4848
}, (bus, c) => bus.SendLocal(new MyMessage())
4949
)
5050
.DoNotFailOnErrorMessages())
@@ -62,8 +62,7 @@ await Define<ScenarioContext>()
6262
.When(context =>
6363
{
6464
return context.Logs.ToArray().Any(i =>
65-
i.Message.StartsWith(
66-
"Ensure started. Infrastructure started"));
65+
i.Message.StartsWith(ErrorIngestion.LogMessages.StartedInfrastructure));
6766
}, (session, context) =>
6867
{
6968
PersisterSettings.MinimumStorageLeftRequiredForIngestion = 100;
@@ -73,8 +72,7 @@ await Define<ScenarioContext>()
7372
.When(context =>
7473
{
7574
ingestionShutdown = context.Logs.ToArray().Any(i =>
76-
i.Message.StartsWith(
77-
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
75+
i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure));
7876

7977
return ingestionShutdown;
8078
},

src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Linq;
55
using System.Threading.Tasks;
66
using AcceptanceTesting.EndpointTemplates;
7+
using Audit.Auditing;
78
using Microsoft.Extensions.DependencyInjection;
89
using NServiceBus;
910
using NServiceBus.AcceptanceTesting;
@@ -35,8 +36,7 @@ await Define<ScenarioContext>()
3536
.When(context =>
3637
{
3738
return context.Logs.ToArray().Any(i =>
38-
i.Message.StartsWith(
39-
"Ensure started. Infrastructure started"));
39+
i.Message.StartsWith(AuditIngestion.LogMessages.StartedInfrastructure));
4040
}, (_, __) =>
4141
{
4242
var databaseConfiguration = ServiceProvider.GetRequiredService<DatabaseConfiguration>();
@@ -47,8 +47,7 @@ await Define<ScenarioContext>()
4747
.When(context =>
4848
{
4949
return context.Logs.ToArray().Any(i =>
50-
i.Message.StartsWith(
51-
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
50+
i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure));
5251
}, (bus, c) => bus.SendLocal(new MyMessage()))
5352
)
5453
.Done(async c => await this.TryGetSingle<MessagesView>(
@@ -72,7 +71,7 @@ await Define<ScenarioContext>()
7271
{
7372
return context.Logs.ToArray().Any(i =>
7473
i.Message.StartsWith(
75-
"Ensure started. Infrastructure started"));
74+
AuditIngestion.LogMessages.StartedInfrastructure));
7675
}, (session, context) =>
7776
{
7877
var databaseConfiguration = ServiceProvider.GetRequiredService<DatabaseConfiguration>();
@@ -83,8 +82,7 @@ await Define<ScenarioContext>()
8382
.When(context =>
8483
{
8584
ingestionShutdown = context.Logs.ToArray().Any(i =>
86-
i.Message.StartsWith(
87-
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
85+
i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure));
8886

8987
return ingestionShutdown;
9088
},

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 83 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -68,107 +68,123 @@ Task OnCriticalError(string failure, Exception exception)
6868
return watchdog.OnFailure(failure);
6969
}
7070

71-
async Task EnsureStarted(CancellationToken cancellationToken = default)
71+
async Task EnsureStarted(CancellationToken cancellationToken)
7272
{
7373
try
7474
{
75-
logger.Debug("Ensure started. Start/stop semaphore acquiring");
7675
await startStopSemaphore.WaitAsync(cancellationToken);
77-
logger.Debug("Ensure started. Start/stop semaphore acquired");
7876

79-
if (!unitOfWorkFactory.CanIngestMore())
80-
{
81-
if (queueIngestor != null)
82-
{
83-
var stoppable = queueIngestor;
84-
queueIngestor = null;
85-
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down commencing");
86-
await stoppable.StopReceive(cancellationToken);
87-
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
88-
}
77+
var canIngest = unitOfWorkFactory.CanIngestMore();
8978

90-
return;
91-
}
79+
logger.DebugFormat("Ensure started {0}", canIngest);
9280

93-
if (queueIngestor != null)
81+
if (canIngest)
82+
{
83+
await SetUpAndStartInfrastructure(cancellationToken);
84+
}
85+
else
9486
{
95-
logger.Debug("Ensure started. Already started, skipping start up");
96-
return; //Already started
87+
await StopAndTeardownInfrastructure(cancellationToken);
88+
}
89+
}
90+
catch (Exception e)
91+
{
92+
try
93+
{
94+
await StopAndTeardownInfrastructure(cancellationToken);
95+
}
96+
catch (Exception teardownException)
97+
{
98+
throw new AggregateException(e, teardownException);
9799
}
98100

99-
logger.Info("Ensure started. Infrastructure starting");
101+
throw;
102+
}
103+
finally
104+
{
105+
startStopSemaphore.Release();
106+
}
107+
}
100108

109+
async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
110+
{
111+
if (messageReceiver != null)
112+
{
113+
logger.Debug("Infrastructure already Started");
114+
return;
115+
}
116+
117+
try
118+
{
119+
logger.Info("Starting infrastructure");
101120
transportInfrastructure = await transportCustomization.CreateTransportInfrastructure(
102121
inputEndpoint,
103122
transportSettings,
104123
OnMessage,
105124
errorHandlingPolicy.OnError,
106125
OnCriticalError,
107-
TransportTransactionMode.ReceiveOnly);
126+
TransportTransactionMode.ReceiveOnly
127+
);
108128

109-
queueIngestor = transportInfrastructure.Receivers[inputEndpoint];
129+
messageReceiver = transportInfrastructure.Receivers[inputEndpoint];
110130

111131
await auditIngestor.VerifyCanReachForwardingAddress();
132+
await messageReceiver.StartReceive(cancellationToken);
112133

113-
await queueIngestor.StartReceive(cancellationToken);
134+
logger.Info(LogMessages.StartedInfrastructure);
135+
}
136+
catch (Exception e)
137+
{
138+
logger.Error("Failed to start infrastructure", e);
139+
throw;
140+
}
141+
}
114142

115-
logger.Info("Ensure started. Infrastructure started");
143+
async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken)
144+
{
145+
if (transportInfrastructure == null)
146+
{
147+
logger.Debug("Infrastructure already Stopped");
148+
return;
116149
}
117-
catch
150+
151+
try
118152
{
119-
if (queueIngestor != null)
153+
logger.Info("Stopping infrastructure");
154+
try
120155
{
121-
try
122-
{
123-
await queueIngestor.StopReceive(cancellationToken);
124-
}
125-
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
156+
if (messageReceiver != null)
126157
{
127-
logger.Info("StopReceive cancelled");
158+
await messageReceiver.StopReceive(cancellationToken);
128159
}
129160
}
161+
finally
162+
{
163+
await transportInfrastructure.Shutdown(cancellationToken);
164+
}
130165

131-
queueIngestor = null; // Setting to null so that it doesn't exit when it retries in line 185
166+
messageReceiver = null;
167+
transportInfrastructure = null;
132168

133-
throw;
169+
logger.Info(LogMessages.StoppedInfrastructure);
134170
}
135-
finally
171+
catch (Exception e)
136172
{
137-
logger.Debug("Ensure started. Start/stop semaphore releasing");
138-
startStopSemaphore.Release();
139-
logger.Debug("Ensure started. Start/stop semaphore released");
173+
logger.Error("Failed to stop infrastructure", e);
174+
throw;
140175
}
141176
}
142177

143-
async Task EnsureStopped(CancellationToken cancellationToken = default)
178+
async Task EnsureStopped(CancellationToken cancellationToken)
144179
{
145180
try
146181
{
147-
logger.Info("Shutting down. Start/stop semaphore acquiring");
148182
await startStopSemaphore.WaitAsync(cancellationToken);
149-
logger.Info("Shutting down. Start/stop semaphore acquired");
150-
151-
if (queueIngestor == null)
152-
{
153-
logger.Info("Shutting down. Already stopped, skipping shut down");
154-
return; //Already stopped
155-
}
156-
157-
var stoppable = queueIngestor;
158-
queueIngestor = null;
159-
logger.Info("Shutting down. Infrastructure shut down commencing");
160-
await stoppable.StopReceive(cancellationToken);
161-
logger.Info("Shutting down. Infrastructure shut down completed");
162-
}
163-
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
164-
{
165-
logger.Info("StopReceive cancelled");
183+
await StopAndTeardownInfrastructure(cancellationToken);
166184
}
167185
finally
168186
{
169-
logger.Info("Shutting down. Start/stop semaphore releasing");
170187
startStopSemaphore.Release();
171-
logger.Info("Shutting down. Start/stop semaphore released");
172188
}
173189
}
174190

@@ -194,7 +210,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
194210

195211
public override async Task StartAsync(CancellationToken cancellationToken)
196212
{
197-
await watchdog.Start(() => applicationLifetime.StopApplication());
213+
await watchdog.Start(() => applicationLifetime.StopApplication(), cancellationToken);
198214
await base.StartAsync(cancellationToken);
199215
}
200216

@@ -260,7 +276,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
260276
{
261277
try
262278
{
263-
await watchdog.Stop();
279+
await watchdog.Stop(cancellationToken);
264280
channel.Writer.Complete();
265281
await base.StopAsync(cancellationToken);
266282
}
@@ -281,7 +297,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
281297
}
282298

283299
TransportInfrastructure transportInfrastructure;
284-
IMessageReceiver queueIngestor;
300+
IMessageReceiver messageReceiver;
285301
long consecutiveBatchFailures = 0;
286302

287303
readonly SemaphoreSlim startStopSemaphore = new(1);
@@ -301,5 +317,11 @@ public override async Task StopAsync(CancellationToken cancellationToken)
301317
readonly IHostApplicationLifetime applicationLifetime;
302318

303319
static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();
320+
321+
internal static class LogMessages
322+
{
323+
internal const string StartedInfrastructure = "Started infrastructure";
324+
internal const string StoppedInfrastructure = "Stopped infrastructure";
325+
}
304326
}
305327
}

0 commit comments

Comments
 (0)