Skip to content

Commit a975b47

Browse files
Update outbox/sql sample to NServiceBus v10 (#7610)
* Remove the imput loop service from the v9 sample * Add v10 sample * It cannot be plural * tweaks
1 parent 1f4b9b3 commit a975b47

17 files changed

+409
-45
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
using Microsoft.Extensions.Logging;
5+
using NServiceBus.Persistence.Sql;
6+
7+
sealed class OrderLifecycleSaga(ILogger<OrderLifecycleSaga> logger) :
8+
SqlSaga<OrderLifecycleSaga.SagaData>,
9+
IAmStartedByMessages<OrderSubmitted>,
10+
IHandleTimeouts<OrderTimeout>
11+
{
12+
protected override void ConfigureMapping(IMessagePropertyMapper mapper)
13+
{
14+
mapper.ConfigureMapping<OrderSubmitted>(_ => _.OrderId);
15+
}
16+
17+
protected override string CorrelationPropertyName => nameof(SagaData.OrderId);
18+
19+
#region Timeout
20+
21+
public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
22+
{
23+
var orderTimeout = new OrderTimeout();
24+
return RequestTimeout(context, TimeSpan.FromSeconds(5), orderTimeout);
25+
}
26+
27+
#endregion
28+
29+
public Task Timeout(OrderTimeout state, IMessageHandlerContext context)
30+
{
31+
logger.LogInformation("Got timeout");
32+
return Task.CompletedTask;
33+
}
34+
35+
public class SagaData :
36+
ContainSagaData
37+
{
38+
public Guid OrderId { get; set; }
39+
}
40+
41+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System.Threading.Tasks;
2+
using Microsoft.Data.SqlClient;
3+
using NServiceBus;
4+
using Microsoft.Extensions.Logging;
5+
6+
sealed class OrderSubmittedHandler(ILogger<OrderSubmittedHandler> logger) :
7+
IHandleMessages<OrderSubmitted>
8+
{
9+
public async Task Handle(OrderSubmitted message, IMessageHandlerContext context)
10+
{
11+
logger.LogInformation("Order {OrderId} worth {OrderValue} submitted",
12+
message.OrderId,
13+
message.Value
14+
);
15+
16+
#region StoreUserData
17+
18+
var session = context.SynchronizedStorageSession.SqlPersistenceSession();
19+
20+
var sql = @"insert into receiver.SubmittedOrder
21+
(Id, Value)
22+
values (@Id, @Value)";
23+
24+
await using (var command = new SqlCommand(
25+
cmdText: sql,
26+
connection: (SqlConnection)session.Connection,
27+
transaction: (SqlTransaction)session.Transaction))
28+
{
29+
var parameters = command.Parameters;
30+
parameters.AddWithValue("Id", message.OrderId);
31+
parameters.AddWithValue("Value", message.Value);
32+
await command.ExecuteNonQueryAsync(context.CancellationToken);
33+
}
34+
35+
#endregion
36+
37+
#region Reply
38+
39+
var orderAccepted = new OrderAccepted(OrderId: message.OrderId);
40+
await context.Reply(orderAccepted);
41+
42+
#endregion
43+
}
44+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
public record OrderTimeout;
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using System;
2+
using System.IO;
3+
using Microsoft.Data.SqlClient;
4+
using Microsoft.Extensions.Hosting;
5+
using NServiceBus;
6+
using NServiceBus.Transport.SqlServer;
7+
8+
var host = Host.CreateDefaultBuilder(args)
9+
.ConfigureServices((hostContext, services) => { Console.Title = "Server"; })
10+
.UseNServiceBus(x =>
11+
{
12+
Console.Title = "Receiver";
13+
14+
//for local instance or SqlExpress
15+
//string connectionString = @"Data Source=(localdb)\mssqllocaldb;Database=NsbSamplesSqlOutbox;Trusted_Connection=True;MultipleActiveResultSets=true";
16+
const string connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSqlOutbox;User Id=SA;Password=yourStrong(!)Password;Max Pool Size=100;Encrypt=false";
17+
18+
var endpointConfiguration = new EndpointConfiguration("Samples.SqlOutbox.Receiver");
19+
endpointConfiguration.EnableInstallers();
20+
endpointConfiguration.SendFailedMessagesTo("error");
21+
22+
#region ReceiverConfiguration
23+
24+
var transport = new SqlServerTransport(connectionString)
25+
{
26+
DefaultSchema = "receiver",
27+
TransportTransactionMode = TransportTransactionMode.ReceiveOnly
28+
};
29+
transport.SchemaAndCatalog.UseSchemaForQueue("error", "dbo");
30+
transport.SchemaAndCatalog.UseSchemaForQueue("audit", "dbo");
31+
32+
var routing = endpointConfiguration.UseTransport(transport);
33+
routing.UseSchemaForEndpoint("Samples.SqlOutbox.Sender", "sender");
34+
35+
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
36+
persistence.ConnectionBuilder(
37+
connectionBuilder: () => new SqlConnection(connectionString)
38+
);
39+
var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
40+
dialect.Schema("receiver");
41+
persistence.TablePrefix("");
42+
43+
transport.Subscriptions.DisableCaching = true;
44+
transport.Subscriptions.SubscriptionTableName = new SubscriptionTableName(
45+
table: "Subscriptions",
46+
schema: "dbo"
47+
);
48+
49+
endpointConfiguration.EnableOutbox();
50+
51+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
52+
53+
#endregion
54+
55+
SqlHelper.CreateSchema(connectionString, "receiver");
56+
57+
SqlHelper.ExecuteSql(connectionString, File.ReadAllText("Startup.sql"));
58+
return endpointConfiguration;
59+
})
60+
.Build();
61+
62+
63+
await host.RunAsync();
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<OutputType>Exe</OutputType>
4+
<TargetFramework>net10.0</TargetFramework>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<ProjectReference Include="..\Shared\Shared.csproj" />
9+
</ItemGroup>
10+
<ItemGroup>
11+
<PackageReference Include="Newtonsoft.Json" Version="13.*" />
12+
<PackageReference Include="NServiceBus.Persistence.Sql" Version="9.0.0-alpha.2" />
13+
<PackageReference Include="NServiceBus.Transport.SqlServer" Version="9.0.0-alpha.2" />
14+
<PackageReference Include="NServiceBus.Extensions.Hosting" Version="4.0.0-alpha.2" />
15+
</ItemGroup>
16+
<ItemGroup>
17+
<None Update="Startup.sql" CopyToOutputDirectory="PreserveNewest" />
18+
</ItemGroup>
19+
</Project>
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
if object_id('receiver.SubmittedOrder', 'U') is null
2+
create table receiver.SubmittedOrder (
3+
Id uniqueidentifier not null primary key,
4+
Value int not null
5+
)
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System.Threading.Tasks;
2+
using NServiceBus;
3+
using Microsoft.Extensions.Logging;
4+
5+
sealed class OrderAcceptedHandler(ILogger<OrderAcceptedHandler> logger) :
6+
IHandleMessages<OrderAccepted>
7+
{
8+
public Task Handle(OrderAccepted message, IMessageHandlerContext context)
9+
{
10+
logger.LogInformation("Order {OrderId} accepted", message.OrderId);
11+
return Task.CompletedTask;
12+
}
13+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Microsoft.Data.SqlClient;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Microsoft.Extensions.Hosting;
6+
using NServiceBus;
7+
using NServiceBus.Transport.SqlServer;
8+
9+
Console.Title = "Sender";
10+
var host = Host.CreateDefaultBuilder(args)
11+
.UseNServiceBus(x =>
12+
{
13+
var endpointConfiguration = new EndpointConfiguration("Samples.SqlOutbox.Sender");
14+
endpointConfiguration.EnableInstallers();
15+
endpointConfiguration.SendFailedMessagesTo("error");
16+
17+
#region SenderConfiguration
18+
19+
//for local instance or SqlExpress
20+
// string connectionString = @"Data Source=(localdb)\mssqllocaldb;Database=NsbSamplesSqlOutbox;Trusted_Connection=True;MultipleActiveResultSets=true";
21+
const string connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSqlOutbox;User Id=SA;Password=yourStrong(!)Password;Max Pool Size=100;Encrypt=false";
22+
23+
var transport = new SqlServerTransport(connectionString)
24+
{
25+
DefaultSchema = "sender",
26+
TransportTransactionMode = TransportTransactionMode.ReceiveOnly
27+
};
28+
transport.SchemaAndCatalog.UseSchemaForQueue("error", "dbo");
29+
transport.SchemaAndCatalog.UseSchemaForQueue("audit", "dbo");
30+
31+
endpointConfiguration.UseTransport(transport);
32+
33+
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
34+
persistence.ConnectionBuilder(
35+
connectionBuilder: () => new SqlConnection(connectionString)
36+
);
37+
var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
38+
dialect.Schema("sender");
39+
persistence.TablePrefix("");
40+
41+
transport.Subscriptions.DisableCaching = true;
42+
transport.Subscriptions.SubscriptionTableName = new SubscriptionTableName(
43+
table: "Subscriptions",
44+
schema: "dbo"
45+
);
46+
47+
endpointConfiguration.EnableOutbox();
48+
49+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
50+
51+
#endregion
52+
53+
SqlHelper.CreateSchema(connectionString, "sender");
54+
Console.WriteLine("Press enter to send a message");
55+
return endpointConfiguration;
56+
})
57+
.Build();
58+
59+
await host.StartAsync();
60+
61+
var messageSession = host.Services.GetService<IMessageSession>();
62+
var random = new Random();
63+
64+
while (true)
65+
{
66+
if (!Console.KeyAvailable)
67+
{
68+
await Task.Delay(100);
69+
continue;
70+
}
71+
var key = Console.ReadKey();
72+
Console.WriteLine();
73+
74+
if (key.Key != ConsoleKey.Enter)
75+
{
76+
break;
77+
}
78+
79+
var orderSubmitted = new OrderSubmitted(
80+
OrderId: Guid.NewGuid(),
81+
Value: random.Next(100)
82+
);
83+
84+
await messageSession.Publish(orderSubmitted);
85+
}
86+
87+
await host.StopAsync();
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<OutputType>Exe</OutputType>
4+
<TargetFramework>net10.0</TargetFramework>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<ProjectReference Include="..\Shared\Shared.csproj" />
9+
</ItemGroup>
10+
<ItemGroup>
11+
<PackageReference Include="Newtonsoft.Json" Version="13.*" />
12+
<PackageReference Include="NServiceBus.Extensions.Hosting" Version="4.0.0-alpha.2" />
13+
<PackageReference Include="NServiceBus.Persistence.Sql" Version="9.0.0-alpha.2" />
14+
<PackageReference Include="NServiceBus.Transport.SqlServer" Version="9.0.0-alpha.2" />
15+
</ItemGroup>
16+
</Project>
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
using System;
2+
using NServiceBus;
3+
4+
public record OrderAccepted(Guid OrderId) : IMessage;

0 commit comments

Comments
 (0)