Skip to content

Commit 6932fb3

Browse files
committed
Add message type tag
1 parent cdc69ee commit 6932fb3

File tree

6 files changed

+28
-27
lines changed

6 files changed

+28
-27
lines changed

docs/telemetry.md

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,19 @@ The following metrics are available:
1818

1919
### Ingestion
2020

21-
- `sc.audit.ingestion.count` - Successful ingested audit message count
21+
- `sc.audit.ingestion.success` - Successful ingested audit message count
2222
- `sc.audit.ingestion.retry` - Retried audit message count
2323
- `sc.audit.ingestion.failed` - Failed audit message count
2424
- `sc.audit.ingestion.duration` - Audit message processing duration (in milliseconds)
2525
- `sc.audit.ingestion.message_size` - Audit message body size (in kilobytes)
26-
- `sc.audit.ingestion.forwarded_count` - Forwarded audit messages count
26+
- `sc.audit.ingestion.forwarded` - Forwarded audit messages count
2727

2828
### Batching
2929

3030
- `sc.audit.ingestion.batch_duration` - Batch processing duration (in milliseconds)
3131
- `sc.audit.ingestion.batch_size` - Batch size (number of messages)
3232
- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures
3333

34-
### Storage
35-
36-
- `sc.audit.ingestion.audits_count` - Stored audit message count
37-
- `sc.audit.ingestion.sagas_count` - Stored sagas message count
38-
- `sc.audit.ingestion.commit_duration` - Storage unit of work commit duration (in milliseconds)
39-
4034
## Monitoring
4135

4236
No telemetry is currently available.

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
187187
await channel.Writer.WriteAsync(messageContext, cancellationToken);
188188
await taskCompletionSource.Task;
189189

190-
successfulMessagesCounter.Add(1);
190+
successfulMessagesCounter.Add(1, Telemetry.GetIngestedMessageTags(messageContext.Headers));
191191
messageSize.Record(messageContext.Body.Length / 1024.0);
192192
}
193193
}
@@ -296,7 +296,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
296296
readonly Histogram<long> auditBatchSize = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size");
297297
readonly Histogram<double> auditBatchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
298298
readonly Histogram<double> messageSize = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size");
299-
readonly Counter<long> successfulMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "successful"), description: "Successful ingested audit message count");
299+
readonly Counter<long> successfulMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count");
300300
readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
301301
readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
302302
readonly Watchdog watchdog;

src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Threading;
1010
using System.Threading.Tasks;
1111
using Infrastructure;
12+
using NServiceBus;
1213
using NServiceBus.Logging;
1314
using NServiceBus.Transport;
1415
using ServiceControl.Audit.Persistence;
@@ -35,16 +36,19 @@ public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, Logging
3536

3637
public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, CancellationToken cancellationToken = default)
3738
{
39+
var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers);
40+
3841
//Same as recoverability policy in NServiceBusFactory
3942
if (errorContext.ImmediateProcessingFailures < 3)
4043
{
41-
retryCounter.Add(1);
44+
retryCounter.Add(1, tags);
4245
return ErrorHandleResult.RetryRequired;
4346
}
4447

4548
await StoreFailedMessageDocument(errorContext, cancellationToken);
4649

47-
failedCounter.Add(1);
50+
failedCounter.Add(1, tags);
51+
4852
return ErrorHandleResult.Handled;
4953
}
5054

src/ServiceControl.Audit/Auditing/AuditIngestor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public async Task VerifyCanReachForwardingAddress()
131131
readonly Settings settings;
132132
readonly Lazy<IMessageDispatcher> messageDispatcher;
133133
readonly string logQueueAddress;
134-
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded_count"), description: "Audit ingestion forwarded message count");
134+
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count");
135135

136136
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
137137
}

src/ServiceControl.Audit/Auditing/AuditPersister.cs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
{
33
using System;
44
using System.Collections.Generic;
5-
using System.Diagnostics.Metrics;
65
using System.Text.Json;
76
using System.Threading.Tasks;
87
using Infrastructure;
@@ -61,14 +60,10 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
6160
}
6261

6362
await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);
64-
65-
storedAuditsCounter.Add(1);
6663
}
6764
else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot))
6865
{
6966
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);
70-
71-
storedSagasCounter.Add(1);
7267
}
7368

7469
storedContexts.Add(context);
@@ -105,11 +100,8 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
105100

106101
try
107102
{
108-
using (new DurationRecorder(commitDuration))
109-
{
110-
// this can throw even though dispose is never supposed to throw
111-
await unitOfWork.DisposeAsync();
112-
}
103+
// this can throw even though dispose is never supposed to throw
104+
await unitOfWork.DisposeAsync();
113105
}
114106
catch (Exception e)
115107
{
@@ -252,10 +244,6 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
252244
}
253245
}
254246

255-
readonly Counter<long> storedAuditsCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "audits_count"), description: "Stored audit message count");
256-
readonly Counter<long> storedSagasCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "sagas_count"), description: "Stored saga state count");
257-
readonly Histogram<double> commitDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "commit_duration"), unit: "ms", description: "Storage unit of work commit duration");
258-
259247
static readonly ILog Logger = LogManager.GetLogger<AuditPersister>();
260248
}
261249
}

src/ServiceControl.Audit/Infrastructure/Telemetry.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
namespace ServiceControl.Audit;
22

3+
using System.Collections.Generic;
4+
using System.Diagnostics;
35
using System.Diagnostics.Metrics;
6+
using NServiceBus;
47
using OpenTelemetry.Metrics;
58

69
static class Telemetry
@@ -14,4 +17,16 @@ public static void AddAuditIngestionMeters(this MeterProviderBuilder builder)
1417
{
1518
builder.AddMeter(MeterName);
1619
}
20+
21+
public static TagList GetIngestedMessageTags(IDictionary<string, string> headers)
22+
{
23+
var tags = new TagList();
24+
25+
if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType))
26+
{
27+
tags.Add("nservicebus.message_type", messageType);
28+
}
29+
30+
return tags;
31+
}
1732
}

0 commit comments

Comments
 (0)