Skip to content

Commit dfbe780

Browse files
committed
More cleanup
1 parent b443a9c commit dfbe780

File tree

8 files changed

+41
-77
lines changed

8 files changed

+41
-77
lines changed

src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"ApiUrl": "http://localhost:8888/api",
1313
"Port": 8888,
1414
"PrintMetrics": false,
15-
"OtelMetricsUrl": null,
15+
"OtlpEndpointUrl": null,
1616
"Hostname": "localhost",
1717
"VirtualDirectory": "",
1818
"TransportType": "LearningTransport",

src/ServiceControl.Audit/App.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ These settings are only here so that we can debug ServiceControl while developin
88
<add key="ServiceControl.Audit/ServiceControlQueueAddress" value="Particular.ServiceControl" />
99
<add key="ServiceControl.Audit/HostName" value="localhost" />
1010
<add key="ServiceControl.Audit/DatabaseMaintenancePort" value="44445" />
11-
<add key="ServiceControl.Audit/OtelMetricsUrl" value="http://localhost:4317" />
11+
<add key="ServiceControl.Audit/OtlpEndpointUrl" value="http://localhost:4317" />
1212
<!-- DEVS - Pick a transport to run Auditing instance on -->
1313
<add key="ServiceControl.Audit/TransportType" value="LearningTransport" />
1414
<!--<add key="ServiceControl.Audit/TransportType" value="AmazonSQS" />-->

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
{
33
using System;
44
using System.Collections.Generic;
5-
using System.Diagnostics;
65
using System.Diagnostics.Metrics;
76
using System.Threading;
87
using System.Threading.Channels;
@@ -191,10 +190,11 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
191190
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
192191
messageContext.SetTaskCompletionSource(taskCompletionSource);
193192

194-
receivedAudits.Add(1);
195-
196193
await channel.Writer.WriteAsync(messageContext, cancellationToken);
197194
await taskCompletionSource.Task;
195+
196+
ingestedMessagesCounter.Add(1);
197+
messageSize.Record(messageContext.Body.Length / 1024.0);
198198
}
199199

200200
async Task Loop()
@@ -210,14 +210,14 @@ async Task Loop()
210210
while (channel.Reader.TryRead(out var context))
211211
{
212212
contexts.Add(context);
213-
auditMessageSize.Record(context.Body.Length / 1024.0);
214213
}
215214

216215
auditBatchSize.Record(contexts.Count);
217-
var sw = Stopwatch.StartNew();
218216

219-
await auditIngestor.Ingest(contexts);
220-
auditBatchDuration.Record(sw.ElapsedMilliseconds);
217+
using (new DurationRecorder(auditBatchDuration))
218+
{
219+
await auditIngestor.Ingest(contexts);
220+
}
221221
}
222222
catch (OperationCanceledException)
223223
{
@@ -259,8 +259,8 @@ async Task Loop()
259259
readonly Channel<MessageContext> channel;
260260
readonly Histogram<long> auditBatchSize = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion.", "batch_size"));
261261
readonly Histogram<double> auditBatchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion.", "batch_duration"), unit: "ms");
262-
readonly Histogram<double> auditMessageSize = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion.", "message_size"), unit: "kilobytes");
263-
readonly Counter<long> receivedAudits = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion.", "count"));
262+
readonly Histogram<double> messageSize = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion.", "message_size"), unit: "kilobytes");
263+
readonly Counter<long> ingestedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion.", "count"));
264264
readonly Watchdog watchdog;
265265
readonly Task ingestionWorker;
266266
readonly IHostApplicationLifetime applicationLifetime;

src/ServiceControl.Audit/Auditing/AuditIngestor.cs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
{
33
using System;
44
using System.Collections.Generic;
5+
using System.Diagnostics.Metrics;
56
using System.Linq;
67
using System.Threading.Tasks;
78
using Infrastructure.Settings;
@@ -43,27 +44,14 @@ ITransportCustomization transportCustomization
4344

4445
public async Task Ingest(List<MessageContext> contexts)
4546
{
46-
if (Log.IsDebugEnabled)
47-
{
48-
Log.Debug($"Ingesting {contexts.Count} message contexts");
49-
}
50-
5147
var stored = await auditPersister.Persist(contexts);
5248

5349
try
5450
{
5551
if (settings.ForwardAuditMessages)
5652
{
57-
if (Log.IsDebugEnabled)
58-
{
59-
Log.Debug($"Forwarding {stored.Count} messages");
60-
}
61-
6253
await Forward(stored, logQueueAddress);
63-
if (Log.IsDebugEnabled)
64-
{
65-
Log.Debug("Forwarded messages");
66-
}
54+
forwardedMessagesCounter.Add(stored.Count);
6755
}
6856

6957
foreach (var context in contexts)
@@ -73,10 +61,7 @@ public async Task Ingest(List<MessageContext> contexts)
7361
}
7462
catch (Exception e)
7563
{
76-
if (Log.IsWarnEnabled)
77-
{
78-
Log.Warn("Forwarding messages failed", e);
79-
}
64+
Log.Warn("Forwarding messages failed", e);
8065

8166
// making sure to rethrow so that all messages get marked as failed
8267
throw;
@@ -146,6 +131,7 @@ public async Task VerifyCanReachForwardingAddress()
146131
readonly Settings settings;
147132
readonly Lazy<IMessageDispatcher> messageDispatcher;
148133
readonly string logQueueAddress;
134+
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion.", "forwarded_count"));
149135

150136
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
151137
}

src/ServiceControl.Audit/Auditing/AuditPersister.cs

Lines changed: 9 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
{
33
using System;
44
using System.Collections.Generic;
5-
using System.Diagnostics;
65
using System.Diagnostics.Metrics;
76
using System.Text.Json;
87
using System.Threading.Tasks;
@@ -25,13 +24,6 @@ class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
2524
{
2625
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts)
2726
{
28-
var stopwatch = Stopwatch.StartNew();
29-
30-
if (Logger.IsDebugEnabled)
31-
{
32-
Logger.Debug($"Batch size {contexts.Count}");
33-
}
34-
3527
var storedContexts = new List<MessageContext>(contexts.Count);
3628
IAuditIngestionUnitOfWork unitOfWork = null;
3729
try
@@ -68,29 +60,15 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
6860
RecordKnownEndpoints(receivingEndpoint, knownEndpoints, processedMessage);
6961
}
7062

71-
if (Logger.IsDebugEnabled)
72-
{
73-
Logger.Debug("Adding audit message for bulk storage");
74-
}
75-
76-
var auditSw = Stopwatch.StartNew();
7763
await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);
78-
auditBulkInsertDuration.Record(auditSw.ElapsedMilliseconds);
7964

80-
storedAudits.Add(1);
65+
storedAuditsCounter.Add(1);
8166
}
8267
else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot))
8368
{
84-
if (Logger.IsDebugEnabled)
85-
{
86-
Logger.Debug("Adding SagaSnapshot message for bulk storage");
87-
}
88-
89-
var sagaSw = Stopwatch.StartNew();
9069
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);
91-
sagaAuditBulkInsertDuration.Record(sagaSw.ElapsedMilliseconds);
9270

93-
storedSagas.Add(1);
71+
storedSagasCounter.Add(1);
9472
}
9573

9674
storedContexts.Add(context);
@@ -128,9 +106,10 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
128106
try
129107
{
130108
// this can throw even though dispose is never supposed to throw
131-
var commitSw = Stopwatch.StartNew();
132-
await unitOfWork.DisposeAsync();
133-
auditCommitDuration.Record(commitSw.ElapsedMilliseconds);
109+
using (new DurationRecorder(commitDuration))
110+
{
111+
await unitOfWork.DisposeAsync();
112+
}
134113
}
135114
catch (Exception e)
136115
{
@@ -142,16 +121,6 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
142121
// making sure to rethrow so that all messages get marked as failed
143122
throw;
144123
}
145-
finally
146-
{
147-
stopwatch.Stop();
148-
}
149-
}
150-
151-
stopwatch.Stop();
152-
if (Logger.IsDebugEnabled)
153-
{
154-
Logger.Debug($"Batch size {contexts.Count} took {stopwatch.ElapsedMilliseconds} ms");
155124
}
156125
}
157126

@@ -283,12 +252,9 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
283252
}
284253
}
285254

286-
readonly Counter<long> storedAudits = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("storage.messages", "count"));
287-
readonly Histogram<double> auditBulkInsertDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("storage.messages", "insert_duration"), unit: "ms");
288-
readonly Histogram<double> auditCommitDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("storage.messages", "commit_duration"), unit: "ms");
289-
290-
readonly Counter<long> storedSagas = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("storage.sagas", "count"));
291-
readonly Histogram<double> sagaAuditBulkInsertDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("storage.sagas", "insert_duration"), unit: "ms");
255+
readonly Counter<long> storedAuditsCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "audits_count"));
256+
readonly Counter<long> storedSagasCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "sagas_count"));
257+
readonly Histogram<double> commitDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "commit_duration"), unit: "ms");
292258

293259
static readonly ILog Logger = LogManager.GetLogger<AuditPersister>();
294260
}

src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder,
6868
NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration);
6969
builder.UseNServiceBus(configuration);
7070

71-
if (!string.IsNullOrEmpty(settings.OtelMetricsUrl))
71+
if (!string.IsNullOrEmpty(settings.OtlpEndpointUrl))
7272
{
73-
if (!Uri.TryCreate(settings.OtelMetricsUrl, UriKind.Absolute, out var otelMetricsUri))
73+
if (!Uri.TryCreate(settings.OtlpEndpointUrl, UriKind.Absolute, out var otelMetricsUri))
7474
{
75-
throw new UriFormatException($"Invalid OtelMetricsUrl: {settings.OtelMetricsUrl}");
75+
throw new UriFormatException($"Invalid OtlpEndpointUrl: {settings.OtlpEndpointUrl}");
7676
}
7777

7878
builder.Services.AddOpenTelemetry()
@@ -90,7 +90,7 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder,
9090
});
9191

9292
var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions));
93-
logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtelMetricsUrl);
93+
logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtlpEndpointUrl);
9494
}
9595

9696
// Configure after the NServiceBus hosted service to ensure NServiceBus is already started
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace ServiceControl.Audit;
2+
3+
using System;
4+
using System.Diagnostics;
5+
using System.Diagnostics.Metrics;
6+
7+
record DurationRecorder(Histogram<double> Histogram) : IDisposable
8+
{
9+
readonly Stopwatch sw = Stopwatch.StartNew();
10+
11+
public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds);
12+
}

src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public string RootUrl
109109
public int Port { get; set; }
110110

111111
public bool PrintMetrics => SettingsReader.Read<bool>(SettingsRootNamespace, "PrintMetrics");
112-
public string OtelMetricsUrl { get; set; } = SettingsReader.Read<string>(SettingsRootNamespace, nameof(OtelMetricsUrl));
112+
public string OtlpEndpointUrl { get; set; } = SettingsReader.Read<string>(SettingsRootNamespace, nameof(OtlpEndpointUrl));
113113
public string Hostname { get; private set; }
114114
public string VirtualDirectory => SettingsReader.Read(SettingsRootNamespace, "VirtualDirectory", string.Empty);
115115

0 commit comments

Comments
 (0)