Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
eb6e9dc
Emit metrics via otel instead of custom format
andreasohlund Jan 31, 2025
2d0880b
Minimize diff by reverting whitespace changes
ramonsmits Jan 31, 2025
9652aa0
Approvals
andreasohlund Jan 31, 2025
77b2c2c
Fix formatting
andreasohlund Jan 31, 2025
03690a0
Apply suggestions from code review
andreasohlund Jan 31, 2025
14c3474
fix formatting
ramonsmits Jan 31, 2025
7f37e53
Only use standard otel
andreasohlund Feb 2, 2025
7616151
Set instance id
andreasohlund Feb 2, 2025
e6d1f40
Set unit
andreasohlund Feb 3, 2025
88d2e71
Better metrics names
andreasohlund Feb 3, 2025
da4c35b
Emit body size
andreasohlund Feb 3, 2025
786559b
Fix meter name
ramonsmits Feb 3, 2025
a2011fc
Log that OpenTelemetry metrics exporter is enabled
ramonsmits Feb 3, 2025
f9d1986
Stop using prefixes
andreasohlund Feb 4, 2025
c9f9e74
Better name
andreasohlund Feb 4, 2025
e784f0e
Go back to using instance name as service name
andreasohlund Feb 5, 2025
b443a9c
better metrics names
andreasohlund Feb 5, 2025
dfbe780
More cleanup
andreasohlund Feb 5, 2025
4a07a2c
Revert app.config
andreasohlund Feb 6, 2025
9b32ba4
Add duration and failure counters
andreasohlund Feb 6, 2025
4954fd7
Add consecutive batch failures
andreasohlund Feb 6, 2025
fa5e658
Fix namespace for metrics
andreasohlund Feb 6, 2025
9426ea9
Skip interlocked increment
andreasohlund Feb 7, 2025
0abfbb7
Added descriptions for metric instruments
ramonsmits Feb 8, 2025
4469d94
Metric setup moved to extension method
ramonsmits Feb 8, 2025
09facb9
Merge branch 'master' into otel-metrics
andreasohlund Feb 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
<PackageVersion Include="NUnit" Version="4.3.2" />
<PackageVersion Include="NUnit.Analyzers" Version="4.6.0" />
<PackageVersion Include="NUnit3TestAdapter" Version="5.0.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
<PackageVersion Include="Particular.Approvals" Version="2.0.1" />
<PackageVersion Include="Particular.Licensing.Sources" Version="6.0.1" />
<PackageVersion Include="Particular.LicensingComponent.Report" Version="1.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ async Task InitializeServiceControl(ScenarioContext context)
{
var id = messageContext.NativeMessageId;
var headers = messageContext.Headers;

var log = NServiceBus.Logging.LogManager.GetLogger<ServiceControlComponentRunner>();
headers.TryGetValue(Headers.MessageId, out var originalMessageId);
log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty}).");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"ApiUrl": "http://localhost:8888/api",
"Port": 8888,
"PrintMetrics": false,
"OtlpEndpointUrl": null,
"Hostname": "localhost",
"VirtualDirectory": "",
"TransportType": "LearningTransport",
Expand Down
1 change: 0 additions & 1 deletion src/ServiceControl.Audit/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ These settings are only here so that we can debug ServiceControl while developin
<add key="ServiceControl.Audit/ServiceControlQueueAddress" value="Particular.ServiceControl" />
<add key="ServiceControl.Audit/HostName" value="localhost" />
<add key="ServiceControl.Audit/DatabaseMaintenancePort" value="44445" />

<!-- DEVS - Pick a transport to run Auditing instance on -->
<add key="ServiceControl.Audit/TransportType" value="LearningTransport" />
<!--<add key="ServiceControl.Audit/TransportType" value="AmazonSQS" />-->
Expand Down
54 changes: 31 additions & 23 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand All @@ -14,18 +14,14 @@
using Persistence;
using Persistence.UnitOfWork;
using ServiceControl.Infrastructure;
using ServiceControl.Infrastructure.Metrics;
using Transports;

class AuditIngestion : IHostedService
{
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;

public AuditIngestion(
Settings settings,
ITransportCustomization transportCustomization,
TransportSettings transportSettings,
Metrics metrics,
IFailedAuditStorage failedImportsStorage,
AuditIngestionCustomCheck.State ingestionState,
AuditIngestor auditIngestor,
Expand All @@ -40,10 +36,6 @@ public AuditIngestion(
this.settings = settings;
this.applicationLifetime = applicationLifetime;

batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size");
batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds);
receivedMeter = metrics.GetCounter("Audit ingestion - received");

if (!transportSettings.MaxConcurrency.HasValue)
{
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
Expand Down Expand Up @@ -102,6 +94,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
await stoppable.StopReceive(cancellationToken);
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
}

return;
}

Expand Down Expand Up @@ -168,6 +161,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
logger.Info("Shutting down. Already stopped, skipping shut down");
return; //Already stopped
}

var stoppable = queueIngestor;
queueIngestor = null;
logger.Info("Shutting down. Infrastructure shut down commencing");
Expand All @@ -188,18 +182,22 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)

async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken)
{
if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
using (new DurationRecorder(ingestionDuration))
{
return;
}
if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
{
return;
}

var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);

receivedMeter.Mark();
await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;

await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;
ingestedMessagesCounter.Add(1);
messageSize.Record(messageContext.Body.Length / 1024.0);
}
}

async Task Loop()
Expand All @@ -217,11 +215,14 @@ async Task Loop()
contexts.Add(context);
}

batchSizeMeter.Mark(contexts.Count);
using (batchDurationMeter.Measure())
auditBatchSize.Record(contexts.Count);

using (new DurationRecorder(auditBatchDuration))
{
await auditIngestor.Ingest(contexts);
}

consecutiveBatchFailuresCounter.Record(0);
}
catch (OperationCanceledException)
{
Expand All @@ -240,6 +241,9 @@ async Task Loop()
{
context.GetTaskCompletionSource().TrySetException(e);
}

// no need to do interlocked increment since this is running sequentially
consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++);
}
finally
{
Expand All @@ -251,8 +255,9 @@ async Task Loop()

TransportInfrastructure transportInfrastructure;
IMessageReceiver queueIngestor;
long consecutiveBatchFailures = 0;

readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1);
readonly SemaphoreSlim startStopSemaphore = new(1);
readonly string inputEndpoint;
readonly ITransportCustomization transportCustomization;
readonly TransportSettings transportSettings;
Expand All @@ -261,9 +266,12 @@ async Task Loop()
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
readonly Settings settings;
readonly Channel<MessageContext> channel;
readonly Meter batchSizeMeter;
readonly Meter batchDurationMeter;
readonly Counter receivedMeter;
readonly Histogram<long> auditBatchSize = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size");
readonly Histogram<double> auditBatchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
readonly Histogram<double> messageSize = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size");
readonly Counter<long> ingestedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "count"), description: "Successful ingested audit message count");
readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
readonly Watchdog watchdog;
readonly Task ingestionWorker;
readonly IHostApplicationLifetime applicationLifetime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.IO;
using System.Runtime.InteropServices;
using System.Runtime.Versioning;
Expand Down Expand Up @@ -37,10 +38,13 @@ public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, Cancella
//Same as recoverability policy in NServiceBusFactory
if (errorContext.ImmediateProcessingFailures < 3)
{
retryCounter.Add(1);
return ErrorHandleResult.RetryRequired;
}

await StoreFailedMessageDocument(errorContext, cancellationToken);

failedCounter.Add(1);
return ErrorHandleResult.Handled;
}

Expand Down Expand Up @@ -100,6 +104,9 @@ void WriteToEventLog(string message)
EventLog.WriteEntry(EventSourceCreator.SourceName, message, EventLogEntryType.Error);
}

readonly Counter<long> retryCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "retry"), description: "Audit ingestion retries count");
readonly Counter<long> failedCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "failed"), description: "Audit ingestion failure count");

static readonly ILog log = LogManager.GetLogger<AuditIngestionFaultPolicy>();
}
}
49 changes: 11 additions & 38 deletions src/ServiceControl.Audit/Auditing/AuditIngestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading.Tasks;
using Infrastructure.Settings;
Expand All @@ -14,13 +14,11 @@
using Persistence.UnitOfWork;
using Recoverability;
using SagaAudit;
using ServiceControl.Infrastructure.Metrics;
using ServiceControl.Transports;

public class AuditIngestor
{
public AuditIngestor(
Metrics metrics,
Settings settings,
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
EndpointInstanceMonitoring endpointInstanceMonitoring,
Expand All @@ -32,50 +30,28 @@ ITransportCustomization transportCustomization
{
this.settings = settings;
this.messageDispatcher = messageDispatcher;

var ingestedAuditMeter = metrics.GetCounter("Audit ingestion - ingested audit");
var ingestedSagaAuditMeter = metrics.GetCounter("Audit ingestion - ingested saga audit");
var auditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - audit bulk insert duration", FrequencyInMilliseconds);
var sagaAuditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - saga audit bulk insert duration", FrequencyInMilliseconds);
var bulkInsertCommitDurationMeter = metrics.GetMeter("Audit ingestion - bulk insert commit duration", FrequencyInMilliseconds);

var enrichers = new IEnrichImportedAuditMessages[]
{
new MessageTypeEnricher(),
new EnrichWithTrackingIds(),
new ProcessingStatisticsEnricher(),
new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring),
new DetectSuccessfulRetriesEnricher(),
new SagaRelationshipsEnricher()
}.Concat(auditEnrichers).ToArray();
var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray();

logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);

auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher);
auditPersister = new AuditPersister(
unitOfWorkFactory,
enrichers,
messageSession,
messageDispatcher
);
}

public async Task Ingest(List<MessageContext> contexts)
{
if (Log.IsDebugEnabled)
{
Log.Debug($"Ingesting {contexts.Count} message contexts");
}

var stored = await auditPersister.Persist(contexts);

try
{
if (settings.ForwardAuditMessages)
{
if (Log.IsDebugEnabled)
{
Log.Debug($"Forwarding {stored.Count} messages");
}
await Forward(stored, logQueueAddress);
if (Log.IsDebugEnabled)
{
Log.Debug("Forwarded messages");
}
forwardedMessagesCounter.Add(stored.Count);
}

foreach (var context in contexts)
Expand All @@ -85,10 +61,7 @@ public async Task Ingest(List<MessageContext> contexts)
}
catch (Exception e)
{
if (Log.IsWarnEnabled)
{
Log.Warn("Forwarding messages failed", e);
}
Log.Warn("Forwarding messages failed", e);

// making sure to rethrow so that all messages get marked as failed
throw;
Expand Down Expand Up @@ -158,8 +131,8 @@ public async Task VerifyCanReachForwardingAddress()
readonly Settings settings;
readonly Lazy<IMessageDispatcher> messageDispatcher;
readonly string logQueueAddress;
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded_count"), description: "Audit ingestion forwarded message count");

static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
}
}
Loading