Skip to content

Commit 593dacd

Browse files
authored
Add NServiceBus 10 pipeline session filtering sample (#7729)
* Cleanup and fixed DI issues that prevented the rotating session key from working. * Added NServiceBus 10 pipeline session filtering sample * Fixed snippet references * Didn't commit Core_8 changes
1 parent 6b4a72d commit 593dacd

23 files changed

+505
-157
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using System;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.Hosting;
4+
using NServiceBus;
5+
6+
Console.Title = "Receiver";
7+
var builder = Host.CreateApplicationBuilder();
8+
9+
builder.Services.AddSingleton<ISessionKeyProvider, RotatingSessionKeyProvider>();
10+
11+
var endpointConfiguration = new EndpointConfiguration("Receiver");
12+
endpointConfiguration.UsePersistence<LearningPersistence>();
13+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
14+
endpointConfiguration.ApplySessionFilter();
15+
16+
endpointConfiguration.UseTransport(new LearningTransport());
17+
18+
19+
builder.UseNServiceBus(endpointConfiguration);
20+
21+
var host = builder.Build();
22+
23+
await host.StartAsync();
24+
25+
var sessionKeyProvider = host.Services.GetRequiredService<ISessionKeyProvider>();
26+
PrintMenu(sessionKeyProvider);
27+
28+
while (true)
29+
{
30+
var key = Console.ReadKey(true).Key;
31+
32+
if (key == ConsoleKey.Q)
33+
{
34+
break;
35+
}
36+
37+
if (key == ConsoleKey.C)
38+
{
39+
sessionKeyProvider.NextKey();
40+
PrintMenu(sessionKeyProvider);
41+
}
42+
}
43+
44+
await host.StopAsync();
45+
46+
static void PrintMenu(ISessionKeyProvider sessionKeyProvider)
47+
{
48+
Console.Clear();
49+
Console.WriteLine($"Session Key: {sessionKeyProvider.SessionKey}");
50+
Console.WriteLine("C) Change Session Key");
51+
Console.WriteLine("Q) Close");
52+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<OutputType>Exe</OutputType>
4+
<LangVersion>preview</LangVersion>
5+
<TargetFramework>net10.0</TargetFramework>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<ProjectReference Include="..\SessionFilter\Shared.csproj" />
9+
</ItemGroup>
10+
</Project>
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Microsoft.Extensions.Logging;
4+
using NServiceBus;
5+
6+
namespace Receiver
7+
{
8+
class SomeMessageHandler(ILogger<SomeMessageHandler> logger) : IHandleMessages<SomeMessage>
9+
{
10+
public Task Handle(SomeMessage message, IMessageHandlerContext context)
11+
{
12+
logger.LogInformation("Got message {Counter}", message.Counter);
13+
return Task.CompletedTask;
14+
}
15+
}
16+
17+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
using System;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.Hosting;
4+
using NServiceBus;
5+
6+
Console.Title = "Sender";
7+
var builder = Host.CreateApplicationBuilder();
8+
9+
#region register-session-key-provider
10+
builder.Services.AddSingleton<ISessionKeyProvider, RotatingSessionKeyProvider>();
11+
#endregion
12+
13+
var endpointConfiguration = new EndpointConfiguration("Sender");
14+
endpointConfiguration.UsePersistence<LearningPersistence>();
15+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
16+
endpointConfiguration.ApplySessionFilter();
17+
18+
var routing = endpointConfiguration.UseTransport(new LearningTransport());
19+
20+
routing.RouteToEndpoint(
21+
typeof(SomeMessage),
22+
"Receiver"
23+
);
24+
25+
builder.UseNServiceBus(endpointConfiguration);
26+
27+
var host = builder.Build();
28+
29+
await host.StartAsync();
30+
31+
var sessionKeyProvider = host.Services.GetRequiredService<ISessionKeyProvider>();
32+
var messageSession = host.Services.GetRequiredService<IMessageSession>();
33+
PrintMenu(sessionKeyProvider);
34+
35+
var index = 1;
36+
37+
while (true)
38+
{
39+
var key = Console.ReadKey(true).Key;
40+
41+
if (key == ConsoleKey.Q)
42+
{
43+
break;
44+
}
45+
46+
if (key == ConsoleKey.C)
47+
{
48+
sessionKeyProvider.NextKey();
49+
PrintMenu(sessionKeyProvider);
50+
continue;
51+
}
52+
53+
54+
await messageSession.Send(new SomeMessage { Counter = index });
55+
Console.WriteLine($"Sent message {index++}");
56+
}
57+
58+
await host.StopAsync();
59+
60+
static void PrintMenu(ISessionKeyProvider sessionKeyProvider)
61+
{
62+
Console.Clear();
63+
Console.WriteLine($"Session Key: {sessionKeyProvider.SessionKey}");
64+
Console.WriteLine("C) Change Session Key");
65+
Console.WriteLine("Q) Close");
66+
Console.WriteLine("any other key to send a message");
67+
}
68+
69+
// class Program
70+
// {
71+
//
72+
// public static async Task Main(string[] args)
73+
// {
74+
// var host = CreateHostBuilder(args).Build();
75+
//
76+
// await host.StartAsync();
77+
//
78+
//
79+
//
80+
// await host.StopAsync();
81+
// }
82+
//
83+
// public static IHostBuilder CreateHostBuilder(string[] args)
84+
// {
85+
// var sessionKeyProvider = new RotatingSessionKeyProvider();
86+
// var builder = Host.CreateDefaultBuilder(args)
87+
// .ConfigureServices((hostContext, services) =>
88+
// {
89+
// Console.Title = "Sender";
90+
// services.AddSingleton(sessionKeyProvider); // Register the service
91+
//
92+
// }).UseNServiceBus(x =>
93+
// {
94+
// var endpointConfiguration = new EndpointConfiguration("c");
95+
//
96+
// endpointConfiguration.UsePersistence<LearningPersistence>();
97+
// endpointConfiguration.UseSerialization<SystemJsonSerializer>();
98+
// var routing = endpointConfiguration.UseTransport(new LearningTransport());
99+
//
100+
// routing.RouteToEndpoint(
101+
// typeof(SomeMessage),
102+
// "Samples.SessionFilter.Receiver"
103+
// );
104+
//
105+
// #region add-filter-behavior
106+
//
107+
// var logger = new LoggerFactory().CreateLogger<FilterIncomingMessages>();
108+
// x.
109+
// endpointConfiguration.ApplySessionFilter(sessionKeyProvider, logger);
110+
//
111+
// #endregion
112+
//
113+
// return endpointConfiguration;
114+
// });
115+
//
116+
// return builder;
117+
// }
118+
//
119+
//
120+
//
121+
//
122+
//
123+
// }
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<OutputType>Exe</OutputType>
4+
<LangVersion>preview</LangVersion>
5+
<TargetFramework>net10.0</TargetFramework>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<PackageReference Include="NServiceBus.Extensions.Hosting" Version="4.0.0-alpha.2" />
9+
</ItemGroup>
10+
<ItemGroup>
11+
<ProjectReference Include="..\SessionFilter\Shared.csproj" />
12+
</ItemGroup>
13+
</Project>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
Microsoft Visual Studio Solution File, Format Version 12.00
2+
# Visual Studio Version 16
3+
VisualStudioVersion = 16.0.29728.190
4+
MinimumVisualStudioVersion = 15.0.26730.12
5+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Shared", "SessionFilter\Shared.csproj", "{4ADF8A12-97B9-44A2-8F1A-A858086AB3CB}"
6+
EndProject
7+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sender", "Sender\Sender.csproj", "{5963C171-0E3E-4BC3-91B6-D6852E5D1CDE}"
8+
EndProject
9+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Receiver", "Receiver\Receiver.csproj", "{462A355C-6758-44F0-A840-6D4494E31DC3}"
10+
EndProject
11+
Global
12+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
13+
Debug|Any CPU = Debug|Any CPU
14+
EndGlobalSection
15+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
16+
{4ADF8A12-97B9-44A2-8F1A-A858086AB3CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
17+
{4ADF8A12-97B9-44A2-8F1A-A858086AB3CB}.Debug|Any CPU.Build.0 = Debug|Any CPU
18+
{5963C171-0E3E-4BC3-91B6-D6852E5D1CDE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
19+
{5963C171-0E3E-4BC3-91B6-D6852E5D1CDE}.Debug|Any CPU.Build.0 = Debug|Any CPU
20+
{462A355C-6758-44F0-A840-6D4494E31DC3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
21+
{462A355C-6758-44F0-A840-6D4494E31DC3}.Debug|Any CPU.Build.0 = Debug|Any CPU
22+
EndGlobalSection
23+
GlobalSection(SolutionProperties) = preSolution
24+
HideSolutionNode = FALSE
25+
EndGlobalSection
26+
GlobalSection(ExtensibilityGlobals) = postSolution
27+
SolutionGuid = {B3B8E2BC-972D-4F2A-AAAD-81BA9AAE8099}
28+
EndGlobalSection
29+
EndGlobal
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus.Pipeline;
4+
5+
#region apply-session-header
6+
public class ApplySessionFilterHeader : Behavior<IRoutingContext>
7+
{
8+
readonly ISessionKeyProvider sessionKeyProvider;
9+
10+
public ApplySessionFilterHeader(ISessionKeyProvider sessionKeyProvider)
11+
{
12+
this.sessionKeyProvider = sessionKeyProvider;
13+
}
14+
15+
public override Task Invoke(IRoutingContext context, Func<Task> next)
16+
{
17+
context.Message.Headers["NServiceBus.SessionKey"] = sessionKeyProvider.SessionKey;
18+
return next();
19+
}
20+
}
21+
#endregion
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using Microsoft.Extensions.Logging;
2+
using NServiceBus;
3+
4+
public static class ConfigExtensions
5+
{
6+
#region config-extension
7+
public static void ApplySessionFilter(this EndpointConfiguration endpointConfiguration)
8+
{
9+
var pipeline = endpointConfiguration.Pipeline;
10+
pipeline.Register(typeof(ApplySessionFilterHeader), "Adds session key to outgoing messages");
11+
pipeline.Register(typeof(FilterIncomingMessages), "Filters out messages that don't match the current session key");
12+
}
13+
#endregion
14+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Microsoft.Extensions.Logging;
4+
using NServiceBus.Pipeline;
5+
using NServiceBus.Transport;
6+
7+
#region filter-incoming-messages
8+
public class FilterIncomingMessages: Behavior<ITransportReceiveContext>
9+
{
10+
readonly ISessionKeyProvider sessionKeyProvider;
11+
private readonly ILogger<FilterIncomingMessages> logger;
12+
13+
public FilterIncomingMessages(ISessionKeyProvider sessionKeyProvider, ILogger<FilterIncomingMessages> logger)
14+
{
15+
this.sessionKeyProvider = sessionKeyProvider;
16+
this.logger = logger;
17+
}
18+
19+
public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
20+
{
21+
if (IsFromCurrentSession(context.Message))
22+
{
23+
await next();
24+
}
25+
else
26+
{
27+
logger.LogInformation("Dropping message {MessageId} as it does not match the current session", context.Message.MessageId);
28+
}
29+
}
30+
31+
bool IsFromCurrentSession(IncomingMessage message)
32+
=> message.Headers.TryGetValue("NServiceBus.SessionKey", out string sessionKey)
33+
&& sessionKey == sessionKeyProvider.SessionKey;
34+
35+
}
36+
#endregion
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#region session-key-provider-interface
2+
public interface ISessionKeyProvider
3+
{
4+
void NextKey();
5+
string SessionKey { get; }
6+
}
7+
#endregion

0 commit comments

Comments
 (0)