Skip to content

Commit 286ec83

Browse files
committed
- Applied changes from audit to error
- Removed duplication in EnsureStopped - Removed log events around semaphore handling
1 parent e601ac4 commit 286ec83

File tree

2 files changed

+89
-61
lines changed

2 files changed

+89
-61
lines changed

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 10 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,7 @@ async Task EnsureStarted(CancellationToken cancellationToken)
7272
{
7373
try
7474
{
75-
logger.Debug("Ensure started. Start/stop semaphore acquiring");
7675
await startStopSemaphore.WaitAsync(cancellationToken);
77-
logger.Debug("Ensure started. Start/stop semaphore acquired");
7876

7977
var canIngest = unitOfWorkFactory.CanIngestMore();
8078

@@ -104,15 +102,13 @@ async Task EnsureStarted(CancellationToken cancellationToken)
104102
}
105103
finally
106104
{
107-
logger.Debug("Ensure started. Start/stop semaphore releasing");
108105
startStopSemaphore.Release();
109-
logger.Debug("Ensure started. Start/stop semaphore released");
110106
}
111107
}
112108

113109
async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
114110
{
115-
if (queueIngestor != null)
111+
if (messageReceiver != null)
116112
{
117113
logger.Debug("Infrastructure already Started");
118114
return;
@@ -130,10 +126,10 @@ async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
130126
TransportTransactionMode.ReceiveOnly
131127
);
132128

133-
queueIngestor = transportInfrastructure.Receivers[inputEndpoint];
129+
messageReceiver = transportInfrastructure.Receivers[inputEndpoint];
134130

135131
await auditIngestor.VerifyCanReachForwardingAddress();
136-
await queueIngestor.StartReceive(cancellationToken);
132+
await messageReceiver.StartReceive(cancellationToken);
137133

138134
logger.Info(LogMessages.StartedInfrastructure);
139135
}
@@ -157,17 +153,19 @@ async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken)
157153
logger.Info("Stopping infrastructure");
158154
try
159155
{
160-
if (queueIngestor != null)
156+
if (messageReceiver != null)
161157
{
162-
await queueIngestor.StopReceive(cancellationToken);
158+
await messageReceiver.StopReceive(cancellationToken);
163159
}
164160
}
165161
finally
166162
{
167163
await transportInfrastructure.Shutdown(cancellationToken);
168164
}
169165

170-
queueIngestor = null;
166+
messageReceiver = null;
167+
transportInfrastructure = null;
168+
171169
logger.Info(LogMessages.StoppedInfrastructure);
172170
}
173171
catch (Exception e)
@@ -181,31 +179,12 @@ async Task EnsureStopped(CancellationToken cancellationToken)
181179
{
182180
try
183181
{
184-
logger.Info("Shutting down. Start/stop semaphore acquiring");
185182
await startStopSemaphore.WaitAsync(cancellationToken);
186-
logger.Info("Shutting down. Start/stop semaphore acquired");
187-
188-
if (queueIngestor == null)
189-
{
190-
logger.Info("Shutting down. Already stopped, skipping shut down");
191-
return; //Already stopped
192-
}
193-
194-
var stoppable = queueIngestor;
195-
queueIngestor = null;
196-
logger.Info("Shutting down. Infrastructure shut down commencing");
197-
await stoppable.StopReceive(cancellationToken);
198-
logger.Info("Shutting down. Infrastructure shut down completed");
199-
}
200-
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
201-
{
202-
logger.Info("StopReceive cancelled");
183+
await StopAndTeardownInfrastructure(cancellationToken);
203184
}
204185
finally
205186
{
206-
logger.Info("Shutting down. Start/stop semaphore releasing");
207187
startStopSemaphore.Release();
208-
logger.Info("Shutting down. Start/stop semaphore released");
209188
}
210189
}
211190

@@ -318,7 +297,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
318297
}
319298

320299
TransportInfrastructure transportInfrastructure;
321-
IMessageReceiver queueIngestor;
300+
IMessageReceiver messageReceiver;
322301
long consecutiveBatchFailures = 0;
323302

324303
readonly SemaphoreSlim startStopSemaphore = new(1);

src/ServiceControl/Operations/ErrorIngestion.cs

Lines changed: 79 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -158,30 +158,57 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
158158
{
159159
await startStopSemaphore.WaitAsync(cancellationToken);
160160

161-
if (!unitOfWorkFactory.CanIngestMore())
161+
var canIngest = unitOfWorkFactory.CanIngestMore();
162+
163+
Logger.DebugFormat("Ensure started {0}", canIngest);
164+
165+
if (canIngest)
162166
{
163-
if (messageReceiver != null)
164-
{
165-
var stoppable = messageReceiver;
166-
messageReceiver = null;
167-
await stoppable.StopReceive(cancellationToken);
168-
Logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
169-
}
170-
return;
167+
await SetUpAndStartInfrastructure(cancellationToken);
171168
}
172-
173-
if (messageReceiver != null)
169+
else
170+
{
171+
await StopAndTeardownInfrastructure(cancellationToken);
172+
}
173+
}
174+
catch (Exception e)
175+
{
176+
try
174177
{
175-
return; //Already started
178+
await StopAndTeardownInfrastructure(cancellationToken);
179+
}
180+
catch (Exception teardownException)
181+
{
182+
throw new AggregateException(e, teardownException);
176183
}
177184

185+
throw;
186+
}
187+
finally
188+
{
189+
startStopSemaphore.Release();
190+
}
191+
}
192+
193+
async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
194+
{
195+
if (messageReceiver != null)
196+
{
197+
Logger.Debug("Infrastructure already Started");
198+
return;
199+
}
200+
201+
try
202+
{
203+
Logger.Info("Starting infrastructure");
178204
transportInfrastructure = await transportCustomization.CreateTransportInfrastructure(
179205
errorQueue,
180206
transportSettings,
181207
OnMessage,
182208
errorHandlingPolicy.OnError,
183209
OnCriticalError,
184-
TransportTransactionMode.ReceiveOnly);
210+
TransportTransactionMode.ReceiveOnly
211+
);
185212

186213
messageReceiver = transportInfrastructure.Receivers[errorQueue];
187214

@@ -192,22 +219,45 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
192219

193220
await messageReceiver.StartReceive(cancellationToken);
194221

195-
Logger.Info("Ensure started. Infrastructure started");
222+
Logger.Info(LogMessages.StartedInfrastructure);
196223
}
197-
catch
224+
catch (Exception e)
198225
{
199-
if (messageReceiver != null)
226+
Logger.Error("Failed to start infrastructure", e);
227+
throw;
228+
}
229+
}
230+
async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken)
231+
{
232+
if (transportInfrastructure == null)
233+
{
234+
Logger.Debug("Infrastructure already Stopped");
235+
return;
236+
}
237+
try
238+
{
239+
Logger.Info("Stopping infrastructure");
240+
try
241+
{
242+
if (messageReceiver != null)
243+
{
244+
await messageReceiver.StopReceive(cancellationToken);
245+
}
246+
}
247+
finally
200248
{
201-
await messageReceiver.StopReceive(cancellationToken);
249+
await transportInfrastructure.Shutdown(cancellationToken);
202250
}
203251

204-
messageReceiver = null; // Setting to null so that it doesn't exit when it retries in line 134
252+
messageReceiver = null;
253+
transportInfrastructure = null;
205254

206-
throw;
255+
Logger.Info(LogMessages.StoppedInfrastructure);
207256
}
208-
finally
257+
catch (Exception e)
209258
{
210-
startStopSemaphore.Release();
259+
Logger.Error("Failed to stop infrastructure", e);
260+
throw;
211261
}
212262
}
213263

@@ -238,22 +288,15 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
238288
try
239289
{
240290
await startStopSemaphore.WaitAsync(cancellationToken);
241-
242-
if (messageReceiver == null)
243-
{
244-
return; //Already stopped
245-
}
246-
var stoppable = messageReceiver;
247-
messageReceiver = null;
248-
await stoppable.StopReceive(cancellationToken);
291+
await StopAndTeardownInfrastructure(cancellationToken);
249292
}
250293
finally
251294
{
252295
startStopSemaphore.Release();
253296
}
254297
}
255298

256-
SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1);
299+
SemaphoreSlim startStopSemaphore = new(1);
257300
string errorQueue;
258301
ErrorIngestionFaultPolicy errorHandlingPolicy;
259302
TransportInfrastructure transportInfrastructure;
@@ -272,5 +315,11 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
272315
readonly IHostApplicationLifetime applicationLifetime;
273316

274317
static readonly ILog Logger = LogManager.GetLogger<ErrorIngestion>();
318+
319+
internal static class LogMessages
320+
{
321+
internal const string StartedInfrastructure = "Started infrastructure";
322+
internal const string StoppedInfrastructure = "Stopped infrastructure";
323+
}
275324
}
276325
}

0 commit comments

Comments
 (0)