Skip to content

Commit c33346b

Browse files
authored
Refactor messaging services (#48)
* Add Messaging.Tests project Includes Testcontainers project for end to end Rabbit message testing * WIP: refactor RabbitService, use single Publish/Subscribe method * Refactor DMS messaging. Use single Publish/Subscribe methods. * Remove StatusPublish/StatusSubscribe and use generic Publish/Subscribe * Remove IDbMessagingService and replace with generic IMessageService * Rename messages and use DbRequestMessage<TResponse> * Ensure RabbitService serializes generic type to concrete implementation * Remove constructors and rely on properties for initialisation Migrates from constructor-based to property-based initialization for message payloads, ensuring consistency in data transfer between services.
1 parent 0b226af commit c33346b

File tree

125 files changed

+980
-1432
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

125 files changed

+980
-1432
lines changed

Directory.Packages.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
<PackageVersion Include="Serilog.Sinks.File" Version="7.0.0" />
6767
<PackageVersion Include="SimMetrics.Net" Version="1.0.5" />
6868
<PackageVersion Include="System.Data.SqlClient" Version="4.9.0" />
69+
<PackageVersion Include="Testcontainers.RabbitMq" Version="4.9.0" />
70+
<PackageVersion Include="Testcontainers.Xunit" Version="4.9.0" />
6971
<PackageVersion Include="xunit" Version="2.9.3" />
7072
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.5">
7173
<PrivateAssets>all</PrivateAssets>

dms.sln

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
2-
Microsoft Visual Studio Solution File, Format Version 12.00
1+
Microsoft Visual Studio Solution File, Format Version 12.00
32
# Visual Studio Version 18
43
VisualStudioVersion = 18.0.11116.177
54
MinimumVisualStudioVersion = 15.0.26124.0
@@ -97,6 +96,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Offloc.Cleaner.Tests", "tes
9796
EndProject
9897
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Delius.Parser.Tests", "tests\Delius.Parser.Tests\Delius.Parser.Tests.csproj", "{841B9B78-5596-46EF-B38D-0E1C18321CDF}"
9998
EndProject
99+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messaging.Tests", "tests\Messaging.Tests\Messaging.Tests.csproj", "{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}"
100+
EndProject
100101
Global
101102
GlobalSection(SolutionConfigurationPlatforms) = preSolution
102103
Debug|Any CPU = Debug|Any CPU
@@ -545,6 +546,18 @@ Global
545546
{841B9B78-5596-46EF-B38D-0E1C18321CDF}.Release|x64.Build.0 = Release|Any CPU
546547
{841B9B78-5596-46EF-B38D-0E1C18321CDF}.Release|x86.ActiveCfg = Release|Any CPU
547548
{841B9B78-5596-46EF-B38D-0E1C18321CDF}.Release|x86.Build.0 = Release|Any CPU
549+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
550+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Debug|Any CPU.Build.0 = Debug|Any CPU
551+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Debug|x64.ActiveCfg = Debug|Any CPU
552+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Debug|x64.Build.0 = Debug|Any CPU
553+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Debug|x86.ActiveCfg = Debug|Any CPU
554+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Debug|x86.Build.0 = Debug|Any CPU
555+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Release|Any CPU.ActiveCfg = Release|Any CPU
556+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Release|Any CPU.Build.0 = Release|Any CPU
557+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Release|x64.ActiveCfg = Release|Any CPU
558+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Release|x64.Build.0 = Release|Any CPU
559+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Release|x86.ActiveCfg = Release|Any CPU
560+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E}.Release|x86.Build.0 = Release|Any CPU
548561
EndGlobalSection
549562
GlobalSection(SolutionProperties) = preSolution
550563
HideSolutionNode = FALSE
@@ -590,6 +603,7 @@ Global
590603
{10A645A2-039D-4906-BCB7-DBE12B0EF34D} = {0AB3BF05-4346-4AA6-1389-037BE0695223}
591604
{199066EA-192B-4126-A1BC-85900CDCA1DD} = {0AB3BF05-4346-4AA6-1389-037BE0695223}
592605
{841B9B78-5596-46EF-B38D-0E1C18321CDF} = {0AB3BF05-4346-4AA6-1389-037BE0695223}
606+
{16431464-ACDD-4DE0-8AFF-D8926CF8C14E} = {0AB3BF05-4346-4AA6-1389-037BE0695223}
593607
EndGlobalSection
594608
GlobalSection(ExtensibilityGlobals) = postSolution
595609
SolutionGuid = {39E62751-A441-4DEF-AD24-20CDC6D59FF3}

src/Blocking/BlockingBackgroundService.cs

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,37 +7,22 @@
77

88
namespace Blocking;
99

10-
public class BlockingBackgroundService : BackgroundService
10+
public class BlockingBackgroundService(
11+
IMessageService messageService,
12+
DatabaseInsert database) : BackgroundService
1113
{
12-
private readonly IStatusMessagingService statusMessagingService;
13-
private readonly IImportMessagingService importMessageService;
14-
private readonly IBlockingMessagingService blockingMessageService;
15-
private readonly DatabaseInsert matchingDbInsert;
16-
17-
public BlockingBackgroundService(IStatusMessagingService statusMessagingService, IImportMessagingService importMessageService,
18-
DatabaseInsert matchingDbInsert, IBlockingMessagingService blockingMessageService)
19-
{
20-
this.statusMessagingService = statusMessagingService;
21-
this.matchingDbInsert = matchingDbInsert;
22-
this.importMessageService = importMessageService;
23-
this.blockingMessageService = blockingMessageService;
24-
}
25-
2614
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
2715
{
28-
await Task.Run(() =>
16+
await messageService.SubscribeAsync<ImportFinishedMessage>(async (message) =>
2917
{
30-
importMessageService.ImportSubscribeAsync<ImportFinishedMessage>(async (message) =>
31-
{
32-
await statusMessagingService.StatusPublishAsync(new StatusUpdateMessage("Blocking candidates..."));
33-
await CallBlocking();
34-
}, TImportQueue.ImportFinished);
35-
}, stoppingToken);
18+
await messageService.PublishAsync(new StatusUpdateMessage("Blocking candidates..."));
19+
await CallBlocking();
20+
}, TImportQueue.ImportFinished);
3621
}
3722

3823
private async Task CallBlocking()
3924
{
40-
await matchingDbInsert.InsertCandidates();
41-
await blockingMessageService.BlockingPublishAsync(new BlockingFinishedMessage());
25+
await database.InsertCandidates();
26+
await messageService.PublishAsync(new BlockingFinishedMessage());
4227
}
4328
}
Lines changed: 14 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,38 @@
11
using Cleanup.CleanupServices.LiveCleanup;
2-
32
using Messaging.Interfaces;
43
using Messaging.Messages.MergingMessages.CleanupMessages;
54
using Messaging.Queues;
65
using Microsoft.Extensions.Hosting;
7-
using System.Globalization;
86
using Messaging.Messages.StagingMessages.Offloc;
97
using Messaging.Messages.StagingMessages.Delius;
108

119
namespace Cleanup;
1210

13-
public class CleanupBackgroundService : BackgroundService
11+
public class CleanupBackgroundService(
12+
IMessageService messageService,
13+
DeliusCleanupService deliusCleanup,
14+
OfflocCleanupService offlocCleanup) : BackgroundService
1415
{
15-
private readonly IStagingMessagingService stagingMessagingService;
16-
private readonly IMergingMessagingService mergingMessagingService;
17-
private readonly IDbMessagingService dbMessagingService;
18-
19-
private DeliusCleanupService deliusCleanup;
20-
private OfflocCleanupService offlocCleanup;
21-
22-
private CultureInfo cultureInfo = new CultureInfo("en-GB");
23-
24-
public CleanupBackgroundService(IMergingMessagingService mergingService,
25-
IDbMessagingService dbMessagingService, IStagingMessagingService stagingService,
26-
OfflocCleanupService offlocCleanup, DeliusCleanupService deliusCleanup)
27-
{
28-
this.stagingMessagingService = stagingService;
29-
this.mergingMessagingService = mergingService;
30-
this.deliusCleanup = deliusCleanup;
31-
this.offlocCleanup = offlocCleanup;
32-
this.dbMessagingService = dbMessagingService;
33-
}
34-
3516
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
3617
{
37-
await Task.CompletedTask;
18+
await messageService.SubscribeAsync<DeliusFilesCleanupMessage>(async message =>
19+
{
20+
deliusCleanup.Cleanup(message.FileName);
21+
}, TMergingQueue.DeliusFilesCleanupQueue);
3822

39-
mergingMessagingService.MergingSubscribeAsync<DeliusFilesCleanupMessage>(async(message) =>
40-
await CleanDelius(message), TMergingQueue.DeliusFilesCleanupQueue);
41-
mergingMessagingService.MergingSubscribeAsync<OfflocFilesCleanupMessage>(async(message) =>
42-
await CleanOffloc(message), TMergingQueue.OfflocFilesCleanupQueue);
23+
await messageService.SubscribeAsync<OfflocFilesCleanupMessage>(async message =>
24+
{
25+
offlocCleanup.Cleanup(message.FileName);
26+
}, TMergingQueue.OfflocFilesCleanupQueue);
4327

44-
stagingMessagingService.StagingSubscribeAsync<ClearTemporaryDeliusFiles>(async (message) =>
28+
await messageService.SubscribeAsync<ClearTemporaryDeliusFiles>(async _ =>
4529
{
46-
await Task.CompletedTask;
4730
deliusCleanup.ClearIllegalFiles();
4831
}, TStagingQueue.DeliusFilesClear);
4932

50-
stagingMessagingService.StagingSubscribeAsync<ClearHalfCleanedOfflocFiles>(async (message) =>
33+
await messageService.SubscribeAsync<ClearHalfCleanedOfflocFiles>(async _ =>
5134
{
52-
await Task.CompletedTask;
5335
offlocCleanup.ClearIllegalFiles();
5436
}, TStagingQueue.OfflocFilesClear);
5537
}
56-
57-
private async Task CleanDelius(DeliusFilesCleanupMessage message)
58-
{
59-
await Task.CompletedTask;
60-
61-
deliusCleanup.Cleanup(message.fileName);
62-
}
63-
64-
private async Task CleanOffloc(OfflocFilesCleanupMessage message)
65-
{
66-
await Task.CompletedTask;
67-
68-
offlocCleanup.Cleanup(message.fileName);
69-
}
7038
}

src/Cleanup/CleanupServices/LiveCleanup/OfflocCleanupService.cs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,11 @@
11
using FileStorage;
2-
using Messaging.Interfaces;
3-
using Messaging.Messages.DbMessages.Receiving;
4-
using Messaging.Messages.DbMessages.Sending;
52

63
namespace Cleanup.CleanupServices.LiveCleanup;
74

8-
public class OfflocCleanupService : CleanupServiceBase
5+
public class OfflocCleanupService(IFileLocations fileLocations) : CleanupServiceBase(fileLocations.offlocInput, fileLocations.offlocOutput)
96
{
10-
private readonly IDbMessagingService dbMessagingService;
11-
12-
public OfflocCleanupService(IFileLocations fileLocations, IDbMessagingService dbMessagingService)
13-
: base(fileLocations.offlocInput, fileLocations.offlocOutput)
14-
{
15-
this.dbMessagingService = dbMessagingService;
16-
}
17-
18-
//A method to make DMS more resilient (by stopping it from trying to process half-cleaned files).
19-
public override void ClearIllegalFiles()
7+
//A method to make DMS more resilient (by stopping it from trying to process half-cleaned files).
8+
public override void ClearIllegalFiles()
209
{
2110
DirectoryInfo di = new DirectoryInfo(inputFolderPath);
2211
var illegalFiles = di.GetFiles("*_clean*");

0 commit comments

Comments
 (0)