Skip to content

Commit 8ec014d

Browse files
Merge pull request #7578 from Particular/samples-open-telemetry-metrics-shim-net10
Update Open-Telemetry Metrics Shim Sample to .NET 10
2 parents 117adba + 798f653 commit 8ec014d

File tree

9 files changed

+358
-0
lines changed

9 files changed

+358
-0
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net10.0</TargetFramework>
6+
<LangVersion>preview</LangVersion>
7+
<ImplicitUsings>enable</ImplicitUsings>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.*" />
12+
<PackageReference Include="Azure.Monitor.OpenTelemetry.Exporter" Version="1.*" />
13+
<PackageReference Include="NServiceBus" Version="10.0.0-alpha.1" />
14+
<PackageReference Include="Microsoft.ApplicationInsights" Version="2.*" />
15+
</ItemGroup>
16+
17+
</Project>
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Simulates busy (almost no delay) / quite time in a sine wave
2+
class LoadSimulator
3+
{
4+
public LoadSimulator(IEndpointInstance endpointInstance, TimeSpan minimumDelay, TimeSpan idleDuration)
5+
{
6+
this.endpointInstance = endpointInstance;
7+
this.minimumDelay = minimumDelay;
8+
this.idleDuration = TimeSpan.FromTicks(idleDuration.Ticks / 2);
9+
}
10+
11+
public void Start(CancellationToken cancellationToken)
12+
{
13+
_ = Task.Run(() => Loop(cancellationToken), cancellationToken);
14+
}
15+
16+
async Task Loop(CancellationToken cancellationToken)
17+
{
18+
try
19+
{
20+
while (!cancellationToken.IsCancellationRequested)
21+
{
22+
await Work(cancellationToken);
23+
var delay = NextDelay();
24+
await Task.Delay(delay, cancellationToken);
25+
}
26+
}
27+
catch (OperationCanceledException)
28+
{
29+
}
30+
}
31+
32+
TimeSpan NextDelay()
33+
{
34+
var angleInRadians = Math.PI / 180.0 * ++index;
35+
var delay = TimeSpan.FromMilliseconds(idleDuration.TotalMilliseconds * Math.Sin(angleInRadians));
36+
delay += idleDuration;
37+
delay += minimumDelay;
38+
return delay;
39+
}
40+
41+
Task Work(CancellationToken cancellationToken)
42+
{
43+
var sendOptions = new SendOptions();
44+
45+
sendOptions.RouteToThisEndpoint();
46+
47+
if (Random.Shared.Next(100) <= 10)
48+
{
49+
sendOptions.SetHeader("simulate-immediate-retry", bool.TrueString);
50+
}
51+
52+
if (Random.Shared.Next(100) <= 5)
53+
{
54+
sendOptions.SetHeader("simulate-failure", bool.TrueString);
55+
}
56+
57+
return endpointInstance.Send(new SomeMessage(), sendOptions, cancellationToken);
58+
}
59+
60+
public Task Stop(CancellationToken cancellationToken)
61+
{
62+
return Task.CompletedTask;
63+
}
64+
65+
IEndpointInstance endpointInstance;
66+
TimeSpan minimumDelay;
67+
TimeSpan idleDuration;
68+
int index;
69+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
using System.Diagnostics;
2+
using System.Diagnostics.Metrics;
3+
using System.Reflection;
4+
using NServiceBus.Features;
5+
using NServiceBus.Settings;
6+
7+
#region metrics-shim
8+
9+
class EmitNServiceBusMetrics : Feature
10+
{
11+
public EmitNServiceBusMetrics()
12+
{
13+
EnableByDefault();
14+
}
15+
16+
protected override void Setup(FeatureConfigurationContext context)
17+
{
18+
if (context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
19+
{
20+
// there are no metrics relevant for send only endpoints yet
21+
return;
22+
}
23+
24+
var queueName = context.LocalQueueAddress().BaseAddress;
25+
var discriminator = context.InstanceSpecificQueueAddress()?.Discriminator;
26+
27+
var recoverabilitySettings = (RecoverabilitySettings)typeof(RecoverabilitySettings).GetConstructor(
28+
BindingFlags.NonPublic | BindingFlags.Instance,
29+
null, new Type[] { typeof(SettingsHolder) },
30+
null).Invoke(new object[] { (SettingsHolder)context.Settings });
31+
32+
recoverabilitySettings.Immediate(i => i.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, true)));
33+
recoverabilitySettings.Delayed(d => d.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, false)));
34+
recoverabilitySettings.Failed(f => f.OnMessageSentToErrorQueue((m, _) => RecordFailure(m.Headers, queueName, discriminator)));
35+
36+
context.Pipeline.OnReceivePipelineCompleted((e, _) =>
37+
{
38+
e.TryGetMessageType(out var messageType);
39+
40+
var tags = CreateTags(queueName, discriminator, messageType);
41+
42+
ProcessingTime.Record((e.CompletedAt - e.StartedAt).TotalMilliseconds, tags);
43+
44+
if (e.TryGetDeliverAt(out DateTimeOffset startTime) || e.TryGetTimeSent(out startTime))
45+
{
46+
CriticalTime.Record((e.CompletedAt - startTime).TotalMilliseconds, tags);
47+
}
48+
49+
return Task.CompletedTask;
50+
});
51+
}
52+
53+
static Task RecordRetry(Dictionary<string, string> headers, string queueName, string discriminator, bool immediate)
54+
{
55+
headers.TryGetMessageType(out var messageType);
56+
57+
var tags = CreateTags(queueName, discriminator, messageType);
58+
59+
if (immediate)
60+
{
61+
ImmedidateRetries.Add(1, tags);
62+
}
63+
else
64+
{
65+
DelayedRetries.Add(1, tags);
66+
}
67+
Retries.Add(1, tags);
68+
69+
return Task.CompletedTask;
70+
}
71+
72+
static Task RecordFailure(Dictionary<string, string> headers, string queueName, string discriminator)
73+
{
74+
headers.TryGetMessageType(out var messageType);
75+
76+
MessageSentToErrorQueue.Add(1, CreateTags(queueName, discriminator, messageType));
77+
78+
return Task.CompletedTask;
79+
}
80+
81+
static TagList CreateTags(string queueName, string discriminator, string messageType)
82+
{
83+
var tags = new TagList(new KeyValuePair<string, object>[] { new(Tags.QueueName, queueName) });
84+
85+
if (!string.IsNullOrWhiteSpace(discriminator))
86+
{
87+
tags.Add(Tags.EndpointDiscriminator, discriminator);
88+
}
89+
90+
if (!string.IsNullOrWhiteSpace(messageType))
91+
{
92+
tags.Add(Tags.MessageType, messageType);
93+
}
94+
95+
return tags;
96+
}
97+
98+
static readonly Meter NServiceBusMeter = new Meter("NServiceBus.Core", "0.1.0");
99+
100+
public static readonly Counter<long> ImmedidateRetries =
101+
NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.immediate", description: "Number of immediate retries performed by the endpoint.");
102+
103+
public static readonly Counter<long> DelayedRetries =
104+
NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.delayed", description: "Number of delayed retries performed by the endpoint.");
105+
106+
public static readonly Counter<long> Retries =
107+
NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.retries", description: "Number of retries performed by the endpoint.");
108+
109+
public static readonly Counter<long> MessageSentToErrorQueue =
110+
NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.error", description: "Number of messages sent to the error queue.");
111+
112+
public static readonly Histogram<double> ProcessingTime =
113+
NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.processing_time", "ms", "The time in milliseconds between when the message was pulled from the queue until processed by the endpoint.");
114+
115+
public static readonly Histogram<double> CriticalTime =
116+
NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.critical_time", "ms", "The time in milliseconds between when the message was sent until processed by the endpoint.");
117+
118+
public static class Tags
119+
{
120+
public const string EndpointDiscriminator = "nservicebus.discriminator";
121+
public const string QueueName = "nservicebus.queue";
122+
public const string MessageType = "nservicebus.message_type";
123+
}
124+
}
125+
#endregion
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
static class Extensions
2+
{
3+
public static bool TryGetTimeSent(this ReceivePipelineCompleted completed, out DateTimeOffset timeSent)
4+
{
5+
var headers = completed.ProcessedMessage.Headers;
6+
if (headers.TryGetValue(Headers.TimeSent, out var timeSentString))
7+
{
8+
timeSent = DateTimeOffsetHelper.ToDateTimeOffset(timeSentString);
9+
return true;
10+
}
11+
timeSent = DateTimeOffset.MinValue;
12+
return false;
13+
}
14+
15+
public static bool TryGetDeliverAt(this ReceivePipelineCompleted completed, out DateTimeOffset deliverAt)
16+
{
17+
var headers = completed.ProcessedMessage.Headers;
18+
if (headers.TryGetValue(Headers.DeliverAt, out var deliverAtString))
19+
{
20+
deliverAt = DateTimeOffsetHelper.ToDateTimeOffset(deliverAtString);
21+
return true;
22+
}
23+
deliverAt = DateTimeOffset.MinValue;
24+
return false;
25+
}
26+
27+
public static bool TryGetMessageType(this ReceivePipelineCompleted completed, out string processedMessageType)
28+
=> completed.ProcessedMessage.Headers.TryGetMessageType(out processedMessageType);
29+
30+
internal static bool TryGetMessageType(this IReadOnlyDictionary<string, string> headers, out string processedMessageType)
31+
{
32+
if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var enclosedMessageType))
33+
{
34+
processedMessageType = enclosedMessageType;
35+
return true;
36+
}
37+
processedMessageType = null;
38+
return false;
39+
}
40+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
using Azure.Monitor.OpenTelemetry.Exporter;
2+
using OpenTelemetry;
3+
using OpenTelemetry.Resources;
4+
using OpenTelemetry.Trace;
5+
using OpenTelemetry.Metrics;
6+
7+
var endpointName = "Samples.OpenTelemetry.MetricsShim";
8+
9+
Console.Title = endpointName;
10+
11+
var attributes = new Dictionary<string, object>
12+
{
13+
["service.name"] = endpointName,
14+
["service.instance.id"] = Guid.NewGuid().ToString(),
15+
};
16+
17+
var appInsightsConnectionString = "<YOUR CONNECTION STRING HERE>";
18+
19+
var resourceBuilder = ResourceBuilder.CreateDefault().AddAttributes(attributes);
20+
21+
#region enable-meters
22+
23+
var meterProvider = Sdk.CreateMeterProviderBuilder()
24+
.SetResourceBuilder(resourceBuilder)
25+
.AddMeter("NServiceBus.Core*")
26+
.AddAzureMonitorMetricExporter(o => o.ConnectionString = appInsightsConnectionString)
27+
.AddConsoleExporter()
28+
.Build();
29+
30+
#endregion
31+
32+
#region enable-open-telemetry
33+
var endpointConfiguration = new EndpointConfiguration(endpointName);
34+
endpointConfiguration.EnableOpenTelemetry();
35+
#endregion
36+
37+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
38+
endpointConfiguration.UseTransport<LearningTransport>();
39+
var cancellation = new CancellationTokenSource();
40+
var endpointInstance = await Endpoint.Start(endpointConfiguration, cancellation.Token);
41+
42+
var simulator = new LoadSimulator(endpointInstance, TimeSpan.Zero, TimeSpan.FromSeconds(10));
43+
simulator.Start(cancellation.Token);
44+
45+
try
46+
{
47+
Console.WriteLine("Endpoint started. Press any key to send a message. Press ESC to stop");
48+
49+
while (Console.ReadKey(true).Key != ConsoleKey.Escape)
50+
{
51+
await endpointInstance.SendLocal(new SomeMessage(), cancellation.Token);
52+
}
53+
}
54+
finally
55+
{
56+
await simulator.Stop(cancellation.Token);
57+
await endpointInstance.Stop(cancellation.Token);
58+
meterProvider?.Dispose();
59+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
class SomeMessage : IMessage
2+
{
3+
4+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
class SomeMessageHandler : IHandleMessages<SomeMessage>
2+
{
3+
public async Task Handle(SomeMessage message, IMessageHandlerContext context)
4+
{
5+
await Task.Delay(Random.Shared.Next(50, 250), context.CancellationToken);
6+
7+
if (context.MessageHeaders.ContainsKey("simulate-failure"))
8+
{
9+
throw new Exception("Simulated failure");
10+
}
11+
12+
if (context.MessageHeaders.ContainsKey("simulate-immediate-retry") && !context.MessageHeaders.ContainsKey(Headers.ImmediateRetries))
13+
{
14+
throw new Exception("Simulated immediate retry");
15+
}
16+
17+
Console.WriteLine("Message processed");
18+
}
19+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 17
4+
VisualStudioVersion = 17.2.32616.157
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Endpoint", "Endpoint\Endpoint.csproj", "{E70FE4B3-B277-4483-B66F-4217C22C9D26}"
7+
EndProject
8+
Global
9+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
10+
Debug|Any CPU = Debug|Any CPU
11+
Release|Any CPU = Release|Any CPU
12+
EndGlobalSection
13+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
14+
{E70FE4B3-B277-4483-B66F-4217C22C9D26}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
15+
{E70FE4B3-B277-4483-B66F-4217C22C9D26}.Debug|Any CPU.Build.0 = Debug|Any CPU
16+
{E70FE4B3-B277-4483-B66F-4217C22C9D26}.Release|Any CPU.ActiveCfg = Release|Any CPU
17+
{E70FE4B3-B277-4483-B66F-4217C22C9D26}.Release|Any CPU.Build.0 = Release|Any CPU
18+
EndGlobalSection
19+
GlobalSection(SolutionProperties) = preSolution
20+
HideSolutionNode = FALSE
21+
EndGlobalSection
22+
GlobalSection(ExtensibilityGlobals) = postSolution
23+
SolutionGuid = {3E6F6C3D-DA9F-4A8B-B43A-FD9A6F678A67}
24+
EndGlobalSection
25+
EndGlobal

samples/open-telemetry/metrics-shim/Core_10/prerelease.txt

Whitespace-only changes.

0 commit comments

Comments
 (0)