Skip to content

Commit 6cdcab7

Browse files
committed
- Applied changes from audit to error
- Removed duplication in EnsureStopped - Removed log events around semaphore handling
1 parent e601ac4 commit 6cdcab7

File tree

2 files changed

+80
-52
lines changed

2 files changed

+80
-52
lines changed

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,7 @@ async Task EnsureStarted(CancellationToken cancellationToken)
7272
{
7373
try
7474
{
75-
logger.Debug("Ensure started. Start/stop semaphore acquiring");
7675
await startStopSemaphore.WaitAsync(cancellationToken);
77-
logger.Debug("Ensure started. Start/stop semaphore acquired");
7876

7977
var canIngest = unitOfWorkFactory.CanIngestMore();
8078

@@ -104,9 +102,7 @@ async Task EnsureStarted(CancellationToken cancellationToken)
104102
}
105103
finally
106104
{
107-
logger.Debug("Ensure started. Start/stop semaphore releasing");
108105
startStopSemaphore.Release();
109-
logger.Debug("Ensure started. Start/stop semaphore released");
110106
}
111107
}
112108

@@ -181,25 +177,8 @@ async Task EnsureStopped(CancellationToken cancellationToken)
181177
{
182178
try
183179
{
184-
logger.Info("Shutting down. Start/stop semaphore acquiring");
185180
await startStopSemaphore.WaitAsync(cancellationToken);
186-
logger.Info("Shutting down. Start/stop semaphore acquired");
187-
188-
if (queueIngestor == null)
189-
{
190-
logger.Info("Shutting down. Already stopped, skipping shut down");
191-
return; //Already stopped
192-
}
193-
194-
var stoppable = queueIngestor;
195-
queueIngestor = null;
196-
logger.Info("Shutting down. Infrastructure shut down commencing");
197-
await stoppable.StopReceive(cancellationToken);
198-
logger.Info("Shutting down. Infrastructure shut down completed");
199-
}
200-
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
201-
{
202-
logger.Info("StopReceive cancelled");
181+
await StopAndTeardownInfrastructure(cancellationToken);
203182
}
204183
finally
205184
{

src/ServiceControl/Operations/ErrorIngestion.cs

Lines changed: 79 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -158,30 +158,57 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
158158
{
159159
await startStopSemaphore.WaitAsync(cancellationToken);
160160

161-
if (!unitOfWorkFactory.CanIngestMore())
161+
var canIngest = unitOfWorkFactory.CanIngestMore();
162+
163+
Logger.DebugFormat("Ensure started {0}", canIngest);
164+
165+
if (canIngest)
162166
{
163-
if (messageReceiver != null)
164-
{
165-
var stoppable = messageReceiver;
166-
messageReceiver = null;
167-
await stoppable.StopReceive(cancellationToken);
168-
Logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
169-
}
170-
return;
167+
await SetUpAndStartInfrastructure(cancellationToken);
171168
}
172-
173-
if (messageReceiver != null)
169+
else
170+
{
171+
await StopAndTeardownInfrastructure(cancellationToken);
172+
}
173+
}
174+
catch (Exception e)
175+
{
176+
try
174177
{
175-
return; //Already started
178+
await StopAndTeardownInfrastructure(cancellationToken);
179+
}
180+
catch (Exception teardownException)
181+
{
182+
throw new AggregateException(e, teardownException);
176183
}
177184

185+
throw;
186+
}
187+
finally
188+
{
189+
startStopSemaphore.Release();
190+
}
191+
}
192+
193+
async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
194+
{
195+
if (messageReceiver != null)
196+
{
197+
Logger.Debug("Infrastructure already Started");
198+
return;
199+
}
200+
201+
try
202+
{
203+
Logger.Info("Starting infrastructure");
178204
transportInfrastructure = await transportCustomization.CreateTransportInfrastructure(
179205
errorQueue,
180206
transportSettings,
181207
OnMessage,
182208
errorHandlingPolicy.OnError,
183209
OnCriticalError,
184-
TransportTransactionMode.ReceiveOnly);
210+
TransportTransactionMode.ReceiveOnly
211+
);
185212

186213
messageReceiver = transportInfrastructure.Receivers[errorQueue];
187214

@@ -192,22 +219,45 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
192219

193220
await messageReceiver.StartReceive(cancellationToken);
194221

195-
Logger.Info("Ensure started. Infrastructure started");
222+
Logger.Info(LogMessages.StartedInfrastructure);
196223
}
197-
catch
224+
catch (Exception e)
198225
{
199-
if (messageReceiver != null)
226+
Logger.Error("Failed to start infrastructure", e);
227+
throw;
228+
}
229+
}
230+
async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken)
231+
{
232+
if (transportInfrastructure == null)
233+
{
234+
Logger.Debug("Infrastructure already Stopped");
235+
return;
236+
}
237+
try
238+
{
239+
Logger.Info("Stopping infrastructure");
240+
try
241+
{
242+
if (messageReceiver != null)
243+
{
244+
await messageReceiver.StopReceive(cancellationToken);
245+
}
246+
}
247+
finally
200248
{
201-
await messageReceiver.StopReceive(cancellationToken);
249+
await transportInfrastructure.Shutdown(cancellationToken);
202250
}
203251

204-
messageReceiver = null; // Setting to null so that it doesn't exit when it retries in line 134
252+
messageReceiver = null;
253+
transportInfrastructure = null;
205254

206-
throw;
255+
Logger.Info(LogMessages.StoppedInfrastructure);
207256
}
208-
finally
257+
catch (Exception e)
209258
{
210-
startStopSemaphore.Release();
259+
Logger.Error("Failed to stop infrastructure", e);
260+
throw;
211261
}
212262
}
213263

@@ -238,22 +288,15 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
238288
try
239289
{
240290
await startStopSemaphore.WaitAsync(cancellationToken);
241-
242-
if (messageReceiver == null)
243-
{
244-
return; //Already stopped
245-
}
246-
var stoppable = messageReceiver;
247-
messageReceiver = null;
248-
await stoppable.StopReceive(cancellationToken);
291+
await StopAndTeardownInfrastructure(cancellationToken);
249292
}
250293
finally
251294
{
252295
startStopSemaphore.Release();
253296
}
254297
}
255298

256-
SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1);
299+
SemaphoreSlim startStopSemaphore = new(1);
257300
string errorQueue;
258301
ErrorIngestionFaultPolicy errorHandlingPolicy;
259302
TransportInfrastructure transportInfrastructure;
@@ -272,5 +315,11 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
272315
readonly IHostApplicationLifetime applicationLifetime;
273316

274317
static readonly ILog Logger = LogManager.GetLogger<ErrorIngestion>();
318+
319+
internal static class LogMessages
320+
{
321+
internal const string StartedInfrastructure = "Started infrastructure";
322+
internal const string StoppedInfrastructure = "Stopped infrastructure";
323+
}
275324
}
276325
}

0 commit comments

Comments
 (0)