Skip to content

Commit c6f3442

Browse files
committed
Adding metrics to envelopes
1 parent fd8670c commit c6f3442

File tree

8 files changed

+289
-9
lines changed

8 files changed

+289
-9
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics;
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using NServiceBus;
8+
using NServiceBus.AcceptanceTesting;
9+
using NServiceBus.Extensibility;
10+
using NServiceBus.Features;
11+
using NUnit.Framework;
12+
using Conventions = AcceptanceTesting.Customization.Conventions;
13+
14+
public class When_envelope_handler_fails : OpenTelemetryAcceptanceTest
15+
{
16+
[Test]
17+
public async Task Should_report_successful_message_metric()
18+
{
19+
using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener();
20+
21+
_ = await Scenario.Define<Context>()
22+
.WithEndpoint<EndpointWithMetrics>(b => b.CustomConfig(x =>
23+
{
24+
x.EnableFeature<TestEnvelopeFeature>();
25+
})
26+
.When(async (session, ctx) =>
27+
{
28+
await session.SendLocal(new OutgoingMessage());
29+
}))
30+
.Done(c => c.OutgoingMessagesReceived == 1)
31+
.Run();
32+
33+
metricsListener.AssertMetric("nservicebus.envelope.unwrapping_error", 1);
34+
35+
metricsListener.AssertTags("nservicebus.envelope.unwrapping_error",
36+
new Dictionary<string, object>
37+
{
38+
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
39+
["nservicebus.envelope.unwrapper_type"] = typeof(ThrowingHandler).FullName
40+
});
41+
}
42+
43+
class ThrowingHandler : IEnvelopeHandler
44+
{
45+
public (Dictionary<string, string> headers, ReadOnlyMemory<byte> body)? UnwrapEnvelope(string nativeMessageId, IDictionary<string, string> incomingHeaders,
46+
ContextBag extensions, ReadOnlyMemory<byte> incomingBody)
47+
{
48+
throw new InvalidOperationException("Some exception");
49+
}
50+
}
51+
52+
class TestEnvelopeFeature : Features.Feature
53+
{
54+
protected override void Setup(FeatureConfigurationContext context)
55+
{
56+
context.AddEnvelopeHandler<ThrowingHandler>();
57+
}
58+
}
59+
60+
class Context : ScenarioContext
61+
{
62+
public int OutgoingMessagesReceived;
63+
}
64+
65+
class EndpointWithMetrics : EndpointConfigurationBuilder
66+
{
67+
public EndpointWithMetrics() => EndpointSetup<OpenTelemetryEnabledEndpoint>();
68+
69+
public class MessageHandler : IHandleMessages<OutgoingMessage>
70+
{
71+
readonly Context testContext;
72+
73+
public MessageHandler(Context testContext) => this.testContext = testContext;
74+
75+
public Task Handle(OutgoingMessage message, IMessageHandlerContext context)
76+
{
77+
Interlocked.Increment(ref testContext.OutgoingMessagesReceived);
78+
return Task.CompletedTask;
79+
}
80+
}
81+
}
82+
83+
public class OutgoingMessage : IMessage
84+
{
85+
}
86+
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
namespace NServiceBus.Core.Tests.Envelopes;
2+
3+
using OpenTelemetry;
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Text;
7+
using Extensibility;
8+
using NUnit.Framework;
9+
using Transport;
10+
11+
12+
public class EnvelopeUnwrapperTests
13+
{
14+
string nativeId;
15+
Dictionary<string, string> originalHeaders;
16+
ReadOnlyMemory<byte> originalBody;
17+
MessageContext messageContext;
18+
TestMeterFactory meterFactory;
19+
IncomingPipelineMetrics incomingPipelineMetrics;
20+
List<IEnvelopeHandler> envelopeHandlers;
21+
22+
[SetUp]
23+
public void Setup()
24+
{
25+
nativeId = "native-1";
26+
originalHeaders = new()
27+
{
28+
["HeaderA"] = "ValueA"
29+
};
30+
originalBody = Encoding.UTF8.GetBytes("payload").AsMemory();
31+
messageContext = new MessageContext(nativeId, originalHeaders, originalBody, new TransportTransaction(), "receiveAddress", new ContextBag());
32+
meterFactory = new TestMeterFactory();
33+
incomingPipelineMetrics = new IncomingPipelineMetrics(meterFactory, "queue", "disc");
34+
}
35+
36+
[TearDown]
37+
public void TearDown()
38+
{
39+
meterFactory.Dispose();
40+
}
41+
42+
[Test]
43+
public void ReturnsDefaultIncomingMessageWhenNoHandlers()
44+
{
45+
envelopeHandlers = [];
46+
47+
IncomingMessage result = RunTest();
48+
49+
Assert.That(result.NativeMessageId, Is.EqualTo(nativeId));
50+
Assert.That(result.Headers, Is.EqualTo(originalHeaders));
51+
Assert.That(result.Body, Is.EqualTo(originalBody));
52+
}
53+
54+
[Test]
55+
public void ReturnsDefaultIncomingMessageWhenHandlersReturnNull()
56+
{
57+
envelopeHandlers = [
58+
new NullReturningHandler(),
59+
new NullReturningHandler(),
60+
];
61+
62+
IncomingMessage result = RunTest();
63+
64+
Assert.That(result.NativeMessageId, Is.EqualTo(nativeId));
65+
Assert.That(result.Headers, Is.EqualTo(originalHeaders));
66+
Assert.That(result.Body, Is.EqualTo(originalBody));
67+
}
68+
69+
[Test]
70+
public void ReturnsDefaultIncomingMessageWhenHandlersThrow()
71+
{
72+
envelopeHandlers = [
73+
new ThrowingHandler(),
74+
new ThrowingHandler(),
75+
];
76+
77+
IncomingMessage result = RunTest();
78+
79+
Assert.That(result.NativeMessageId, Is.EqualTo(nativeId));
80+
Assert.That(result.Headers, Is.EqualTo(originalHeaders));
81+
Assert.That(result.Body, Is.EqualTo(originalBody));
82+
}
83+
84+
[Test]
85+
public void ReturnsValueFromTheFirstSucceedingHandler()
86+
{
87+
var firstHeaders = new Dictionary<string, string>
88+
{
89+
["HeaderB"] = "ValueB"
90+
};
91+
var firstBody = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("firstPayload"));
92+
93+
var secondHeaders = new Dictionary<string, string>
94+
{
95+
["HeaderC"] = "ValueC"
96+
};
97+
var secondBody = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("secondPayload"));
98+
envelopeHandlers = [
99+
new NullReturningHandler(),
100+
new ThrowingHandler(),
101+
new ReturningHandler(firstHeaders, firstBody),
102+
new NullReturningHandler(),
103+
new ThrowingHandler(),
104+
new ReturningHandler(secondHeaders, secondBody)
105+
];
106+
107+
IncomingMessage result = RunTest();
108+
109+
Assert.That(result.NativeMessageId, Is.EqualTo(nativeId));
110+
Assert.That(result.Headers, Is.EqualTo(firstHeaders));
111+
Assert.That(result.Body, Is.EqualTo(firstBody));
112+
}
113+
114+
IncomingMessage RunTest()
115+
{
116+
return new EnvelopeUnwrapper(envelopeHandlers.ToArray(), incomingPipelineMetrics).UnwrapEnvelope(messageContext);
117+
}
118+
119+
class ReturningHandler : IEnvelopeHandler
120+
{
121+
readonly Dictionary<string, string> headersToReturn;
122+
readonly ReadOnlyMemory<byte> bodyToReturn;
123+
124+
public ReturningHandler(Dictionary<string, string> headersToReturn, ReadOnlyMemory<byte> bodyToReturn)
125+
{
126+
this.headersToReturn = headersToReturn;
127+
this.bodyToReturn = bodyToReturn;
128+
}
129+
130+
public (Dictionary<string, string> headers, ReadOnlyMemory<byte> body)? UnwrapEnvelope(string nativeMessageId, IDictionary<string, string> incomingHeaders,
131+
ContextBag extensions, ReadOnlyMemory<byte> incomingBody)
132+
{
133+
return (headersToReturn, bodyToReturn);
134+
}
135+
}
136+
137+
class NullReturningHandler : IEnvelopeHandler
138+
{
139+
public (Dictionary<string, string> headers, ReadOnlyMemory<byte> body)? UnwrapEnvelope(string nativeMessageId, IDictionary<string, string> incomingHeaders,
140+
ContextBag extensions, ReadOnlyMemory<byte> incomingBody)
141+
{
142+
return null;
143+
}
144+
}
145+
146+
class ThrowingHandler : IEnvelopeHandler
147+
{
148+
public (Dictionary<string, string> headers, ReadOnlyMemory<byte> body)? UnwrapEnvelope(string nativeMessageId, IDictionary<string, string> incomingHeaders,
149+
ContextBag extensions, ReadOnlyMemory<byte> incomingBody)
150+
{
151+
throw new InvalidOperationException("Some exception");
152+
}
153+
}
154+
}

src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,8 @@
2727
<None Update="TestDlls/**/*.*" CopyToOutputDirectory="PreserveNewest" />
2828
</ItemGroup>
2929

30+
<ItemGroup>
31+
<Folder Include="Envelopes\" />
32+
</ItemGroup>
33+
3034
</Project>

src/NServiceBus.Core.Tests/Pipeline/MainPipelineExecutorTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,16 @@ static MessageContext CreateMessageContext() =>
123123

124124
static MainPipelineExecutor CreateMainPipelineExecutor(ServiceProvider serviceProvider, IPipeline<ITransportReceiveContext> receivePipeline)
125125
{
126+
var incomingPipelineMetrics = new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc");
126127
var executor = new MainPipelineExecutor(
127128
serviceProvider,
128129
new PipelineCache(serviceProvider, new PipelineModifications()),
129130
new TestableMessageOperations(),
130131
new Notification<ReceivePipelineCompleted>(),
131132
receivePipeline,
132133
new ActivityFactory(),
133-
new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc"),
134-
new EnvelopeUnwrapper([]));
134+
incomingPipelineMetrics,
135+
new EnvelopeUnwrapper([], incomingPipelineMetrics));
135136

136137
return executor;
137138
}

src/NServiceBus.Core/Envelopes/EnvelopeComponent.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace NServiceBus;
1010
class EnvelopeComponent(EnvelopeComponent.Settings settings)
1111
{
1212
public EnvelopeUnwrapper CreateUnwrapper(IServiceProvider serviceProvider)
13-
=> new([.. settings.HandlerFactories.Select(factory => factory(serviceProvider))]);
13+
=> new([.. settings.HandlerFactories.Select(factory => factory(serviceProvider))], serviceProvider.GetRequiredService<IncomingPipelineMetrics>());
1414

1515
public class Settings
1616
{

src/NServiceBus.Core/Envelopes/EnvelopeUnwrapper.cs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace NServiceBus;
77
using Logging;
88
using Transport;
99

10-
class EnvelopeUnwrapper(IEnvelopeHandler[] envelopeHandlers)
10+
class EnvelopeUnwrapper(IEnvelopeHandler[] envelopeHandlers, IncomingPipelineMetrics metrics)
1111
{
1212
static IncomingMessage GetDefaultIncomingMessage(MessageContext messageContext) => new(messageContext.NativeMessageId, messageContext.Headers, messageContext.Body);
1313

@@ -19,8 +19,12 @@ internal IncomingMessage UnwrapEnvelope(MessageContext messageContext)
1919
{
2020
try
2121
{
22-
Log.Debug(
23-
$"Unwrapping the current message (NativeID: {messageContext.NativeMessageId} using {envelopeHandler.GetType().Name}");
22+
if (Log.IsDebugEnabled)
23+
{
24+
Log.Debug(
25+
$"Unwrapping the current message (NativeID: {messageContext.NativeMessageId} using {envelopeHandler.GetType().Name}");
26+
}
27+
2428
(Dictionary<string, string> headers, ReadOnlyMemory<byte> body)? unwrappingResult = envelopeHandler.UnwrapEnvelope(
2529
messageContext.NativeMessageId, messageContext.Headers, messageContext.Extensions,
2630
messageContext.Body);
@@ -32,13 +36,20 @@ internal IncomingMessage UnwrapEnvelope(MessageContext messageContext)
3236
}
3337
catch (Exception e)
3438
{
35-
Log.Warn($"Unwrapper {envelopeHandler} failed to unwrap the message {messageContext.NativeMessageId}", e);
39+
metrics.RecordEnvelopeUnwrappingError(messageContext, envelopeHandler);
40+
if (Log.IsWarnEnabled)
41+
{
42+
Log.Warn(
43+
$"Unwrapper {envelopeHandler} failed to unwrap the message {messageContext.NativeMessageId}",
44+
e);
45+
}
3646
}
3747
}
3848

39-
if (envelopeHandlers.Length > 0)
49+
if (Log.IsDebugEnabled)
4050
{
41-
Log.Info($"No envelope handler found for the current message (NativeID: {messageContext.NativeMessageId}, assuming the default NServiceBus format");
51+
Log.Debug(
52+
$"No envelope handler found for the current message (NativeID: {messageContext.NativeMessageId}, assuming the default NServiceBus format");
4253
}
4354

4455
return GetDefaultIncomingMessage(messageContext);

src/NServiceBus.Core/OpenTelemetry/Metrics/MeterTags.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ static class MeterTags
1111
public const string MessageHandlerType = "nservicebus.message_handler_type";
1212
public const string ExecutionResult = "execution.result";
1313
public const string ErrorType = "error.type";
14+
public const string EnvelopeUnwrapperType = "nservicebus.envelope.unwrapper_type";
1415
}

src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace NServiceBus;
66
using System.Collections.Generic;
77
using System.Diagnostics;
88
using System.Diagnostics.Metrics;
9+
using NServiceBus.Transport;
910
using Pipeline;
1011

1112
class IncomingPipelineMetrics
@@ -19,6 +20,7 @@ class IncomingPipelineMetrics
1920
const string RecoverabilityImmediate = "nservicebus.recoverability.immediate";
2021
const string RecoverabilityDelayed = "nservicebus.recoverability.delayed";
2122
const string RecoverabilityError = "nservicebus.recoverability.error";
23+
const string EnvelopeUnwrappingError = "nservicebus.envelope.unwrapping_error";
2224

2325
public IncomingPipelineMetrics(IMeterFactory meterFactory, string queueName, string discriminator)
2426
{
@@ -41,6 +43,8 @@ public IncomingPipelineMetrics(IMeterFactory meterFactory, string queueName, str
4143
description: "Total number of delayed retries requested.");
4244
totalSentToErrorQueue = meter.CreateCounter<long>(RecoverabilityError,
4345
description: "Total number of messages sent to the error queue.");
46+
totalEnvelopeUnwrappingErrors = meter.CreateCounter<long>(EnvelopeUnwrappingError,
47+
description: "Total number of messages unwrapped unsuccessfully by the endpoint.");
4448

4549
queueNameBase = queueName;
4650
endpointDiscriminator = discriminator;
@@ -239,6 +243,24 @@ public void RecordSendToErrorQueue(IRecoverabilityContext recoverabilityContext)
239243
totalSentToErrorQueue.Add(1, meterTags);
240244
}
241245

246+
public void RecordEnvelopeUnwrappingError(MessageContext messageContext, IEnvelopeHandler type)
247+
{
248+
if (!totalEnvelopeUnwrappingErrors.Enabled)
249+
{
250+
return;
251+
}
252+
253+
var incomingPipelineMetricTags = messageContext.Extensions.Get<IncomingPipelineMetricTags>();
254+
TagList meterTags;
255+
incomingPipelineMetricTags.ApplyTags(ref meterTags, [
256+
MeterTags.QueueName,
257+
MeterTags.EndpointDiscriminator,
258+
MeterTags.MessageType,
259+
MeterTags.MessageHandlerType]);
260+
meterTags.Add(new KeyValuePair<string, object?>(MeterTags.EnvelopeUnwrapperType, type.GetType().FullName));
261+
totalEnvelopeUnwrappingErrors.Add(1, meterTags);
262+
}
263+
242264
readonly Counter<long> totalProcessedSuccessfully;
243265
readonly Counter<long> totalFetched;
244266
readonly Counter<long> totalFailures;
@@ -248,6 +270,7 @@ public void RecordSendToErrorQueue(IRecoverabilityContext recoverabilityContext)
248270
readonly Counter<long> totalImmediateRetries;
249271
readonly Counter<long> totalDelayedRetries;
250272
readonly Counter<long> totalSentToErrorQueue;
273+
readonly Counter<long> totalEnvelopeUnwrappingErrors;
251274
string queueNameBase;
252275
string endpointDiscriminator;
253276
}

0 commit comments

Comments
 (0)