Skip to content

Commit ba4fb02

Browse files
Better names for exposed audit metrics (#4803)
* Include reading from channel in batch duration * Better name for the ingested count * Add message type tag * Move a few instruments to be attributes instead
1 parent c817d54 commit ba4fb02

File tree

8 files changed

+60
-52
lines changed

8 files changed

+60
-52
lines changed

docs/telemetry.md

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,28 @@ The following metrics are available:
1818

1919
### Ingestion
2020

21-
- `sc.audit.ingestion.count` - Successful ingested audit message count
22-
- `sc.audit.ingestion.retry` - Retried audit message count
23-
- `sc.audit.ingestion.failed` - Failed audit message count
24-
- `sc.audit.ingestion.duration` - Audit message processing duration (in milliseconds)
25-
- `sc.audit.ingestion.message_size` - Audit message body size (in kilobytes)
26-
- `sc.audit.ingestion.forwarded_count` - Forwarded audit messages count
21+
#### Success or failure
2722

28-
### Batching
23+
- `sc.audit.ingestion.success` - Successful ingested audit message count (Counter)
24+
- `sc.audit.ingestion.retry` - Retried audit message count (Counter)
25+
- `sc.audit.ingestion.failed` - Failed audit message count (Counter)
26+
27+
The above metrics also have the following attributes attached:
28+
29+
- `messaging.message.body.size` - The size of the message body in bytes
30+
- `messaging.message.type` - The logical message type of the message if present
2931

30-
- `sc.audit.ingestion.batch_duration` - Batch processing duration (in milliseconds)
31-
- `sc.audit.ingestion.batch_size` - Batch size (number of messages)
32-
- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures
32+
#### Details
3333

34-
### Storage
34+
- `sc.audit.ingestion.duration` - Audit message processing duration in milliseconds (Histogram)
35+
- `sc.audit.ingestion.forwarded` - Count of the number of forwarded audit messages if forwarding is enabled (Counter)
36+
37+
### Batching
3538

36-
- `sc.audit.ingestion.audits_count` - Stored audit message count
37-
- `sc.audit.ingestion.sagas_count` - Stored sagas message count
38-
- `sc.audit.ingestion.commit_duration` - Storage unit of work commit duration (in milliseconds)
39+
- `sc.audit.ingestion.batch_duration` - Batch processing duration in milliseconds (Histogram)
40+
- Attributes:
41+
- `ingestion.batch_size`
42+
- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures (Counter)
3943

4044
## Monitoring
4145

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
174174

175175
async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken)
176176
{
177-
using (new DurationRecorder(ingestionDuration))
177+
var tags = Telemetry.GetIngestedMessageTags(messageContext.Headers, messageContext.Body);
178+
using (new DurationRecorder(ingestionDuration, tags))
178179
{
179180
if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
180181
{
@@ -187,8 +188,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
187188
await channel.Writer.WriteAsync(messageContext, cancellationToken);
188189
await taskCompletionSource.Task;
189190

190-
ingestedMessagesCounter.Add(1);
191-
messageSize.Record(messageContext.Body.Length / 1024.0);
191+
successfulMessagesCounter.Add(1, tags);
192192
}
193193
}
194194

@@ -210,15 +210,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
210210
try
211211
{
212212
// as long as there is something to read this will fetch up to MaximumConcurrency items
213-
while (channel.Reader.TryRead(out var context))
213+
using (var recorder = new DurationRecorder(batchDuration))
214214
{
215-
contexts.Add(context);
216-
}
215+
while (channel.Reader.TryRead(out var context))
216+
{
217+
contexts.Add(context);
218+
}
217219

218-
auditBatchSize.Record(contexts.Count);
220+
recorder.Tags.Add("ingestion.batch_size", contexts.Count);
219221

220-
using (new DurationRecorder(auditBatchDuration))
221-
{
222222
await auditIngestor.Ingest(contexts);
223223
}
224224

@@ -293,10 +293,8 @@ public override async Task StopAsync(CancellationToken cancellationToken)
293293
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
294294
readonly Settings settings;
295295
readonly Channel<MessageContext> channel;
296-
readonly Histogram<long> auditBatchSize = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size");
297-
readonly Histogram<double> auditBatchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
298-
readonly Histogram<double> messageSize = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size");
299-
readonly Counter<long> ingestedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "count"), description: "Successful ingested audit message count");
296+
readonly Histogram<double> batchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
297+
readonly Counter<long> successfulMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count");
300298
readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
301299
readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
302300
readonly Watchdog watchdog;

src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
using Infrastructure;
1212
using NServiceBus.Logging;
1313
using NServiceBus.Transport;
14-
using ServiceControl.Audit.Persistence;
15-
using ServiceControl.Configuration;
14+
using Persistence;
15+
using Configuration;
1616
using ServiceControl.Infrastructure;
1717

1818
class AuditIngestionFaultPolicy
@@ -35,16 +35,19 @@ public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, Logging
3535

3636
public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, CancellationToken cancellationToken = default)
3737
{
38+
var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers, errorContext.Message.Body);
39+
3840
//Same as recoverability policy in NServiceBusFactory
3941
if (errorContext.ImmediateProcessingFailures < 3)
4042
{
41-
retryCounter.Add(1);
43+
retryCounter.Add(1, tags);
4244
return ErrorHandleResult.RetryRequired;
4345
}
4446

4547
await StoreFailedMessageDocument(errorContext, cancellationToken);
4648

47-
failedCounter.Add(1);
49+
failedCounter.Add(1, tags);
50+
4851
return ErrorHandleResult.Handled;
4952
}
5053

src/ServiceControl.Audit/Auditing/AuditIngestor.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using Persistence.UnitOfWork;
1515
using Recoverability;
1616
using SagaAudit;
17+
using ServiceControl.Infrastructure;
1718
using ServiceControl.Transports;
1819

1920
public class AuditIngestor
@@ -131,7 +132,7 @@ public async Task VerifyCanReachForwardingAddress()
131132
readonly Settings settings;
132133
readonly Lazy<IMessageDispatcher> messageDispatcher;
133134
readonly string logQueueAddress;
134-
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded_count"), description: "Audit ingestion forwarded message count");
135+
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count");
135136

136137
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
137138
}

src/ServiceControl.Audit/Auditing/AuditPersister.cs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
{
33
using System;
44
using System.Collections.Generic;
5-
using System.Diagnostics.Metrics;
65
using System.Text.Json;
76
using System.Threading.Tasks;
87
using Infrastructure;
@@ -61,14 +60,10 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
6160
}
6261

6362
await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);
64-
65-
storedAuditsCounter.Add(1);
6663
}
6764
else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot))
6865
{
6966
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);
70-
71-
storedSagasCounter.Add(1);
7267
}
7368

7469
storedContexts.Add(context);
@@ -105,11 +100,8 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
105100

106101
try
107102
{
108-
using (new DurationRecorder(commitDuration))
109-
{
110-
// this can throw even though dispose is never supposed to throw
111-
await unitOfWork.DisposeAsync();
112-
}
103+
// this can throw even though dispose is never supposed to throw
104+
await unitOfWork.DisposeAsync();
113105
}
114106
catch (Exception e)
115107
{
@@ -252,10 +244,6 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
252244
}
253245
}
254246

255-
readonly Counter<long> storedAuditsCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "audits_count"), description: "Stored audit message count");
256-
readonly Counter<long> storedSagasCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "sagas_count"), description: "Stored saga state count");
257-
readonly Histogram<double> commitDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "commit_duration"), unit: "ms", description: "Storage unit of work commit duration");
258-
259247
static readonly ILog Logger = LogManager.GetLogger<AuditPersister>();
260248
}
261249
}

src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace ServiceControl.Audit;
1919
using NServiceBus.Transport;
2020
using Persistence;
2121
using Transports;
22+
using ServiceControl.Infrastructure;
2223
using OpenTelemetry.Metrics;
2324
using OpenTelemetry.Resources;
2425

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
namespace ServiceControl.Audit;
1+
namespace ServiceControl.Infrastructure;
22

33
using System;
44
using System.Diagnostics;
55
using System.Diagnostics.Metrics;
66

7-
record DurationRecorder(Histogram<double> Histogram) : IDisposable
7+
record DurationRecorder(Histogram<double> Histogram, TagList Tags = default) : IDisposable
88
{
99
readonly Stopwatch sw = Stopwatch.StartNew();
1010

11-
public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds);
11+
public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds, Tags);
1212
}
Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
namespace ServiceControl.Audit;
1+
namespace ServiceControl.Infrastructure;
22

3+
using System;
4+
using System.Collections.Generic;
5+
using System.Diagnostics;
36
using System.Diagnostics.Metrics;
7+
using NServiceBus;
48
using OpenTelemetry.Metrics;
59

610
static class Telemetry
@@ -10,8 +14,17 @@ static class Telemetry
1014

1115
public static string CreateInstrumentName(string instrumentNamespace, string instrumentName) => $"sc.audit.{instrumentNamespace}.{instrumentName}".ToLower();
1216

13-
public static void AddAuditIngestionMeters(this MeterProviderBuilder builder)
17+
public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) => builder.AddMeter(MeterName);
18+
19+
public static TagList GetIngestedMessageTags(IDictionary<string, string> headers, ReadOnlyMemory<byte> body)
1420
{
15-
builder.AddMeter(MeterName);
21+
var tags = new TagList { { "messaging.message.body.size", body.Length } };
22+
23+
if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType))
24+
{
25+
tags.Add("messaging.message.type", messageType);
26+
}
27+
28+
return tags;
1629
}
1730
}

0 commit comments

Comments
 (0)