Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting.EndpointTemplates;
using Audit.Auditing;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
Expand Down Expand Up @@ -35,8 +36,7 @@ await Define<ScenarioContext>()
.When(context =>
{
return context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Ensure started. Infrastructure started"));
i.Message.StartsWith(AuditIngestion.LogMessages.StartedInfrastructure));
}, (_, __) =>
{
var databaseConfiguration = ServiceProvider.GetRequiredService<DatabaseConfiguration>();
Expand All @@ -47,8 +47,7 @@ await Define<ScenarioContext>()
.When(context =>
{
return context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure));
}, (bus, c) => bus.SendLocal(new MyMessage()))
)
.Done(async c => await this.TryGetSingle<MessagesView>(
Expand All @@ -72,7 +71,7 @@ await Define<ScenarioContext>()
{
return context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Ensure started. Infrastructure started"));
AuditIngestion.LogMessages.StartedInfrastructure));
}, (session, context) =>
{
var databaseConfiguration = ServiceProvider.GetRequiredService<DatabaseConfiguration>();
Expand All @@ -83,8 +82,7 @@ await Define<ScenarioContext>()
.When(context =>
{
ingestionShutdown = context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure));

return ingestionShutdown;
},
Expand Down
109 changes: 76 additions & 33 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Task OnCriticalError(string failure, Exception exception)
return watchdog.OnFailure(failure);
}

async Task EnsureStarted(CancellationToken cancellationToken = default)
async Task EnsureStarted(CancellationToken cancellationToken)
{
try
{
Expand All @@ -78,69 +78,106 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)

if (!unitOfWorkFactory.CanIngestMore())
{
if (queueIngestor != null)
{
var stoppable = queueIngestor;
queueIngestor = null;
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down commencing");
await stoppable.StopReceive(cancellationToken);
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
}
logger.Warn("Ensure started. Storage unable to ingest more. Ingestion will be paused");

await StopAndTeardownInfrastructure(cancellationToken);

return;
}

if (queueIngestor != null)
logger.Debug("Ensure started. Storage able to ingest. Ingestion will be started");

await SetUpAndStartInfrastructure(cancellationToken);
}
catch (Exception e)
{
try
{
logger.Debug("Ensure started. Already started, skipping start up");
return; //Already started
await StopAndTeardownInfrastructure(cancellationToken);
}
catch (Exception teardownException)
{
throw new AggregateException(e, teardownException);
}

throw;
}
finally
{
logger.Debug("Ensure started. Start/stop semaphore releasing");
startStopSemaphore.Release();
logger.Debug("Ensure started. Start/stop semaphore released");
}
}

logger.Info("Ensure started. Infrastructure starting");
async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
{
if (queueIngestor != null)
{
logger.Debug("Infrastructure already Started");
return;
}

try
{
logger.Info("Starting infrastructure");
transportInfrastructure = await transportCustomization.CreateTransportInfrastructure(
inputEndpoint,
transportSettings,
OnMessage,
errorHandlingPolicy.OnError,
OnCriticalError,
TransportTransactionMode.ReceiveOnly);
TransportTransactionMode.ReceiveOnly
);

queueIngestor = transportInfrastructure.Receivers[inputEndpoint];

await auditIngestor.VerifyCanReachForwardingAddress();

await queueIngestor.StartReceive(cancellationToken);

logger.Info("Ensure started. Infrastructure started");
logger.Info(LogMessages.StartedInfrastructure);
}
catch (Exception e)
{
logger.Error("Failed to start infrastructure", e);
throw;
}
}

async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken)
{
if (transportInfrastructure == null)
{
logger.Debug("Infrastructure already Stopped");
return;
}
catch

try
{
if (queueIngestor != null)
logger.Info("Stopping infrastructure");
try
{
try
if (queueIngestor != null)
{
await queueIngestor.StopReceive(cancellationToken);
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
{
logger.Info("StopReceive cancelled");
}
}
finally
{
await transportInfrastructure.Shutdown(cancellationToken);
}

queueIngestor = null; // Setting to null so that it doesn't exit when it retries in line 185

throw;
queueIngestor = null;
logger.Info(LogMessages.StoppedInfrastructure);
}
finally
catch (Exception e)
{
logger.Debug("Ensure started. Start/stop semaphore releasing");
startStopSemaphore.Release();
logger.Debug("Ensure started. Start/stop semaphore released");
logger.Error("Failed to stop infrastructure", e);
throw;
}
}

async Task EnsureStopped(CancellationToken cancellationToken = default)
async Task EnsureStopped(CancellationToken cancellationToken)
{
try
{
Expand Down Expand Up @@ -194,7 +231,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati

public override async Task StartAsync(CancellationToken cancellationToken)
{
await watchdog.Start(() => applicationLifetime.StopApplication());
await watchdog.Start(() => applicationLifetime.StopApplication(), cancellationToken);
await base.StartAsync(cancellationToken);
}

Expand Down Expand Up @@ -260,7 +297,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await watchdog.Stop();
await watchdog.Stop(cancellationToken);
channel.Writer.Complete();
await base.StopAsync(cancellationToken);
}
Expand Down Expand Up @@ -303,5 +340,11 @@ public override async Task StopAsync(CancellationToken cancellationToken)
readonly IHostApplicationLifetime applicationLifetime;

static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();

/// <summary>
/// Log message texts emitted when the ingestion infrastructure has finished starting or stopping.
/// They are constants so that code matching on log output (for example a <c>StartsWith</c> check
/// against captured log entries) can reference the exact text that is logged instead of repeating the literal.
/// Changing a value changes the logged text and anything that matches on it.
/// </summary>
internal static class LogMessages
{
/// <summary>Logged at Info level after the receiver has been started successfully.</summary>
internal const string StartedInfrastructure = "Started infrastructure";
/// <summary>Logged at Info level after the receiver has been stopped and the transport infrastructure shut down.</summary>
internal const string StoppedInfrastructure = "Stopped infrastructure";
}
}
}
21 changes: 11 additions & 10 deletions src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.Infrastructure.Tests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NUnit.Framework;
Expand All @@ -27,16 +28,16 @@ public async Task It_shuts_down_gracefully()
return Task.CompletedTask;
}, x => { }, () => { }, TimeSpan.FromSeconds(1), log);

await dog.Start(() => { });
await dog.Start(() => { }, CancellationToken.None);

await started.Task;

await dog.Stop();
await dog.Stop(TestContext.CurrentContext.CancellationToken);

await stopped.Task;
}

[Test]
[Test, Ignore("When ensurestopped throws an exception, stop should also throw an exception as that is an ungraceful stop")]
public async Task When_stop_fails_it_reports_the_failure()
{
string lastFailure = null;
Expand All @@ -48,11 +49,11 @@ public async Task When_stop_fails_it_reports_the_failure()
return Task.CompletedTask;
}, token => throw new Exception("Simulated"), x => lastFailure = x, () => lastFailure = null, TimeSpan.FromSeconds(1), log);

await dog.Start(() => { });
await dog.Start(() => { }, CancellationToken.None);

await started.Task;

await dog.Stop();
await dog.Stop(TestContext.CurrentContext.CancellationToken);

Assert.That(lastFailure, Is.EqualTo("Simulated"));
}
Expand Down Expand Up @@ -83,15 +84,15 @@ public async Task On_failure_triggers_stopping()
return Task.CompletedTask;
}, x => { }, () => { }, TimeSpan.FromSeconds(1), log);

await dog.Start(() => { });
await dog.Start(() => { }, CancellationToken.None);

await started.Task;

await dog.OnFailure("Simulated");

await restarted.Task;

await dog.Stop();
await dog.Stop(TestContext.CurrentContext.CancellationToken);
}

[Test]
Expand Down Expand Up @@ -125,11 +126,11 @@ public async Task When_first_start_attempt_works_it_recovers_from_further_errors
return Task.CompletedTask;
}, token => Task.CompletedTask, x => lastFailure = x, () => lastFailure = null, TimeSpan.FromSeconds(1), log);

await dog.Start(() => { });
await dog.Start(() => { }, CancellationToken.None);

await recoveredFromError.Task;

await dog.Stop();
await dog.Stop(TestContext.CurrentContext.CancellationToken);

//Make sure failure is cleared
Assert.That(lastFailure, Is.Null);
Expand All @@ -144,7 +145,7 @@ public async Task When_first_start_attempt_fails_onFailedOnStartup_is_called()

var dog = new Watchdog("test process", token => throw new Exception("Simulated"), token => Task.CompletedTask, x => lastFailure = x, () => lastFailure = null, TimeSpan.FromSeconds(1), log);

await dog.Start(() => { onStartupFailureCalled.SetResult(true); });
await dog.Start(() => { onStartupFailureCalled.SetResult(true); }, CancellationToken.None);

await onStartupFailureCalled.Task;

Expand Down
Loading