Skip to content

Commit ef83cac

Browse files
Add NServiceBus 10 sample (#7615)
1 parent 8e672ac commit ef83cac

File tree

8 files changed

+179
-0
lines changed

8 files changed

+179
-0
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.29728.190
5+
MinimumVisualStudioVersion = 15.0.26730.12
6+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample", "Sample\Sample.csproj", "{FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}"
7+
EndProject
8+
Global
9+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
10+
Debug|Any CPU = Debug|Any CPU
11+
EndGlobalSection
12+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
13+
{FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
14+
{FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}.Debug|Any CPU.Build.0 = Debug|Any CPU
15+
EndGlobalSection
16+
GlobalSection(SolutionProperties) = preSolution
17+
HideSolutionNode = FALSE
18+
EndGlobalSection
19+
EndGlobal
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using NServiceBus;
5+
using Microsoft.Azure.Cosmos;
6+
public static class Helper
7+
{
8+
public static async Task CreateContainerAndDbIfNotExists(string connectionString)
9+
{
10+
var cosmosClient = new CosmosClient(connectionString);
11+
Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync("Samples.Database.Demo");
12+
Container container = await database.CreateContainerIfNotExistsAsync(
13+
id: "Outbox",
14+
partitionKeyPath: "/messageId"
15+
);
16+
}
17+
18+
public static Task SendDuplicates<TMessage>(IMessageSession context, TMessage message, int totalCount)
19+
{
20+
var duplicatedMessageId = Guid.NewGuid().ToString();
21+
22+
var tasks = Enumerable.Range(0, totalCount)
23+
.Select(i =>
24+
{
25+
var options = new SendOptions();
26+
options.SetHeader("PartitionKeyHeader", "/messageId");//set the partition key
27+
options.RouteToThisEndpoint();
28+
options.SetMessageId(duplicatedMessageId);
29+
30+
return context.Send(message, options);
31+
});
32+
33+
return Task.WhenAll(tasks);
34+
}
35+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
5+
public class MyHandler : IHandleMessages<MyMessage>
6+
{
7+
#region Handler
8+
public async Task Handle(MyMessage message, IMessageHandlerContext context)
9+
{
10+
Console.WriteLine($"Processing MessageId {context.MessageId}");
11+
await Task.CompletedTask;
12+
}
13+
#endregion
14+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
using NServiceBus;
2+
3+
public class MyMessage : IMessage;
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Microsoft.Azure.Cosmos;
4+
using NServiceBus;
5+
6+
class Program
7+
{
8+
static async Task Main()
9+
{
10+
Console.Title = "RabbitMQCosmosDBOutbox";
11+
12+
#region ConfigureTransport
13+
var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDb.Outbox");
14+
15+
var rabbitMqTransport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Classic), "host=localhost;username=rabbitmq;password=rabbitmq")
16+
{
17+
TransportTransactionMode = TransportTransactionMode.ReceiveOnly
18+
};
19+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
20+
endpointConfiguration.UseTransport(rabbitMqTransport);
21+
#endregion
22+
23+
//add your own account here
24+
var connectionString = "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
25+
26+
#region Create CosmosDb resources
27+
28+
await Helper.CreateContainerAndDbIfNotExists(connectionString);
29+
#endregion
30+
31+
#region ConfigurePersistence
32+
33+
var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
34+
35+
endpointConfiguration.UsePersistence<CosmosPersistence>()
36+
.CosmosClient(new CosmosClient(connectionString))
37+
.DatabaseName("Samples.Database.Demo");
38+
39+
persistence.DefaultContainer("Outbox", "/messageId");
40+
#endregion
41+
42+
endpointConfiguration.EnableInstallers();
43+
44+
#region SampleSteps
45+
46+
// STEP 1: Run code as is, duplicates can be observed in console and database
47+
var transactionInformation = persistence.TransactionInformation();
48+
transactionInformation.ExtractPartitionKeyFromHeader("PartitionKeyHeader");
49+
50+
// STEP 2: Uncomment this line to enable the Outbox. Duplicates will be suppressed.
51+
// endpointConfiguration.EnableOutbox();
52+
53+
// STEP 3: Comment out this line to allow concurrent processing. Concurrency exceptions will
54+
// occur in the console window, but only 5 entries will be made in the database.
55+
endpointConfiguration.LimitMessageProcessingConcurrencyTo(1);
56+
57+
#endregion
58+
var endpointInstance = await Endpoint.Start(endpointConfiguration);
59+
60+
Console.WriteLine("Endpoint started. Press Enter to send 5 sets of duplicate messages...");
61+
Console.ReadLine();
62+
63+
for (var i = 0; i < 5; i++)
64+
{
65+
var myMessage = new MyMessage();
66+
await Helper.SendDuplicates(endpointInstance, myMessage, totalCount: 2);
67+
}
68+
69+
await Task.Delay(5000);
70+
Console.WriteLine("Press any key to exit");
71+
Console.ReadKey();
72+
await endpointInstance.Stop();
73+
}
74+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<OutputType>Exe</OutputType>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<PackageReference Include="NServiceBus" Version="10.0.0-alpha.2" />
9+
<PackageReference Include="NServiceBus.Persistence.CosmosDB" Version="4.0.0-alpha.2" />
10+
<PackageReference Include="NServiceBus.RabbitMQ" Version="11.0.0-alpha.2" />
11+
</ItemGroup>
12+
</Project>
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
name: outbox
2+
services:
3+
rabbit:
4+
image: rabbitmq:3-management
5+
environment:
6+
RABBITMQ_DEFAULT_USER: rabbitmq
7+
RABBITMQ_DEFAULT_PASS: rabbitmq
8+
RABBITMQ_DEFAULT_VHOST: /
9+
ports:
10+
- "15672:15672"
11+
- "5672:5672"
12+
cosmosdb:
13+
image: mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator
14+
container_name: cosmosdb
15+
ports:
16+
- "8081:8081" # HTTPS endpoint
17+
- "11252:10252" # Change this if necessary, e.g., "11252:10252"
18+
environment:
19+
# You can configure the number of partitions (default is 1)
20+
AZURE_COSMOS_EMULATOR_PARTITION_COUNT: 1
21+
# Enable persistence if you need to retain data between restarts
22+
AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE: "true"

samples/outbox/cosmosdb/Core_10/prerelease.txt

Whitespace-only changes.

0 commit comments

Comments
 (0)