Skip to content

Commit fe8f5bb

Browse files
Create Core_10 Sample
1 parent 24148b5 commit fe8f5bb

File tree

8 files changed

+380
-0
lines changed

8 files changed

+380
-0
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
6+
<LangVersion>12.0</LangVersion>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.*" />
11+
<PackageReference Include="Azure.Monitor.OpenTelemetry.Exporter" Version="1.*" />
12+
<PackageReference Include="NServiceBus" Version="9.*" />
13+
<PackageReference Include="Microsoft.ApplicationInsights" Version="2.*" />
14+
</ItemGroup>
15+
16+
</Project>
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using NServiceBus;
5+
6+
// Simulates busy (almost no delay) / quite time in a sine wave
7+
class LoadSimulator
8+
{
9+
public LoadSimulator(IEndpointInstance endpointInstance, TimeSpan minimumDelay, TimeSpan idleDuration)
10+
{
11+
this.endpointInstance = endpointInstance;
12+
this.minimumDelay = minimumDelay;
13+
this.idleDuration = TimeSpan.FromTicks(idleDuration.Ticks / 2);
14+
}
15+
16+
public void Start(CancellationToken cancellationToken)
17+
{
18+
_ = Task.Run(() => Loop(cancellationToken), cancellationToken);
19+
}
20+
21+
async Task Loop(CancellationToken cancellationToken)
22+
{
23+
try
24+
{
25+
while (!cancellationToken.IsCancellationRequested)
26+
{
27+
await Work(cancellationToken);
28+
var delay = NextDelay();
29+
await Task.Delay(delay, cancellationToken);
30+
}
31+
}
32+
catch (OperationCanceledException)
33+
{
34+
}
35+
}
36+
37+
TimeSpan NextDelay()
38+
{
39+
var angleInRadians = Math.PI / 180.0 * ++index;
40+
var delay = TimeSpan.FromMilliseconds(idleDuration.TotalMilliseconds * Math.Sin(angleInRadians));
41+
delay += idleDuration;
42+
delay += minimumDelay;
43+
return delay;
44+
}
45+
46+
Task Work(CancellationToken cancellationToken)
47+
{
48+
var sendOptions = new SendOptions();
49+
50+
sendOptions.RouteToThisEndpoint();
51+
52+
if (Random.Shared.Next(100) <= 10)
53+
{
54+
sendOptions.SetHeader("simulate-immediate-retry", bool.TrueString);
55+
}
56+
57+
if (Random.Shared.Next(100) <= 5)
58+
{
59+
sendOptions.SetHeader("simulate-failure", bool.TrueString);
60+
}
61+
62+
return endpointInstance.Send(new SomeMessage(), sendOptions, cancellationToken);
63+
}
64+
65+
public Task Stop(CancellationToken cancellationToken)
66+
{
67+
return Task.CompletedTask;
68+
}
69+
70+
IEndpointInstance endpointInstance;
71+
TimeSpan minimumDelay;
72+
TimeSpan idleDuration;
73+
int index;
74+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Diagnostics.Metrics;
5+
using System.Reflection;
6+
using System.Threading.Tasks;
7+
using NServiceBus;
8+
using NServiceBus.Features;
9+
using NServiceBus.Settings;
10+
11+
#region metrics-shim
12+
13+
class EmitNServiceBusMetrics : Feature
14+
{
15+
public EmitNServiceBusMetrics()
16+
{
17+
EnableByDefault();
18+
}
19+
20+
protected override void Setup(FeatureConfigurationContext context)
21+
{
22+
if (context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
23+
{
24+
// there are no metrics relevant for send only endpoints yet
25+
return;
26+
}
27+
28+
var queueName = context.LocalQueueAddress().BaseAddress;
29+
var discriminator = context.InstanceSpecificQueueAddress()?.Discriminator;
30+
31+
var recoverabilitySettings = (RecoverabilitySettings)typeof(RecoverabilitySettings).GetConstructor(
32+
BindingFlags.NonPublic | BindingFlags.Instance,
33+
null, new Type[] { typeof(SettingsHolder) },
34+
null).Invoke(new object[] { (SettingsHolder)context.Settings });
35+
36+
recoverabilitySettings.Immediate(i => i.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, true)));
37+
recoverabilitySettings.Delayed(d => d.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, false)));
38+
recoverabilitySettings.Failed(f => f.OnMessageSentToErrorQueue((m, _) => RecordFailure(m.Headers, queueName, discriminator)));
39+
40+
context.Pipeline.OnReceivePipelineCompleted((e, _) =>
41+
{
42+
e.TryGetMessageType(out var messageType);
43+
44+
var tags = CreateTags(queueName, discriminator, messageType);
45+
46+
ProcessingTime.Record((e.CompletedAt - e.StartedAt).TotalMilliseconds, tags);
47+
48+
if (e.TryGetDeliverAt(out DateTimeOffset startTime) || e.TryGetTimeSent(out startTime))
49+
{
50+
CriticalTime.Record((e.CompletedAt - startTime).TotalMilliseconds, tags);
51+
}
52+
53+
return Task.CompletedTask;
54+
});
55+
}
56+
57+
static Task RecordRetry(Dictionary<string, string> headers, string queueName, string discriminator, bool immediate)
58+
{
59+
headers.TryGetMessageType(out var messageType);
60+
61+
var tags = CreateTags(queueName, discriminator, messageType);
62+
63+
if (immediate)
64+
{
65+
ImmedidateRetries.Add(1, tags);
66+
}
67+
else
68+
{
69+
DelayedRetries.Add(1, tags);
70+
}
71+
Retries.Add(1, tags);
72+
73+
return Task.CompletedTask;
74+
}
75+
76+
static Task RecordFailure(Dictionary<string, string> headers, string queueName, string discriminator)
77+
{
78+
headers.TryGetMessageType(out var messageType);
79+
80+
MessageSentToErrorQueue.Add(1, CreateTags(queueName, discriminator, messageType));
81+
82+
return Task.CompletedTask;
83+
}
84+
85+
static TagList CreateTags(string queueName, string discriminator, string messageType)
86+
{
87+
var tags = new TagList(new KeyValuePair<string, object>[] { new(Tags.QueueName, queueName) });
88+
89+
if (!string.IsNullOrWhiteSpace(discriminator))
90+
{
91+
tags.Add(Tags.EndpointDiscriminator, discriminator);
92+
}
93+
94+
if (!string.IsNullOrWhiteSpace(messageType))
95+
{
96+
tags.Add(Tags.MessageType, messageType);
97+
}
98+
99+
return tags;
100+
}
101+
102+
static readonly Meter NServiceBusMeter = new Meter("NServiceBus.Core", "0.1.0");
103+
104+
public static readonly Counter<long> ImmedidateRetries =
105+
NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.immediate", description: "Number of immediate retries performed by the endpoint.");
106+
107+
public static readonly Counter<long> DelayedRetries =
108+
NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.delayed", description: "Number of delayed retries performed by the endpoint.");
109+
110+
public static readonly Counter<long> Retries =
111+
NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.retries", description: "Number of retries performed by the endpoint.");
112+
113+
public static readonly Counter<long> MessageSentToErrorQueue =
114+
NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.error", description: "Number of messages sent to the error queue.");
115+
116+
public static readonly Histogram<double> ProcessingTime =
117+
NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.processing_time", "ms", "The time in milliseconds between when the message was pulled from the queue until processed by the endpoint.");
118+
119+
public static readonly Histogram<double> CriticalTime =
120+
NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.critical_time", "ms", "The time in milliseconds between when the message was sent until processed by the endpoint.");
121+
122+
public static class Tags
123+
{
124+
public const string EndpointDiscriminator = "nservicebus.discriminator";
125+
public const string QueueName = "nservicebus.queue";
126+
public const string MessageType = "nservicebus.message_type";
127+
}
128+
}
129+
#endregion
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using NServiceBus;
4+
5+
static class Extensions
6+
{
7+
public static bool TryGetTimeSent(this ReceivePipelineCompleted completed, out DateTimeOffset timeSent)
8+
{
9+
var headers = completed.ProcessedMessage.Headers;
10+
if (headers.TryGetValue(Headers.TimeSent, out var timeSentString))
11+
{
12+
timeSent = DateTimeOffsetHelper.ToDateTimeOffset(timeSentString);
13+
return true;
14+
}
15+
timeSent = DateTimeOffset.MinValue;
16+
return false;
17+
}
18+
19+
public static bool TryGetDeliverAt(this ReceivePipelineCompleted completed, out DateTimeOffset deliverAt)
20+
{
21+
var headers = completed.ProcessedMessage.Headers;
22+
if (headers.TryGetValue(Headers.DeliverAt, out var deliverAtString))
23+
{
24+
deliverAt = DateTimeOffsetHelper.ToDateTimeOffset(deliverAtString);
25+
return true;
26+
}
27+
deliverAt = DateTimeOffset.MinValue;
28+
return false;
29+
}
30+
31+
public static bool TryGetMessageType(this ReceivePipelineCompleted completed, out string processedMessageType)
32+
=> completed.ProcessedMessage.Headers.TryGetMessageType(out processedMessageType);
33+
34+
internal static bool TryGetMessageType(this IReadOnlyDictionary<string, string> headers, out string processedMessageType)
35+
{
36+
if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var enclosedMessageType))
37+
{
38+
processedMessageType = enclosedMessageType;
39+
return true;
40+
}
41+
processedMessageType = null;
42+
return false;
43+
}
44+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using Azure.Monitor.OpenTelemetry.Exporter;
2+
using NServiceBus;
3+
using System;
4+
using OpenTelemetry;
5+
using OpenTelemetry.Resources;
6+
using OpenTelemetry.Trace;
7+
using System.Collections.Generic;
8+
using System.Threading;
9+
using OpenTelemetry.Metrics;
10+
11+
var endpointName = "Samples.OpenTelemetry.MetricsShim";
12+
13+
Console.Title = endpointName;
14+
15+
var attributes = new Dictionary<string, object>
16+
{
17+
["service.name"] = endpointName,
18+
["service.instance.id"] = Guid.NewGuid().ToString(),
19+
};
20+
21+
var appInsightsConnectionString = "<YOUR CONNECTION STRING HERE>";
22+
23+
var resourceBuilder = ResourceBuilder.CreateDefault().AddAttributes(attributes);
24+
25+
#region enable-meters
26+
27+
var meterProvider = Sdk.CreateMeterProviderBuilder()
28+
.SetResourceBuilder(resourceBuilder)
29+
.AddMeter("NServiceBus.Core*")
30+
.AddAzureMonitorMetricExporter(o => o.ConnectionString = appInsightsConnectionString)
31+
.AddConsoleExporter()
32+
.Build();
33+
34+
#endregion
35+
36+
#region enable-open-telemetry
37+
var endpointConfiguration = new EndpointConfiguration(endpointName);
38+
endpointConfiguration.EnableOpenTelemetry();
39+
#endregion
40+
41+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
42+
endpointConfiguration.UseTransport<LearningTransport>();
43+
var cancellation = new CancellationTokenSource();
44+
var endpointInstance = await Endpoint.Start(endpointConfiguration, cancellation.Token);
45+
46+
var simulator = new LoadSimulator(endpointInstance, TimeSpan.Zero, TimeSpan.FromSeconds(10));
47+
simulator.Start(cancellation.Token);
48+
49+
try
50+
{
51+
Console.WriteLine("Endpoint started. Press any key to send a message. Press ESC to stop");
52+
53+
while (Console.ReadKey(true).Key != ConsoleKey.Escape)
54+
{
55+
await endpointInstance.SendLocal(new SomeMessage(), cancellation.Token);
56+
}
57+
}
58+
finally
59+
{
60+
await simulator.Stop(cancellation.Token);
61+
await endpointInstance.Stop(cancellation.Token);
62+
meterProvider?.Dispose();
63+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
using NServiceBus;
2+
3+
class SomeMessage : IMessage
4+
{
5+
6+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
5+
class SomeMessageHandler : IHandleMessages<SomeMessage>
6+
{
7+
public async Task Handle(SomeMessage message, IMessageHandlerContext context)
8+
{
9+
await Task.Delay(Random.Shared.Next(50, 250), context.CancellationToken);
10+
11+
if (context.MessageHeaders.ContainsKey("simulate-failure"))
12+
{
13+
throw new Exception("Simulated failure");
14+
}
15+
16+
if (context.MessageHeaders.ContainsKey("simulate-immediate-retry") && !context.MessageHeaders.ContainsKey(Headers.ImmediateRetries))
17+
{
18+
throw new Exception("Simulated immediate retry");
19+
}
20+
21+
Console.WriteLine("Message processed");
22+
}
23+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 17
4+
VisualStudioVersion = 17.2.32616.157
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Endpoint", "Endpoint\Endpoint.csproj", "{E70FE4B3-B277-4483-B66F-4217C22C9D26}"
7+
EndProject
8+
Global
9+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
10+
Debug|Any CPU = Debug|Any CPU
11+
Release|Any CPU = Release|Any CPU
12+
EndGlobalSection
13+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
14+
{E70FE4B3-B277-4483-B66F-4217C22C9D26}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
15+
{E70FE4B3-B277-4483-B66F-4217C22C9D26}.Debug|Any CPU.Build.0 = Debug|Any CPU
16+
{E70FE4B3-B277-4483-B66F-4217C22C9D26}.Release|Any CPU.ActiveCfg = Release|Any CPU
17+
{E70FE4B3-B277-4483-B66F-4217C22C9D26}.Release|Any CPU.Build.0 = Release|Any CPU
18+
EndGlobalSection
19+
GlobalSection(SolutionProperties) = preSolution
20+
HideSolutionNode = FALSE
21+
EndGlobalSection
22+
GlobalSection(ExtensibilityGlobals) = postSolution
23+
SolutionGuid = {3E6F6C3D-DA9F-4A8B-B43A-FD9A6F678A67}
24+
EndGlobalSection
25+
EndGlobal

0 commit comments

Comments
 (0)