Skip to content

Commit bae2138

Browse files
authored
Update to sample for saga batching to NSB 10 (#7550)
* update to nsb10 for saga batching
1 parent 078fa05 commit bae2138

File tree

13 files changed

+460
-0
lines changed

13 files changed

+460
-0
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkGenerator", "WorkGenerator\WorkGenerator.csproj", "{6CF24DEE-1B76-49EC-895A-2F03E1040F6B}"
4+
EndProject
5+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkProcessor", "WorkProcessor\WorkProcessor.csproj", "{86E06E82-9C49-4189-A923-AB00972EFEAD}"
6+
EndProject
7+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SharedMessages", "SharedMessages\SharedMessages.csproj", "{C3634A7F-F0A6-4036-82B0-294BDA5A1C85}"
8+
EndProject
9+
Global
10+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
11+
Debug|Any CPU = Debug|Any CPU
12+
Release|Any CPU = Release|Any CPU
13+
EndGlobalSection
14+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
15+
{6CF24DEE-1B76-49EC-895A-2F03E1040F6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
16+
{6CF24DEE-1B76-49EC-895A-2F03E1040F6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
17+
{6CF24DEE-1B76-49EC-895A-2F03E1040F6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
18+
{6CF24DEE-1B76-49EC-895A-2F03E1040F6B}.Release|Any CPU.Build.0 = Release|Any CPU
19+
{86E06E82-9C49-4189-A923-AB00972EFEAD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
20+
{86E06E82-9C49-4189-A923-AB00972EFEAD}.Debug|Any CPU.Build.0 = Debug|Any CPU
21+
{86E06E82-9C49-4189-A923-AB00972EFEAD}.Release|Any CPU.ActiveCfg = Release|Any CPU
22+
{86E06E82-9C49-4189-A923-AB00972EFEAD}.Release|Any CPU.Build.0 = Release|Any CPU
23+
{C3634A7F-F0A6-4036-82B0-294BDA5A1C85}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
24+
{C3634A7F-F0A6-4036-82B0-294BDA5A1C85}.Debug|Any CPU.Build.0 = Debug|Any CPU
25+
{C3634A7F-F0A6-4036-82B0-294BDA5A1C85}.Release|Any CPU.ActiveCfg = Release|Any CPU
26+
{C3634A7F-F0A6-4036-82B0-294BDA5A1C85}.Release|Any CPU.Build.0 = Release|Any CPU
27+
EndGlobalSection
28+
EndGlobal
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
public class EndpointNames
2+
{
3+
public const string WorkGenerator = "WorkGenerator";
4+
public const string WorkProcessor = "WorkProcessor";
5+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using System;
2+
using NServiceBus;
3+
4+
public record StartProcessing : ICommand
5+
{
6+
public Guid ProcessId { get; set; }
7+
public int WorkCount { get; set; }
8+
}
9+
10+
public record ProcessWorkOrder : ICommand
11+
{
12+
public Guid ProcessId { get; set; }
13+
public int WorkOrder { get; set; }
14+
}
15+
16+
public enum WorkStatus
17+
{
18+
Success,
19+
Failed
20+
}
21+
22+
public record WorkOrderCompleted : IMessage
23+
{
24+
public Guid ProcessId { get; set; }
25+
public int WorkOrderNo { get; set; }
26+
public WorkStatus Status { get; set; }
27+
public string Error { get; set; }
28+
}
29+
30+
public record WorkAllDone : IMessage
31+
{
32+
public Guid ProcessId { get; set; }
33+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System.Threading.Tasks;
2+
using NServiceBus;
3+
4+
public static class Endpoint
5+
{
6+
public static Task<IEndpointInstance> StartWithDefaultRoutes(this EndpointConfiguration config, RoutingSettings<LearningTransport> routing)
7+
{
8+
routing.RouteToEndpoint(typeof(StartProcessing), EndpointNames.WorkGenerator);
9+
routing.RouteToEndpoint(typeof(WorkOrderCompleted), EndpointNames.WorkGenerator);
10+
routing.RouteToEndpoint(typeof(WorkAllDone), EndpointNames.WorkGenerator);
11+
routing.RouteToEndpoint(typeof(ProcessWorkOrder), EndpointNames.WorkProcessor);
12+
13+
return NServiceBus.Endpoint.Start(config);
14+
}
15+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net10.0</TargetFramework>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="NServiceBus" Version="10.0.0-alpha.1" />
10+
</ItemGroup>
11+
12+
</Project>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
4+
public static class Extensions
5+
{
6+
public const int DefaultBatchSize = 100;
7+
8+
public static IEnumerable<IEnumerable<T>> BatchWithDefaultSize<T>(this IEnumerable<T> items)
9+
{
10+
return Batch(items, DefaultBatchSize);
11+
}
12+
13+
public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> items, int batchSize)
14+
{
15+
return items.Select((item, inx) => new { item, inx })
16+
.GroupBy(x => x.inx / batchSize)
17+
.Select(g => g.Select(x => x.item));
18+
}
19+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
using NServiceBus;
7+
8+
public class ProcessingSaga : Saga<ProcessingSagaData>,
9+
IAmStartedByMessages<StartProcessing>,
10+
IHandleMessages<WorkOrderCompleted>,
11+
IHandleMessages<WorkAllDone>
12+
{
13+
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ProcessingSagaData> mapper)
14+
{
15+
mapper.MapSaga(saga => saga.ProcessId)
16+
.ToMessage<StartProcessing>(msg => msg.ProcessId)
17+
.ToMessage<WorkOrderCompleted>(msg => msg.ProcessId)
18+
.ToMessage<WorkAllDone>(msg => msg.ProcessId);
19+
}
20+
21+
public async Task Handle(StartProcessing message, IMessageHandlerContext context)
22+
{
23+
Console.WriteLine($"Processing saga started: '{message.ProcessId}'");
24+
Console.WriteLine($"Starting the process for '{message.WorkCount}' work orders.");
25+
26+
Data.WorkCount = message.WorkCount;
27+
Data.StartedAt = DateTime.UtcNow;
28+
Data.Progress = new WorkProgress();
29+
30+
await ImportNextBatch(context);
31+
}
32+
33+
#region saga-import-nextbatch
34+
private async Task ImportNextBatch(IMessageHandlerContext context)
35+
{
36+
if (Data.Progress.AllWorkCompleted(Data.WorkCount))
37+
{
38+
await FinishWork(context);
39+
}
40+
else if (Data.Progress.HasRemainingWork(Data.WorkCount))
41+
{
42+
var importedPages = Data.Progress.ImportedPages();
43+
var remainingPages = Data.WorkCount - importedPages;
44+
var range = Enumerable.Range(importedPages + 1, remainingPages);
45+
var nextBatch = range.Batch(batchSize: 100).First().ToList();
46+
47+
await SendWorkRequest(nextBatch, context);
48+
}
49+
}
50+
#endregion
51+
52+
#region saga-work-completed
53+
public async Task Handle(WorkOrderCompleted message, IMessageHandlerContext context)
54+
{
55+
if (message.Status == WorkStatus.Failed)
56+
{
57+
//NOTE: The work can be marked as completed (to continue with the next pages), or
58+
//as failed so the next set of pages are stalled until the issue is resolved.
59+
await Console.Error.WriteLineAsync($"Work {message.WorkOrderNo} finished with error: {message.Error}");
60+
Data.FailedJobs += 1;
61+
}
62+
else
63+
{
64+
Console.WriteLine($"Work {message.WorkOrderNo} finished.");
65+
}
66+
67+
Data.Progress.MarkWorkComplete(message.WorkOrderNo);
68+
69+
if (Data.Progress.AllWorkCompleted(Data.WorkCount))
70+
{
71+
await FinishWork(context);
72+
}
73+
else if (Data.Progress.IsCurrentBatchCompleted())
74+
{
75+
await ImportNextBatch(context);
76+
}
77+
}
78+
#endregion
79+
80+
public Task Handle(WorkAllDone message, IMessageHandlerContext context)
81+
{
82+
var took = DateTime.UtcNow - Data.StartedAt;
83+
Console.WriteLine($"Total {Data.FailedJobs} failed jobs.");
84+
Console.WriteLine($"All done. Took {took.TotalSeconds}");
85+
MarkAsComplete();
86+
return Task.CompletedTask;
87+
}
88+
89+
private async Task SendWorkRequest(List<int> orders, IMessageHandlerContext context)
90+
{
91+
var orderRange = $"{orders[0]} - {orders[orders.Count-1]}";
92+
Console.WriteLine($"Queueing next batch of work orders: ({orderRange}).");
93+
94+
Data.Progress.StartNewBatch(orders);
95+
96+
foreach (var order in orders)
97+
{
98+
await context.Send(new ProcessWorkOrder
99+
{
100+
ProcessId = Data.ProcessId,
101+
WorkOrder = order
102+
});
103+
}
104+
}
105+
106+
private async Task FinishWork(IMessageHandlerContext context)
107+
{
108+
if (Data.Progress.AllWorkCompleted(Data.WorkCount))
109+
{
110+
await context.SendLocal(new WorkAllDone
111+
{
112+
ProcessId = Data.ProcessId
113+
});
114+
}
115+
}
116+
}
117+
118+
[Serializable]
119+
public class WorkProgress
120+
{
121+
public WorkProgress()
122+
{
123+
CompletedWork = new List<int>();
124+
BatchPages = new ConcurrentDictionary<int, bool>();
125+
}
126+
127+
public List<int> CompletedWork { get; set; }
128+
public IDictionary<int, bool> BatchPages { get; set; }
129+
130+
public void MarkWorkComplete(int workNo)
131+
{
132+
CompletedWork.Add(workNo);
133+
BatchPages[workNo] = true;
134+
}
135+
136+
public bool AllWorkCompleted(int totalWorkCount)
137+
{
138+
return CompletedWork.Count == totalWorkCount;
139+
}
140+
141+
public int ImportedPages()
142+
{
143+
return CompletedWork.Count;
144+
}
145+
146+
public bool IsCurrentBatchCompleted()
147+
{
148+
return BatchPages.All(p => p.Value);
149+
}
150+
151+
public void StartNewBatch(List<int> pages)
152+
{
153+
BatchPages.Clear();
154+
foreach (var p in pages)
155+
{
156+
BatchPages.Add(p, false);
157+
}
158+
}
159+
160+
public bool HasRemainingWork(int totalWorkCount)
161+
{
162+
var importedPages = ImportedPages();
163+
var remainingPages = totalWorkCount - importedPages;
164+
return remainingPages > 0;
165+
}
166+
}
167+
168+
public class ProcessingSagaData : ContainSagaData
169+
{
170+
public ProcessingSagaData()
171+
{
172+
Progress = new WorkProgress();
173+
}
174+
175+
public Guid ProcessId { get; set; }
176+
public int WorkCount { get; set; }
177+
public WorkProgress Progress { get; set; }
178+
public DateTime StartedAt { get; set; }
179+
public int FailedJobs { get; set; }
180+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
5+
public static class Program
6+
{
7+
public static async Task Main()
8+
{
9+
var config = new EndpointConfiguration(EndpointNames.WorkGenerator);
10+
11+
config.UsePersistence<LearningPersistence>();
12+
config.UseSerialization<SystemJsonSerializer>();
13+
config.AuditProcessedMessagesTo("audit");
14+
config.SendFailedMessagesTo("error");
15+
16+
var transport = config.UseTransport<LearningTransport>();
17+
var routing = transport.Routing();
18+
var endpoint = await config.StartWithDefaultRoutes(routing);
19+
20+
Console.WriteLine("Started.");
21+
Console.WriteLine("Press 'S' to start a new process or [Enter] to exit.");
22+
23+
while (true)
24+
{
25+
var pressedKey = Console.ReadKey();
26+
switch (pressedKey.Key)
27+
{
28+
case ConsoleKey.Enter:
29+
{
30+
return;
31+
}
32+
case ConsoleKey.S:
33+
{
34+
await StartSaga(endpoint);
35+
break;
36+
}
37+
}
38+
}
39+
}
40+
41+
static async Task StartSaga(IMessageSession session)
42+
{
43+
var id = Guid.NewGuid();
44+
var count = Random.Shared.Next(500, 1000);
45+
46+
await session.SendLocal(new StartProcessing
47+
{
48+
ProcessId = id,
49+
WorkCount = count
50+
});
51+
Console.WriteLine($"Started process '{id}' with '{count}' work orders.");
52+
}
53+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net10.0</TargetFramework>
6+
<LangVersion>preview</LangVersion>
7+
</PropertyGroup>
8+
9+
10+
11+
<ItemGroup>
12+
<ProjectReference Include="..\SharedMessages\SharedMessages.csproj" />
13+
</ItemGroup>
14+
15+
</Project>

0 commit comments

Comments
 (0)