Skip to content

Commit 61e37dd

Browse files
committed
Message Bus
1 parent 345641e commit 61e37dd

File tree

15 files changed

+92
-43
lines changed

15 files changed

+92
-43
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers;
6+
7+
public interface IMessageBus
8+
{
9+
Task SendAsync<T>(T message, MetaData metaData = null, CancellationToken cancellationToken = default)
10+
where T : IMessageBusEvent;
11+
12+
Task ReceiveAsync<TConsumer, T>(Func<T, MetaData, Task> action, CancellationToken cancellationToken = default)
13+
where T : IMessageBusEvent;
14+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers;
2+
3+
public interface IMessageBusEvent
4+
{
5+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
using System;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers;
7+
8+
public class MessageBus : IMessageBus
9+
{
10+
private readonly IServiceProvider _serviceProvider;
11+
12+
public MessageBus(IServiceProvider serviceProvider)
13+
{
14+
_serviceProvider = serviceProvider;
15+
}
16+
17+
public async Task SendAsync<T>(T message, MetaData metaData = null, CancellationToken cancellationToken = default)
18+
where T : IMessageBusEvent
19+
{
20+
await _serviceProvider.GetRequiredService<IMessageSender<T>>().SendAsync(message, metaData, cancellationToken);
21+
}
22+
23+
public async Task ReceiveAsync<TConsumer, T>(Func<T, MetaData, Task> action, CancellationToken cancellationToken = default)
24+
where T : IMessageBusEvent
25+
{
26+
await _serviceProvider.GetRequiredService<IMessageReceiver<TConsumer, T>>().ReceiveAsync(action, cancellationToken);
27+
}
28+
}

src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/AuditLogModuleServiceCollectionExtensions.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using ClassifiedAds.Domain.Repositories;
1+
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
2+
using ClassifiedAds.Domain.Repositories;
23
using ClassifiedAds.Services.AuditLog.Authorization;
34
using ClassifiedAds.Services.AuditLog.ConfigurationOptions;
45
using ClassifiedAds.Services.AuditLog.DTOs;
@@ -29,7 +30,8 @@ public static IServiceCollection AddAuditLogModule(this IServiceCollection servi
2930

3031
services.AddAuthorizationPolicies(Assembly.GetExecutingAssembly(), AuthorizationPolicyNames.GetPolicyNames());
3132

32-
services.AddMessageBusReceiver<AuditLogAggregationConsumer, AuditLogCreatedEvent>(appSettings.MessageBroker);
33+
services.AddTransient<IMessageBus, MessageBus>()
34+
.AddMessageBusReceiver<AuditLogAggregationConsumer, AuditLogCreatedEvent>(appSettings.MessageBroker);
3335

3436
return services;
3537
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
using ClassifiedAds.Services.AuditLog.Entities;
1+
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
2+
using ClassifiedAds.Services.AuditLog.Entities;
23

34
namespace ClassifiedAds.Services.AuditLog.DTOs;
45

5-
public class AuditLogCreatedEvent
6+
public class AuditLogCreatedEvent : IMessageBusEvent
67
{
78
public AuditLogEntry AuditLog { get; set; }
89
}

src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/HostedServices/MessageBusReceiver.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,20 @@ internal class MessageBusReceiver : BackgroundService
1414
{
1515
private readonly ILogger<MessageBusReceiver> _logger;
1616
private readonly IServiceProvider _serviceProvider;
17-
private readonly IMessageReceiver<AuditLogAggregationConsumer, AuditLogCreatedEvent> _auditLogCreatedEventReceiver;
17+
private readonly IMessageBus _messageBus;
1818

1919
public MessageBusReceiver(ILogger<MessageBusReceiver> logger,
2020
IServiceProvider serviceProvider,
21-
IMessageReceiver<AuditLogAggregationConsumer, AuditLogCreatedEvent> auditLogCreatedEventReceiver)
21+
IMessageBus messageBus)
2222
{
2323
_logger = logger;
2424
_serviceProvider = serviceProvider;
25-
_auditLogCreatedEventReceiver = auditLogCreatedEventReceiver;
25+
_messageBus = messageBus;
2626
}
2727

2828
protected override Task ExecuteAsync(CancellationToken stoppingToken)
2929
{
30-
_auditLogCreatedEventReceiver?.ReceiveAsync(async (data, metaData) =>
30+
_messageBus.ReceiveAsync<AuditLogAggregationConsumer, AuditLogCreatedEvent>(async (data, metaData) =>
3131
{
3232
using var scope = _serviceProvider.CreateScope();
3333
await ProcessMessage(scope, data, metaData);

src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/Commands/PublishEventsCommand.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,19 @@ public class PublishEventsCommandHandler : IRequestHandler<PublishEventsCommand>
2424
private readonly ILogger<PublishEventsCommandHandler> _logger;
2525
private readonly IDateTimeProvider _dateTimeProvider;
2626
private readonly IRepository<OutboxEvent, Guid> _outboxEventRepository;
27-
private readonly IMessageSender<AuditLogCreatedEvent> _auditLogCreatedEventSender;
27+
private readonly IMessageBus _messageBus;
2828
private readonly DaprClient _daprClient;
2929

3030
public PublishEventsCommandHandler(ILogger<PublishEventsCommandHandler> logger,
3131
IDateTimeProvider dateTimeProvider,
3232
IRepository<OutboxEvent, Guid> outboxEventRepository,
33-
IMessageSender<AuditLogCreatedEvent> auditLogCreatedEventSender,
33+
IMessageBus messageBus,
3434
DaprClient daprClient)
3535
{
3636
_logger = logger;
3737
_dateTimeProvider = dateTimeProvider;
3838
_outboxEventRepository = outboxEventRepository;
39-
_auditLogCreatedEventSender = auditLogCreatedEventSender;
39+
_messageBus = messageBus;
4040
_daprClient = daprClient;
4141
}
4242

@@ -53,7 +53,7 @@ public async Task Handle(PublishEventsCommand command, CancellationToken cancell
5353
if (eventLog.EventType == "AUDIT_LOG_ENTRY_CREATED")
5454
{
5555
var logEntry = JsonSerializer.Deserialize<AuditLogEntry>(eventLog.Message);
56-
await _auditLogCreatedEventSender.SendAsync(new AuditLogCreatedEvent { AuditLog = logEntry },
56+
await _messageBus.SendAsync(new AuditLogCreatedEvent { AuditLog = logEntry },
5757
new MetaData
5858
{
5959
MessageId = eventLog.Id.ToString(),
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
using ClassifiedAds.Services.Product.Entities;
1+
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
2+
using ClassifiedAds.Services.Product.Entities;
23

34
namespace ClassifiedAds.Services.Product.DTOs;
45

5-
public class AuditLogCreatedEvent
6+
public class AuditLogCreatedEvent : IMessageBusEvent
67
{
78
public AuditLogEntry AuditLog { get; set; }
89
}

src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/ProductModuleServiceCollectionExtensions.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using ClassifiedAds.CrossCuttingConcerns.Csv;
2+
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
23
using ClassifiedAds.Domain.Repositories;
34
using ClassifiedAds.Infrastructure.Csv;
45
using ClassifiedAds.Infrastructure.Identity;
@@ -46,7 +47,8 @@ public static IServiceCollection AddProductModule(this IServiceCollection servic
4647
services.AddScoped(typeof(ICsvReader<>), typeof(CsvReader<>));
4748
services.AddScoped(typeof(ICsvWriter<>), typeof(CsvWriter<>));
4849

49-
services.AddMessageBusSender<AuditLogCreatedEvent>(appSettings.MessageBroker);
50+
services.AddTransient<IMessageBus, MessageBus>()
51+
.AddMessageBusSender<AuditLogCreatedEvent>(appSettings.MessageBroker);
5052

5153
return services;
5254
}

src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/Commands/PublishEventsCommand.cs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,17 @@ public class PublishEventsCommandHandler : ICommandHandler<PublishEventsCommand>
2323
private readonly ILogger<PublishEventsCommandHandler> _logger;
2424
private readonly IDateTimeProvider _dateTimeProvider;
2525
private readonly IRepository<OutboxEvent, Guid> _outboxEventRepository;
26-
private readonly IMessageSender<FileUploadedEvent> _fileUploadedEventSender;
27-
private readonly IMessageSender<FileDeletedEvent> _fileDeletedEventSender;
28-
private readonly IMessageSender<AuditLogCreatedEvent> _auditLogCreatedEventSender;
26+
private readonly IMessageBus _messageBus;
2927

3028
public PublishEventsCommandHandler(ILogger<PublishEventsCommandHandler> logger,
3129
IDateTimeProvider dateTimeProvider,
3230
IRepository<OutboxEvent, Guid> outboxEventRepository,
33-
IMessageSender<FileUploadedEvent> fileUploadedEventSender,
34-
IMessageSender<FileDeletedEvent> fileDeletedEventSender,
35-
IMessageSender<AuditLogCreatedEvent> auditLogCreatedEventSender)
31+
IMessageBus messageBus)
3632
{
3733
_logger = logger;
3834
_dateTimeProvider = dateTimeProvider;
3935
_outboxEventRepository = outboxEventRepository;
40-
_fileUploadedEventSender = fileUploadedEventSender;
41-
_fileDeletedEventSender = fileDeletedEventSender;
42-
_auditLogCreatedEventSender = auditLogCreatedEventSender;
36+
_messageBus = messageBus;
4337
}
4438

4539
public async Task HandleAsync(PublishEventsCommand command, CancellationToken cancellationToken = default)
@@ -54,16 +48,16 @@ public async Task HandleAsync(PublishEventsCommand command, CancellationToken ca
5448
{
5549
if (eventLog.EventType == "FILEENTRY_CREATED")
5650
{
57-
await _fileUploadedEventSender.SendAsync(new FileUploadedEvent { FileEntry = JsonSerializer.Deserialize<FileEntry>(eventLog.Message) });
51+
await _messageBus.SendAsync(new FileUploadedEvent { FileEntry = JsonSerializer.Deserialize<FileEntry>(eventLog.Message) });
5852
}
5953
else if (eventLog.EventType == "FILEENTRY_DELETED")
6054
{
61-
await _fileDeletedEventSender.SendAsync(new FileDeletedEvent { FileEntry = JsonSerializer.Deserialize<FileEntry>(eventLog.Message) });
55+
await _messageBus.SendAsync(new FileDeletedEvent { FileEntry = JsonSerializer.Deserialize<FileEntry>(eventLog.Message) });
6256
}
6357
else if (eventLog.EventType == "AUDIT_LOG_ENTRY_CREATED")
6458
{
6559
var logEntry = JsonSerializer.Deserialize<AuditLogEntry>(eventLog.Message);
66-
await _auditLogCreatedEventSender.SendAsync(new AuditLogCreatedEvent { AuditLog = logEntry },
60+
await _messageBus.SendAsync(new AuditLogCreatedEvent { AuditLog = logEntry },
6761
new MetaData
6862
{
6963
MessageId = eventLog.Id.ToString(),

0 commit comments

Comments
 (0)