Skip to content

Commit 321be38

Browse files
Merge pull request #7573 from Particular/update-message-forwarding-sample
Update message forwarding sample
2 parents e24cc7e + a3f5b3e commit 321be38

17 files changed

+294
-1
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.29728.190
5+
MinimumVisualStudioVersion = 15.0.26730.12
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sender", "Sender\Sender.csproj", "{59989859-6857-4DBD-A3D7-2A92DAE9D3C8}"
7+
EndProject
8+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Messages", "Messages\Messages.csproj", "{4567444D-738F-4019-9184-21274FE3983B}"
9+
EndProject
10+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OriginalDestination", "OriginalDestination\OriginalDestination.csproj", "{2FAFD933-5518-49C4-98E5-AD65803E584A}"
11+
EndProject
12+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "UpgradedDestination", "UpgradedDestination\UpgradedDestination.csproj", "{78BBEBC7-F9F5-4E4C-B918-6E11AC32FBC1}"
13+
EndProject
14+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.MessageForwarding", "NServiceBus.MessageForwarding\NServiceBus.MessageForwarding.csproj", "{592371F0-4FC8-4162-B5F0-B9C9CB9D5105}"
15+
EndProject
16+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Sample", "Sample", "{63EC3285-2C21-4B81-8041-E4AA8C8559D0}"
17+
EndProject
18+
Global
19+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
20+
Debug|Any CPU = Debug|Any CPU
21+
EndGlobalSection
22+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
23+
{59989859-6857-4DBD-A3D7-2A92DAE9D3C8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
24+
{59989859-6857-4DBD-A3D7-2A92DAE9D3C8}.Debug|Any CPU.Build.0 = Debug|Any CPU
25+
{4567444D-738F-4019-9184-21274FE3983B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
26+
{4567444D-738F-4019-9184-21274FE3983B}.Debug|Any CPU.Build.0 = Debug|Any CPU
27+
{2FAFD933-5518-49C4-98E5-AD65803E584A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
28+
{2FAFD933-5518-49C4-98E5-AD65803E584A}.Debug|Any CPU.Build.0 = Debug|Any CPU
29+
{78BBEBC7-F9F5-4E4C-B918-6E11AC32FBC1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
30+
{78BBEBC7-F9F5-4E4C-B918-6E11AC32FBC1}.Debug|Any CPU.Build.0 = Debug|Any CPU
31+
{592371F0-4FC8-4162-B5F0-B9C9CB9D5105}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
32+
{592371F0-4FC8-4162-B5F0-B9C9CB9D5105}.Debug|Any CPU.Build.0 = Debug|Any CPU
33+
EndGlobalSection
34+
GlobalSection(SolutionProperties) = preSolution
35+
HideSolutionNode = FALSE
36+
EndGlobalSection
37+
GlobalSection(NestedProjects) = preSolution
38+
{59989859-6857-4DBD-A3D7-2A92DAE9D3C8} = {63EC3285-2C21-4B81-8041-E4AA8C8559D0}
39+
{4567444D-738F-4019-9184-21274FE3983B} = {63EC3285-2C21-4B81-8041-E4AA8C8559D0}
40+
{2FAFD933-5518-49C4-98E5-AD65803E584A} = {63EC3285-2C21-4B81-8041-E4AA8C8559D0}
41+
{78BBEBC7-F9F5-4E4C-B918-6E11AC32FBC1} = {63EC3285-2C21-4B81-8041-E4AA8C8559D0}
42+
EndGlobalSection
43+
GlobalSection(ExtensibilityGlobals) = postSolution
44+
SolutionGuid = {ABEA9BEB-1A5B-429A-AA98-A581901D6770}
45+
EndGlobalSection
46+
EndGlobal
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using NServiceBus;
2+
3+
namespace Messages
4+
{
5+
public class ImportantMessage :
6+
ICommand
7+
{
8+
public string Text { get; set; }
9+
}
10+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<LangVersion>preview</LangVersion>
5+
</PropertyGroup>
6+
<ItemGroup>
7+
<PackageReference Include="NServiceBus" Version="10.0.0-alpha.1" />
8+
</ItemGroup>
9+
</Project>
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using NServiceBus.Pipeline;
2+
using NServiceBus.Routing;
3+
using NServiceBus.Transport;
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Threading.Tasks;
7+
8+
#region forward-processed-messages-behavior
9+
class ForwardProcessedMessagesBehavior : ForkConnector<IIncomingPhysicalMessageContext, IRoutingContext>
10+
{
11+
private string forwardingAddress;
12+
13+
public ForwardProcessedMessagesBehavior(string forwardingAddress)
14+
{
15+
this.forwardingAddress = forwardingAddress;
16+
}
17+
18+
public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next, Func<IRoutingContext, Task> fork)
19+
{
20+
var messageToForward = new OutgoingMessage(
21+
context.Message.MessageId,
22+
new Dictionary<string, string>(context.Message.Headers),
23+
context.Message.Body);
24+
25+
await next();
26+
27+
var forwardingRoutingStrategy = new UnicastRoutingStrategy(forwardingAddress);
28+
29+
var routingContext = new ForwardedRoutingContext(
30+
messageToForward,
31+
forwardingRoutingStrategy,
32+
context);
33+
34+
await fork(routingContext);
35+
}
36+
}
37+
#endregion
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using NServiceBus.Extensibility;
2+
using NServiceBus.Pipeline;
3+
using NServiceBus.Routing;
4+
using NServiceBus.Transport;
5+
using System;
6+
using System.Collections.Generic;
7+
using System.Threading;
8+
9+
class ForwardedRoutingContext : IRoutingContext
10+
{
11+
public ForwardedRoutingContext(OutgoingMessage message, RoutingStrategy forwardingRoutingStrategy, IBehaviorContext innerContext)
12+
{
13+
Message = message;
14+
RoutingStrategies = new[] { forwardingRoutingStrategy };
15+
Builder = innerContext.Builder;
16+
Extensions = innerContext.Extensions;
17+
}
18+
19+
public OutgoingMessage Message { get; }
20+
21+
public IReadOnlyCollection<RoutingStrategy> RoutingStrategies { get; set; }
22+
23+
public ContextBag Extensions { get; }
24+
25+
public CancellationToken CancellationToken { get; }
26+
27+
public IServiceProvider Builder { get; }
28+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace NServiceBus
2+
{
3+
#region message-forwarding-configuration
4+
public static class MessageForwardingConfigurationExtensions
5+
{
6+
public static void ForwardMessagesAfterProcessingTo(this EndpointConfiguration endpointConfiguration, string forwardingAddress)
7+
{
8+
endpointConfiguration.Pipeline.Register(
9+
new ForwardProcessedMessagesBehavior(forwardingAddress),
10+
"Forwards a copy of each processed message to a forwarding address"
11+
);
12+
}
13+
}
14+
#endregion
15+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<LangVersion>preview</LangVersion>
5+
</PropertyGroup>
6+
<ItemGroup>
7+
<PackageReference Include="NServiceBus" Version="10.0.0-alpha.1" />
8+
</ItemGroup>
9+
</Project>
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Messages;
4+
using NServiceBus;
5+
6+
#region old-handler
7+
8+
class ImportantMessageHandler :
9+
IHandleMessages<ImportantMessage>
10+
{
11+
public Task Handle(ImportantMessage message, IMessageHandlerContext context)
12+
{
13+
Console.WriteLine($"Got {message.Text}");
14+
return Task.CompletedTask;
15+
}
16+
}
17+
18+
#endregion
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<LangVersion>preview</LangVersion>
5+
<OutputType>Exe</OutputType>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<ProjectReference Include="..\Messages\Messages.csproj" />
9+
<ProjectReference Include="..\NServiceBus.MessageForwarding\NServiceBus.MessageForwarding.csproj" />
10+
</ItemGroup>
11+
</Project>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using System;
2+
using NServiceBus;
3+
4+
Console.Title = "OriginalDestination";
5+
6+
#region forward-messages-after-processing
7+
8+
var config = new EndpointConfiguration("OriginalDestination");
9+
config.UseTransport(new LearningTransport());
10+
11+
config.ForwardMessagesAfterProcessingTo("UpgradedDestination");
12+
13+
#endregion
14+
15+
config.UseSerialization<SystemJsonSerializer>();
16+
17+
var endpoint = await Endpoint.Start(config);
18+
19+
Console.WriteLine("Endpoint Started. Press any key to exit");
20+
21+
Console.ReadKey();
22+
23+
await endpoint.Stop();

0 commit comments

Comments
 (0)