Skip to content
This repository was archived by the owner on Nov 17, 2023. It is now read-only.

Commit 0e8d07d

Browse files
authored
Merge pull request #1034 from dotnet-architecture/fix/1022-handle-concurrent-integration-event-publishing
Handle concurrent integration events publishing
2 parents ec0c15a + 61ecfba commit 0e8d07d

20 files changed

+261
-36
lines changed

docker-compose.override.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ services:
110110
- ApplicationInsights__InstrumentationKey=${INSTRUMENTATION_KEY}
111111
- OrchestratorType=${ORCHESTRATOR_TYPE}
112112
- UseLoadTest=${USE_LOADTEST:-False}
113+
- Serilog__MinimumLevel__Override__Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ=Verbose
113114
ports:
114115
- "5102:80" # Important: In a production environment your should remove the external port (5102) kept here for microservice debugging purposes.
115116
# The API Gateway redirects and access through the internal port (80).
@@ -130,6 +131,7 @@ services:
130131
- ApplicationInsights__InstrumentationKey=${INSTRUMENTATION_KEY}
131132
- OrchestratorType=${ORCHESTRATOR_TYPE}
132133
- UseLoadTest=${USE_LOADTEST:-False}
134+
- Serilog__MinimumLevel__Override__Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ=Verbose
133135
ports:
134136
- "5111:80"
135137

@@ -168,6 +170,8 @@ services:
168170
- AzureServiceBusEnabled=False
169171
- ApplicationInsights__InstrumentationKey=${INSTRUMENTATION_KEY}
170172
- OrchestratorType=${ORCHESTRATOR_TYPE}
173+
- Serilog__MinimumLevel__Override__Payment.API.IntegrationEvents.EventHandling=Verbose
174+
- Serilog__MinimumLevel__Override__Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ=Verbose
171175
ports:
172176
- "5108:80" # Important: In a production environment your should remove the external port (5108) kept here for microservice debugging purposes.
173177
# The API Gateway redirects and access through the internal port (80).

scripts/restore-packages

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
echo RESTORING ALL PACKAGES...; for f in /src/csproj-files/*.csproj; do dotnet restore $f; done
1+
echo RESTORING ALL PACKAGES...; for f in /src/csproj-files/*.csproj; do dotnet restore $f; done

src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,16 @@ public void Publish(IntegrationEvent @event)
8080
_logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message);
8181
});
8282

83+
var eventName = @event.GetType().Name;
84+
85+
_logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);
86+
8387
using (var channel = _persistentConnection.CreateModel())
8488
{
85-
var eventName = @event.GetType()
86-
.Name;
8789

88-
channel.ExchangeDeclare(exchange: BROKER_NAME,
89-
type: "direct");
90+
_logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);
91+
92+
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
9093

9194
var message = JsonConvert.SerializeObject(@event);
9295
var body = Encoding.UTF8.GetBytes(message);
@@ -96,11 +99,14 @@ public void Publish(IntegrationEvent @event)
9699
var properties = channel.CreateBasicProperties();
97100
properties.DeliveryMode = 2; // persistent
98101

99-
channel.BasicPublish(exchange: BROKER_NAME,
100-
routingKey: eventName,
101-
mandatory: true,
102-
basicProperties: properties,
103-
body: body);
102+
_logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);
103+
104+
channel.BasicPublish(
105+
exchange: BROKER_NAME,
106+
routingKey: eventName,
107+
mandatory: true,
108+
basicProperties: properties,
109+
body: body);
104110
});
105111
}
106112
}
@@ -176,6 +182,8 @@ public void Dispose()
176182

177183
private void StartBasicConsume()
178184
{
185+
_logger.LogTrace("Starting RabbitMQ basic consume");
186+
179187
if (_consumerChannel != null)
180188
{
181189
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
@@ -225,6 +233,8 @@ private IModel CreateConsumerChannel()
225233
_persistentConnection.TryConnect();
226234
}
227235

236+
_logger.LogTrace("Creating RabbitMQ consumer channel");
237+
228238
var channel = _persistentConnection.CreateModel();
229239

230240
channel.ExchangeDeclare(exchange: BROKER_NAME,
@@ -238,6 +248,8 @@ private IModel CreateConsumerChannel()
238248

239249
channel.CallbackException += (sender, ea) =>
240250
{
251+
_logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");
252+
241253
_consumerChannel.Dispose();
242254
_consumerChannel = CreateConsumerChannel();
243255
StartBasicConsume();
@@ -248,6 +260,8 @@ private IModel CreateConsumerChannel()
248260

249261
private async Task ProcessEvent(string eventName, string message)
250262
{
263+
_logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);
264+
251265
if (_subsManager.HasSubscriptionsForEvent(eventName))
252266
{
253267
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
@@ -274,6 +288,10 @@ private async Task ProcessEvent(string eventName, string message)
274288
}
275289
}
276290
}
291+
else
292+
{
293+
_logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
294+
}
277295
}
278296
}
279297
}

src/BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEntry.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
1212
public class IntegrationEventLogEntry
1313
{
1414
private IntegrationEventLogEntry() { }
15-
public IntegrationEventLogEntry(IntegrationEvent @event)
15+
public IntegrationEventLogEntry(IntegrationEvent @event, Guid transactionId)
1616
{
1717
EventId = @event.Id;
1818
CreationTime = @event.CreationDate;
1919
EventTypeName = @event.GetType().FullName;
2020
Content = JsonConvert.SerializeObject(@event);
2121
State = EventStateEnum.NotPublished;
2222
TimesSent = 0;
23+
TransactionId = transactionId.ToString();
2324
}
25+
2426
public Guid EventId { get; private set; }
2527
public string EventTypeName { get; private set; }
2628
[NotMapped]
@@ -31,6 +33,7 @@ public IntegrationEventLogEntry(IntegrationEvent @event)
3133
public int TimesSent { get; set; }
3234
public DateTime CreationTime { get; private set; }
3335
public string Content { get; private set; }
36+
public string TransactionId { get; private set; }
3437

3538
public IntegrationEventLogEntry DeserializeJsonContent(Type type)
3639
{

src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IIntegrationEventLogService.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
1+
using Microsoft.EntityFrameworkCore.Storage;
2+
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
23
using System;
34
using System.Collections.Generic;
45
using System.Data.Common;
@@ -9,8 +10,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Servi
910
{
1011
public interface IIntegrationEventLogService
1112
{
12-
Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync();
13-
Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction);
13+
Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId);
14+
Task SaveEventAsync(IntegrationEvent @event, IDbContextTransaction transaction);
1415
Task MarkEventAsPublishedAsync(Guid eventId);
1516
Task MarkEventAsInProgressAsync(Guid eventId);
1617
Task MarkEventAsFailedAsync(Guid eventId);

src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IntegrationEventLogService.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Microsoft.EntityFrameworkCore;
22
using Microsoft.EntityFrameworkCore.Diagnostics;
3+
using Microsoft.EntityFrameworkCore.Storage;
34
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
45
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
56
using Newtonsoft.Json;
@@ -34,25 +35,24 @@ public IntegrationEventLogService(DbConnection dbConnection)
3435
.ToList();
3536
}
3637

37-
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync()
38+
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId)
3839
{
40+
var tid = transactionId.ToString();
41+
3942
return await _integrationEventLogContext.IntegrationEventLogs
40-
.Where(e => e.State == EventStateEnum.NotPublished)
43+
.Where(e => e.TransactionId == tid && e.State == EventStateEnum.NotPublished)
4144
.OrderBy(o => o.CreationTime)
4245
.Select(e => e.DeserializeJsonContent(_eventTypes.Find(t=> t.Name == e.EventTypeShortName)))
4346
.ToListAsync();
4447
}
4548

46-
public Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction)
49+
public Task SaveEventAsync(IntegrationEvent @event, IDbContextTransaction transaction)
4750
{
48-
if (transaction == null)
49-
{
50-
throw new ArgumentNullException(nameof(transaction), $"A {typeof(DbTransaction).FullName} is required as a pre-requisite to save the event.");
51-
}
51+
if (transaction == null) throw new ArgumentNullException(nameof(transaction));
5252

53-
var eventLogEntry = new IntegrationEventLogEntry(@event);
53+
var eventLogEntry = new IntegrationEventLogEntry(@event, transaction.TransactionId);
5454

55-
_integrationEventLogContext.Database.UseTransaction(transaction);
55+
_integrationEventLogContext.Database.UseTransaction(transaction.GetDbTransaction());
5656
_integrationEventLogContext.IntegrationEventLogs.Add(eventLogEntry);
5757

5858
return _integrationEventLogContext.SaveChangesAsync();

src/Services/Catalog/Catalog.API/Infrastructure/IntegrationEventMigrations/20190507184807_AddTransactionId.Designer.cs

Lines changed: 50 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using Microsoft.EntityFrameworkCore.Migrations;
2+
3+
namespace Catalog.API.Infrastructure.IntegrationEventMigrations
4+
{
5+
public partial class AddTransactionId : Migration
6+
{
7+
protected override void Up(MigrationBuilder migrationBuilder)
8+
{
9+
migrationBuilder.AddColumn<string>(
10+
name: "TransactionId",
11+
table: "IntegrationEventLog",
12+
nullable: true);
13+
}
14+
15+
protected override void Down(MigrationBuilder migrationBuilder)
16+
{
17+
migrationBuilder.DropColumn(
18+
name: "TransactionId",
19+
table: "IntegrationEventLog");
20+
}
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using Microsoft.EntityFrameworkCore;
2+
using Microsoft.EntityFrameworkCore.Design;
3+
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
4+
5+
namespace Catalog.API.Infrastructure.IntegrationEventMigrations
6+
{
7+
public class IntegrationEventLogContextDesignTimeFactory : IDesignTimeDbContextFactory<IntegrationEventLogContext>
8+
{
9+
public IntegrationEventLogContext CreateDbContext(string[] args)
10+
{
11+
var optionsBuilder = new DbContextOptionsBuilder<IntegrationEventLogContext>();
12+
13+
optionsBuilder.UseSqlServer(".", options => options.MigrationsAssembly(GetType().Assembly.GetName().Name));
14+
15+
return new IntegrationEventLogContext(optionsBuilder.Options);
16+
}
17+
}
18+
}

src/Services/Catalog/Catalog.API/Infrastructure/IntegrationEventMigrations/IntegrationEventLogContextModelSnapshot.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
using System;
1+
// <auto-generated />
2+
using System;
23
using Microsoft.EntityFrameworkCore;
34
using Microsoft.EntityFrameworkCore.Infrastructure;
45
using Microsoft.EntityFrameworkCore.Metadata;
5-
using Microsoft.EntityFrameworkCore.Migrations;
6+
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
67
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
78

89
namespace Catalog.API.Migrations
@@ -12,8 +13,10 @@ partial class IntegrationEventLogContextModelSnapshot : ModelSnapshot
1213
{
1314
protected override void BuildModel(ModelBuilder modelBuilder)
1415
{
16+
#pragma warning disable 612, 618
1517
modelBuilder
16-
.HasAnnotation("ProductVersion", "1.1.1")
18+
.HasAnnotation("ProductVersion", "2.2.3-servicing-35854")
19+
.HasAnnotation("Relational:MaxIdentifierLength", 128)
1720
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
1821

1922
modelBuilder.Entity("Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.IntegrationEventLogEntry", b =>
@@ -33,10 +36,13 @@ protected override void BuildModel(ModelBuilder modelBuilder)
3336

3437
b.Property<int>("TimesSent");
3538

39+
b.Property<string>("TransactionId");
40+
3641
b.HasKey("EventId");
3742

3843
b.ToTable("IntegrationEventLog");
3944
});
45+
#pragma warning restore 612, 618
4046
}
4147
}
4248
}

0 commit comments

Comments
 (0)