Skip to content

Commit 0d2ef57

Browse files
committed
Emit metrics via otel instead of custom format
1 parent 4013cb3 commit 0d2ef57

File tree

12 files changed

+102
-141
lines changed

12 files changed

+102
-141
lines changed

src/Directory.Packages.props

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
<PackageVersion Include="AWSSDK.CloudWatch" Version="3.7.402.21" />
99
<PackageVersion Include="AWSSDK.SecurityToken" Version="3.7.401.22" />
1010
<PackageVersion Include="Azure.Identity" Version="1.13.1" />
11+
<PackageVersion Include="Azure.Monitor.OpenTelemetry.Exporter" Version="1.3.0" />
1112
<PackageVersion Include="Azure.Monitor.Query" Version="1.6.0" />
1213
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.0.1" />
1314
<PackageVersion Include="ByteSize" Version="2.1.2" />
@@ -50,6 +51,9 @@
5051
<PackageVersion Include="NUnit" Version="4.3.2" />
5152
<PackageVersion Include="NUnit.Analyzers" Version="4.6.0" />
5253
<PackageVersion Include="NUnit3TestAdapter" Version="4.6.0" />
54+
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.9.0" />
55+
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
56+
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
5357
<PackageVersion Include="Particular.Approvals" Version="2.0.1" />
5458
<PackageVersion Include="Particular.Licensing.Sources" Version="6.0.0" />
5559
<PackageVersion Include="Particular.LicensingComponent.Report" Version="1.0.0" />
@@ -87,4 +91,4 @@
8791
<GlobalPackageReference Include="Microsoft.Build.CopyOnWrite" Version="1.0.334" />
8892
<GlobalPackageReference Include="Particular.Packaging" Version="4.2.0" />
8993
</ItemGroup>
90-
</Project>
94+
</Project>

src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,19 @@ async Task InitializeServiceControl(ScenarioContext context)
5151
TransportConnectionString = transportToUse.ConnectionString,
5252
MaximumConcurrencyLevel = 2,
5353
ServiceControlQueueAddress = "SHOULDNOTBEUSED",
54+
OtelMetricsUrl = "http://localhost:4317",
5455
MessageFilter = messageContext =>
5556
{
5657
var id = messageContext.NativeMessageId;
5758
var headers = messageContext.Headers;
58-
5959
var log = NServiceBus.Logging.LogManager.GetLogger<ServiceControlComponentRunner>();
60-
headers.TryGetValue(Headers.MessageId, out var originalMessageId);
60+
headers.TryGetValue(Headers.MessageId,
61+
out var originalMessageId);
6162
log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty}).");
6263

6364
//Do not filter out CC, SA and HB messages as they can't be stamped
64-
if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageTypes)
65+
if (headers.TryGetValue(Headers.EnclosedMessageTypes,
66+
out var messageTypes)
6567
&& (messageTypes.StartsWith("ServiceControl.Contracts") || messageTypes.StartsWith("ServiceControl.EndpointPlugin")))
6668
{
6769
return false;

src/ServiceControl.Audit/App.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ These settings are only here so that we can debug ServiceControl while developin
88
<add key="ServiceControl.Audit/ServiceControlQueueAddress" value="Particular.ServiceControl" />
99
<add key="ServiceControl.Audit/HostName" value="localhost" />
1010
<add key="ServiceControl.Audit/DatabaseMaintenancePort" value="44445" />
11-
11+
<add key="ServiceControl.Audit/OtelMetricsUrl" value="http://localhost:4317" />
1212
<!-- DEVS - Pick a transport to run Auditing instance on -->
1313
<add key="ServiceControl.Audit/TransportType" value="LearningTransport" />
1414
<!--<add key="ServiceControl.Audit/TransportType" value="AmazonSQS" />-->

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Diagnostics;
6+
using System.Diagnostics.Metrics;
67
using System.Threading;
78
using System.Threading.Channels;
89
using System.Threading.Tasks;
@@ -14,18 +15,14 @@
1415
using Persistence;
1516
using Persistence.UnitOfWork;
1617
using ServiceControl.Infrastructure;
17-
using ServiceControl.Infrastructure.Metrics;
1818
using Transports;
1919

2020
class AuditIngestion : IHostedService
2121
{
22-
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
23-
2422
public AuditIngestion(
2523
Settings settings,
2624
ITransportCustomization transportCustomization,
2725
TransportSettings transportSettings,
28-
Metrics metrics,
2926
IFailedAuditStorage failedImportsStorage,
3027
AuditIngestionCustomCheck.State ingestionState,
3128
AuditIngestor auditIngestor,
@@ -40,10 +37,6 @@ public AuditIngestion(
4037
this.settings = settings;
4138
this.applicationLifetime = applicationLifetime;
4239

43-
batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size");
44-
batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds);
45-
receivedMeter = metrics.GetCounter("Audit ingestion - received");
46-
4740
if (!transportSettings.MaxConcurrency.HasValue)
4841
{
4942
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
@@ -102,6 +95,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
10295
await stoppable.StopReceive(cancellationToken);
10396
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
10497
}
98+
10599
return;
106100
}
107101

@@ -168,6 +162,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
168162
logger.Info("Shutting down. Already stopped, skipping shut down");
169163
return; //Already stopped
170164
}
165+
171166
var stoppable = queueIngestor;
172167
queueIngestor = null;
173168
logger.Info("Shutting down. Infrastructure shut down commencing");
@@ -196,7 +191,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
196191
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
197192
messageContext.SetTaskCompletionSource(taskCompletionSource);
198193

199-
receivedMeter.Mark();
194+
receivedMeter.Add(1);
200195

201196
await channel.Writer.WriteAsync(messageContext, cancellationToken);
202197
await taskCompletionSource.Task;
@@ -217,11 +212,11 @@ async Task Loop()
217212
contexts.Add(context);
218213
}
219214

220-
batchSizeMeter.Mark(contexts.Count);
221-
using (batchDurationMeter.Measure())
222-
{
223-
await auditIngestor.Ingest(contexts);
224-
}
215+
batchSizeMeter.Record(contexts.Count);
216+
var sw = Stopwatch.StartNew();
217+
218+
await auditIngestor.Ingest(contexts);
219+
batchDurationMeter.Record(sw.ElapsedMilliseconds);
225220
}
226221
catch (OperationCanceledException)
227222
{
@@ -261,9 +256,9 @@ async Task Loop()
261256
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
262257
readonly Settings settings;
263258
readonly Channel<MessageContext> channel;
264-
readonly Meter batchSizeMeter;
265-
readonly Meter batchDurationMeter;
266-
readonly Counter receivedMeter;
259+
readonly Histogram<long> batchSizeMeter = AuditMetrics.Meter.CreateHistogram<long>($"{AuditMetrics.Prefix}.batch_size");
260+
readonly Histogram<double> batchDurationMeter = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.batch_duration_ms");
261+
readonly Counter<long> receivedMeter = AuditMetrics.Meter.CreateCounter<long>($"{AuditMetrics.Prefix}.received");
267262
readonly Watchdog watchdog;
268263
readonly Task ingestionWorker;
269264
readonly IHostApplicationLifetime applicationLifetime;

src/ServiceControl.Audit/Auditing/AuditIngestor.cs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@
1414
using Persistence.UnitOfWork;
1515
using Recoverability;
1616
using SagaAudit;
17-
using ServiceControl.Infrastructure.Metrics;
1817
using ServiceControl.Transports;
1918

2019
public class AuditIngestor
2120
{
2221
public AuditIngestor(
23-
Metrics metrics,
2422
Settings settings,
2523
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
2624
EndpointInstanceMonitoring endpointInstanceMonitoring,
@@ -32,26 +30,11 @@ ITransportCustomization transportCustomization
3230
{
3331
this.settings = settings;
3432
this.messageDispatcher = messageDispatcher;
35-
36-
var ingestedAuditMeter = metrics.GetCounter("Audit ingestion - ingested audit");
37-
var ingestedSagaAuditMeter = metrics.GetCounter("Audit ingestion - ingested saga audit");
38-
var auditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - audit bulk insert duration", FrequencyInMilliseconds);
39-
var sagaAuditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - saga audit bulk insert duration", FrequencyInMilliseconds);
40-
var bulkInsertCommitDurationMeter = metrics.GetMeter("Audit ingestion - bulk insert commit duration", FrequencyInMilliseconds);
41-
42-
var enrichers = new IEnrichImportedAuditMessages[]
43-
{
44-
new MessageTypeEnricher(),
45-
new EnrichWithTrackingIds(),
46-
new ProcessingStatisticsEnricher(),
47-
new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring),
48-
new DetectSuccessfulRetriesEnricher(),
49-
new SagaRelationshipsEnricher()
50-
}.Concat(auditEnrichers).ToArray();
33+
var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray();
5134

5235
logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);
5336

54-
auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher);
37+
auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, messageSession, messageDispatcher);
5538
}
5639

5740
public async Task Ingest(List<MessageContext> contexts)
@@ -71,6 +54,7 @@ public async Task Ingest(List<MessageContext> contexts)
7154
{
7255
Log.Debug($"Forwarding {stored.Count} messages");
7356
}
57+
7458
await Forward(stored, logQueueAddress);
7559
if (Log.IsDebugEnabled)
7660
{
@@ -159,7 +143,6 @@ public async Task VerifyCanReachForwardingAddress()
159143
readonly Lazy<IMessageDispatcher> messageDispatcher;
160144
readonly string logQueueAddress;
161145

162-
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
163146
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
164147
}
165148
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace ServiceControl.Audit.Auditing;
2+
3+
using System.Diagnostics.Metrics;
4+
5+
static class AuditMetrics
6+
{
7+
public static readonly Meter Meter = new("ServiceControl", "0.1.0");
8+
public static readonly string Prefix = "particular.servicecontrol.audit";
9+
}

src/ServiceControl.Audit/Auditing/AuditPersister.cs

Lines changed: 27 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Diagnostics;
6+
using System.Diagnostics.Metrics;
67
using System.Text.Json;
78
using System.Threading.Tasks;
89
using Infrastructure;
@@ -15,29 +16,13 @@
1516
using ServiceControl.Audit.Persistence.Monitoring;
1617
using ServiceControl.EndpointPlugin.Messages.SagaState;
1718
using ServiceControl.Infrastructure;
18-
using ServiceControl.Infrastructure.Metrics;
1919
using ServiceControl.SagaAudit;
2020

21-
class AuditPersister
21+
class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
22+
IEnrichImportedAuditMessages[] enrichers,
23+
IMessageSession messageSession,
24+
Lazy<IMessageDispatcher> messageDispatcher)
2225
{
23-
public AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
24-
IEnrichImportedAuditMessages[] enrichers,
25-
Counter ingestedAuditMeter, Counter ingestedSagaAuditMeter, Meter auditBulkInsertDurationMeter,
26-
Meter sagaAuditBulkInsertDurationMeter, Meter bulkInsertCommitDurationMeter, IMessageSession messageSession,
27-
Lazy<IMessageDispatcher> messageDispatcher)
28-
{
29-
this.unitOfWorkFactory = unitOfWorkFactory;
30-
this.enrichers = enrichers;
31-
32-
this.ingestedAuditMeter = ingestedAuditMeter;
33-
this.ingestedSagaAuditMeter = ingestedSagaAuditMeter;
34-
this.auditBulkInsertDurationMeter = auditBulkInsertDurationMeter;
35-
this.sagaAuditBulkInsertDurationMeter = sagaAuditBulkInsertDurationMeter;
36-
this.bulkInsertCommitDurationMeter = bulkInsertCommitDurationMeter;
37-
this.messageSession = messageSession;
38-
this.messageDispatcher = messageDispatcher;
39-
}
40-
4126
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts)
4227
{
4328
var stopwatch = Stopwatch.StartNew();
@@ -51,7 +36,6 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
5136
IAuditIngestionUnitOfWork unitOfWork = null;
5237
try
5338
{
54-
5539
// deliberately not using the using statement because we dispose async explicitly
5640
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count);
5741
var inserts = new List<Task>(contexts.Count);
@@ -89,12 +73,13 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
8973
Logger.Debug("Adding audit message for bulk storage");
9074
}
9175

92-
using (auditBulkInsertDurationMeter.Measure())
93-
{
94-
await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);
95-
}
76+
var auditSw = Stopwatch.StartNew();
77+
await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);
78+
auditSw.Stop();
79+
80+
auditBulkInsertDurationMeter.Record(auditSw.ElapsedMilliseconds);
9681

97-
ingestedAuditMeter.Mark();
82+
ingestedAuditMeter.Add(1);
9883
}
9984
else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot))
10085
{
@@ -103,12 +88,13 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
10388
Logger.Debug("Adding SagaSnapshot message for bulk storage");
10489
}
10590

106-
using (sagaAuditBulkInsertDurationMeter.Measure())
107-
{
108-
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);
109-
}
91+
var sagaSw = Stopwatch.StartNew();
92+
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);
93+
sagaSw.Stop();
94+
95+
sagaAuditBulkInsertDurationMeter.Record(sagaSw.ElapsedMilliseconds);
11096

111-
ingestedSagaAuditMeter.Mark();
97+
ingestedSagaAuditMeter.Add(1);
11298
}
11399

114100
storedContexts.Add(context);
@@ -146,10 +132,10 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
146132
try
147133
{
148134
// this can throw even though dispose is never supposed to throw
149-
using (bulkInsertCommitDurationMeter.Measure())
150-
{
151-
await unitOfWork.DisposeAsync();
152-
}
135+
var commitSw = Stopwatch.StartNew();
136+
await unitOfWork.DisposeAsync();
137+
commitSw.Stop();
138+
bulkInsertCommitDurationMeter.Record(commitSw.ElapsedMilliseconds);
153139
}
154140
catch (Exception e)
155141
{
@@ -263,6 +249,7 @@ async Task ProcessAuditMessage(MessageContext context)
263249
{
264250
Logger.Debug($"Emitting {commandsToEmit.Count} commands and {messagesToEmit.Count} control messages.");
265251
}
252+
266253
foreach (var commandToEmit in commandsToEmit)
267254
{
268255
await messageSession.Send(commandToEmit);
@@ -301,16 +288,12 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
301288
}
302289
}
303290

304-
readonly Counter ingestedAuditMeter;
305-
readonly Counter ingestedSagaAuditMeter;
306-
readonly Meter auditBulkInsertDurationMeter;
307-
readonly Meter sagaAuditBulkInsertDurationMeter;
308-
readonly Meter bulkInsertCommitDurationMeter;
309-
readonly IMessageSession messageSession;
310-
readonly Lazy<IMessageDispatcher> messageDispatcher;
291+
readonly Counter<long> ingestedAuditMeter = AuditMetrics.Meter.CreateCounter<long>($"{AuditMetrics.Prefix}.ingested_audit_messages"); // metrics.GetCounter("Audit ingestion - ingested audit");
292+
readonly Counter<long> ingestedSagaAuditMeter = AuditMetrics.Meter.CreateCounter<long>($"{AuditMetrics.Prefix}.ingested_saga_audits"); // metrics.GetCounter("Audit ingestion - ingested audit");
293+
readonly Histogram<double> auditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.audit_bulk_insert_duration_ms"); // metrics.GetCounter("Audit ingestion - ingested audit");
294+
readonly Histogram<double> sagaAuditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.saga_bulk_insert_duration_ms"); // metrics.GetCounter("Audit ingestion - ingested audit");
295+
readonly Histogram<double> bulkInsertCommitDurationMeter = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.audit_commit_duration_ms"); // metrics.GetCounter("Audit ingestion - ingested audit");
311296

312-
readonly IEnrichImportedAuditMessages[] enrichers;
313-
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
314297
static readonly ILog Logger = LogManager.GetLogger<AuditPersister>();
315298
}
316299
}

0 commit comments

Comments
 (0)