Skip to content

Commit 9a27516

Browse files
NServiceBus 10 Outbox/RabbitMQ sample (#7451)
1 parent eb0c208 commit 9a27516

File tree

8 files changed

+202
-0
lines changed

8 files changed

+202
-0
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.29728.190
5+
MinimumVisualStudioVersion = 15.0.26730.12
6+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample", "Sample\Sample.csproj", "{FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}"
7+
EndProject
8+
Global
9+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
10+
Debug|Any CPU = Debug|Any CPU
11+
EndGlobalSection
12+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
13+
{FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
14+
{FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}.Debug|Any CPU.Build.0 = Debug|Any CPU
15+
EndGlobalSection
16+
GlobalSection(SolutionProperties) = preSolution
17+
HideSolutionNode = FALSE
18+
EndGlobalSection
19+
EndGlobal
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Microsoft.Data.SqlClient;
5+
using NServiceBus;
6+
7+
public static class Helper
8+
{
9+
public static void EnsureDatabaseExists(string connectionString)
10+
{
11+
var builder = new SqlConnectionStringBuilder(connectionString);
12+
var database = builder.InitialCatalog;
13+
14+
var masterConnection = connectionString.Replace(builder.InitialCatalog, "master");
15+
16+
using (var connection = new SqlConnection(masterConnection))
17+
{
18+
connection.Open();
19+
20+
using (var command = connection.CreateCommand())
21+
{
22+
command.CommandText = $"""
23+
if(db_id('{database}') is null)
24+
create database [{database}]
25+
""";
26+
command.ExecuteNonQuery();
27+
}
28+
}
29+
30+
using (var connection = new SqlConnection(connectionString))
31+
{
32+
connection.Open();
33+
34+
using (var command = connection.CreateCommand())
35+
{
36+
command.CommandText = """
37+
if not exists (select * from sys.tables where name = 'BusinessObject')
38+
create table BusinessObject ( Id int identity(1,1) not null primary key, MessageId varchar(40) not null)
39+
""";
40+
41+
command.ExecuteNonQuery();
42+
}
43+
}
44+
}
45+
46+
public static Task SendDuplicates<TMessage>(IMessageSession context, TMessage message, int totalCount)
47+
{
48+
var duplicatedMessageId = Guid.NewGuid().ToString();
49+
50+
var tasks = Enumerable.Range(0, totalCount)
51+
.Select(i =>
52+
{
53+
var options = new SendOptions();
54+
options.RouteToThisEndpoint();
55+
options.SetMessageId(duplicatedMessageId);
56+
57+
return context.Send(message, options);
58+
});
59+
60+
return Task.WhenAll(tasks);
61+
}
62+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
5+
public class MyHandler : IHandleMessages<MyMessage>
6+
{
7+
#region Handler
8+
public async Task Handle(MyMessage message, IMessageHandlerContext context)
9+
{
10+
Console.WriteLine($"Processing MessageId {context.MessageId}");
11+
12+
var sqlPersistenceSession = context.SynchronizedStorageSession.SqlPersistenceSession();
13+
14+
await using var command = sqlPersistenceSession.Connection.CreateCommand();
15+
command.CommandText = $"insert into BusinessObject (MessageId) values ('{context.MessageId}')";
16+
command.Transaction = sqlPersistenceSession.Transaction;
17+
await command.ExecuteNonQueryAsync(context.CancellationToken);
18+
}
19+
#endregion
20+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
using NServiceBus;
2+
3+
public class MyMessage : IMessage;
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Microsoft.Data.SqlClient;
4+
using NServiceBus;
5+
6+
class Program
7+
{
8+
static async Task Main()
9+
{
10+
Console.Title = "RabbitMQOutbox";
11+
12+
#region ConfigureTransport
13+
var endpointConfiguration = new EndpointConfiguration("Samples.RabbitMQ.Outbox");
14+
15+
var rabbitMqTransport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Classic), "host=localhost;username=rabbitmq;password=rabbitmq")
16+
{
17+
TransportTransactionMode = TransportTransactionMode.ReceiveOnly
18+
};
19+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
20+
endpointConfiguration.UseTransport(rabbitMqTransport);
21+
#endregion
22+
23+
#region ConfigurePersistence
24+
// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbRabbitMqOutbox;Integrated Security=True;Max Pool Size=100;Encrypt=false
25+
// Password must match value in docker-compose.yml
26+
var connectionString = @"Server=localhost,11433;Initial Catalog=NsbRabbitMqOutbox;User Id=SA;Password=NServiceBus!;Max Pool Size=100;Encrypt=false";
27+
28+
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
29+
persistence.SqlDialect<SqlDialect.MsSqlServer>();
30+
persistence.ConnectionBuilder(() => new SqlConnection(connectionString));
31+
#endregion
32+
33+
endpointConfiguration.EnableInstallers();
34+
35+
#region SampleSteps
36+
37+
// STEP 1: Run code as is, duplicates can be observed in console and database
38+
39+
// STEP 2: Uncomment this line to enable the Outbox. Duplicates will be suppressed.
40+
// endpointConfiguration.EnableOutbox();
41+
42+
// STEP 3: Comment out this line to allow concurrent processing. Concurrency exceptions will
43+
// occur in the console window, but only 5 entries will be made in the database.
44+
endpointConfiguration.LimitMessageProcessingConcurrencyTo(1);
45+
46+
#endregion
47+
48+
Helper.EnsureDatabaseExists(connectionString);
49+
50+
var endpointInstance = await Endpoint.Start(endpointConfiguration);
51+
52+
Console.WriteLine("Endpoint started. Press Enter to send 5 sets of duplicate messages...");
53+
Console.ReadLine();
54+
55+
for (var i = 0; i < 5; i++)
56+
{
57+
var myMessage = new MyMessage();
58+
await Helper.SendDuplicates(endpointInstance, myMessage, totalCount: 2);
59+
}
60+
61+
await Task.Delay(5000);
62+
Console.WriteLine("Press any key to exit");
63+
Console.ReadKey();
64+
await endpointInstance.Stop();
65+
}
66+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<OutputType>Exe</OutputType>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<PackageReference Include="NServiceBus" Version="10.0.0-alpha.1" />
9+
<PackageReference Include="NServiceBus.RabbitMQ" Version="11.0.0-alpha.1" />
10+
<PackageReference Include="NServiceBus.Persistence.Sql" Version="9.0.0-alpha.1" />
11+
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.*" />
12+
</ItemGroup>
13+
</Project>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
name: outbox
2+
services:
3+
rabbit:
4+
image: rabbitmq:3-management
5+
environment:
6+
RABBITMQ_DEFAULT_USER: rabbitmq
7+
RABBITMQ_DEFAULT_PASS: rabbitmq
8+
RABBITMQ_DEFAULT_VHOST: /
9+
ports:
10+
- "15672:15672"
11+
- "5672:5672"
12+
sql:
13+
image: mcr.microsoft.com/mssql/server
14+
environment:
15+
ACCEPT_EULA: Y
16+
MSSQL_PID: Express
17+
SA_PASSWORD: NServiceBus!
18+
ports:
19+
- "11433:1433"

samples/outbox/rabbit/Core_10/prerelease.txt

Whitespace-only changes.

0 commit comments

Comments
 (0)