Commit 7d14a75

Fix/17 circuit braker event loop (#18)
* refactor: improve error handling and logging in OutboxDispatcher
* feat: add CircuitBreakerSettings configuration options
* feat: implement Circuit Breaker pattern for database operation resilience
* feat: enhance Circuit Breaker with logging and configuration updates
* test: update tests to assert no exceptions are thrown during event processing
* refactor: improve documentation and clarity in MissingEventsCoordinator
* chore: update CHANGELOG for v1.5.0 release with new Circuit Breaker features and enhancements
* refactor: update Outbox event keys in configuration for consistency
1 parent abfe19e commit 7d14a75

23 files changed (+488 -290 lines changed)

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
@@ -1,3 +1,17 @@
+## v1.5.0 (March 6, 2025)
+
+### Added:
+- Circuit breaker implementation for handling consecutive database failures
+- New configuration section `CircuitBreakerSettings` with the following options:
+  - `IsEnabled`: Controls circuit breaker functionality
+  - `Threshold`: Maximum number of consecutive failures
+  - `DurationSc`: Duration in seconds for circuit open state
+  - `HalfOpenMaxAttempts`: Maximum attempts in half-open state
+
+### Changed:
+- Enhanced error handling in offset setting operations
+- Improved logging for database operation failures
+
 ## v1.4.0 (December 17, 2024)
 
 ### Added:

PollingOutboxPublisher.sln

Lines changed: 0 additions & 1 deletion
@@ -7,7 +7,6 @@ EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "SolutionItems", "SolutionItems", "{BCF067D7-9518-4177-AD14-C854D96ED0F0}"
     ProjectSection(SolutionItems) = preProject
         Dockerfile = Dockerfile
-        .gitlab-ci.yml = .gitlab-ci.yml
     EndProjectSection
 EndProject
 Global

README.md

Lines changed: 14 additions & 1 deletion
@@ -106,6 +106,15 @@ value).
 > If there will be multiple instances, the `MasterPodSettings.IsActive` should be set to `true`. Otherwise, messages can
 > be duplicated or not published.
 
+### Circuit Breaker
+
+The application uses circuit breakers to handle database operation failures gracefully. Each daemon (OutboxEventsDaemon and MissingEventsDaemon) has its own circuit breaker that:
+
+1. Opens after `Threshold` consecutive database failures (default: 3)
+2. Stays open for `DurationSc` seconds (default: 600)
+3. Allows `HalfOpenMaxAttempts` (default: 1) operations when half-open
+4. Automatically closes if operations succeed in half-open state
+
 ## Configuration
 
 The application can be configured using the `config.json` and `secret.json` files. Here are the configurations you can
@@ -119,7 +128,7 @@ set:
 
 | **Key** | **Type** | **Description** |
 |----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `Kafka.ReloadOnChange` | bool | The flag indicating whether the Kafka configuration should be reloaded when the configuration file changes. |
+| `Kafka.ReloadOnChange` | bool | The flag indicating whether the Kafka configuration should be reloaded when the configuration file changes. |
 | `Kafka.SaslUsername` | string | The username for the SASL authentication of the Kafka cluster. |
 | `Kafka.Brokers` | string | The addresses of the Kafka brokers. |
 | `Kafka.SaslPassword` | string | The password for the SASL authentication of the Kafka cluster. |
@@ -162,6 +171,10 @@ set:
 | `Redis.Config` | string | The configuration for the Redis instance. |
 | `Redis.Password` | string | The password for the Redis instance. |
 | `Serilog` | object | The configuration for Serilog. |
+| `CircuitBreakerSettings.IsEnabled` | bool | Enable/disable the circuit breaker feature. When enabled, protects against database operation failures. |
+| `CircuitBreakerSettings.Threshold` | int | Number of consecutive database operation failures before the circuit breaker opens. |
+| `CircuitBreakerSettings.DurationSc` | int | Duration in seconds to keep the circuit breaker open before attempting to half-open. |
+| `CircuitBreakerSettings.HalfOpenMaxAttempts` | int | Maximum number of attempts allowed when the circuit breaker is in half-open state. |
 
 ## EXAMPLE

example/Couchbase/config/config.json

Lines changed: 6 additions & 0 deletions
@@ -63,5 +63,11 @@
     "Endpoints": "",
     "DefaultDatabase": 1,
     "Config": ""
+  },
+  "CircuitBreakerSettings": {
+    "IsEnabled": true,
+    "Threshold": 3,
+    "DurationSc": 60,
+    "HalfOpenMaxAttempts": 1
   }
 }

example/MsSql/config/config.json

Lines changed: 6 additions & 0 deletions
@@ -57,5 +57,11 @@
     "Endpoints": "",
     "DefaultDatabase": 1,
     "Config": ""
+  },
+  "CircuitBreakerSettings": {
+    "IsEnabled": true,
+    "Threshold": 3,
+    "DurationSc": 60,
+    "HalfOpenMaxAttempts": 1
   }
 }

example/Postgres/config/config.json

Lines changed: 6 additions & 0 deletions
@@ -57,5 +57,11 @@
     "Endpoints": "",
     "DefaultDatabase": 1,
     "Config": ""
+  },
+  "CircuitBreakerSettings": {
+    "IsEnabled": true,
+    "Threshold": 3,
+    "DurationSc": 60,
+    "HalfOpenMaxAttempts": 1
   }
 }
Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+using System.Diagnostics.CodeAnalysis;
+
+namespace PollingOutboxPublisher.ConfigOptions;
+
+[ExcludeFromCodeCoverage]
+public class CircuitBreakerSettings
+{
+    public bool IsEnabled { get; set; } = false;
+    public int Threshold { get; set; } = 3;
+    public int DurationSc { get; set; } = 600;
+    public int HalfOpenMaxAttempts { get; set; } = 1;
+}
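
The `ICircuitBreaker` implementation is not part of this excerpt; only its `Reset()` call appears in the coordinators. As a rough illustration of the four behaviours the README section lists (open after consecutive failures, stay open for a duration, allow limited half-open attempts, close on success), here is a minimal sketch. `BreakerOptions`, `BreakerState`, `SketchCircuitBreaker`, `TryAcquire` and `RecordFailure` are hypothetical names invented for this example, and the injectable clock exists only so the timing can be exercised without waiting.

```csharp
using System;

// Hypothetical options type mirroring the fields of CircuitBreakerSettings.
public sealed record BreakerOptions(int Threshold, int DurationSc, int HalfOpenMaxAttempts);

public enum BreakerState { Closed, Open, HalfOpen }

// Illustrative sketch only: NOT the project's ICircuitBreaker implementation.
public sealed class SketchCircuitBreaker
{
    private readonly BreakerOptions _options;
    private readonly Func<DateTimeOffset> _now; // injectable clock (assumption, for testing)
    private int _consecutiveFailures;
    private int _halfOpenAttempts;
    private DateTimeOffset _openedAt;

    public BreakerState State { get; private set; } = BreakerState.Closed;

    public SketchCircuitBreaker(BreakerOptions options)
        : this(options, () => DateTimeOffset.UtcNow) { }

    public SketchCircuitBreaker(BreakerOptions options, Func<DateTimeOffset> now)
    {
        _options = options;
        _now = now;
    }

    // Returns true when the caller may attempt a database operation.
    public bool TryAcquire()
    {
        // Once the open duration has elapsed, move from Open to HalfOpen.
        if (State == BreakerState.Open &&
            _now() - _openedAt >= TimeSpan.FromSeconds(_options.DurationSc))
        {
            State = BreakerState.HalfOpen;
            _halfOpenAttempts = 0;
        }

        switch (State)
        {
            case BreakerState.Closed:
                return true;
            case BreakerState.HalfOpen when _halfOpenAttempts < _options.HalfOpenMaxAttempts:
                _halfOpenAttempts++;
                return true;
            default:
                return false; // Open, or half-open attempts exhausted
        }
    }

    // Called after a failed database operation.
    public void RecordFailure()
    {
        _consecutiveFailures++;
        if (State == BreakerState.HalfOpen || _consecutiveFailures >= _options.Threshold)
        {
            State = BreakerState.Open;
            _openedAt = _now();
        }
    }

    // Called after a successful cycle, as the coordinators do with Reset().
    public void Reset()
    {
        _consecutiveFailures = 0;
        _halfOpenAttempts = 0;
        State = BreakerState.Closed;
    }
}
```

In this sketch a failure during the half-open state reopens the circuit immediately; whether the real implementation does the same cannot be confirmed from the diff shown here.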

src/Coordinators/MissingCoordinator/MissingEventsCoordinator.cs

Lines changed: 32 additions & 19 deletions
@@ -4,6 +4,7 @@
 using System.Threading.Tasks;
 using PollingOutboxPublisher.Coordinators.MissingCoordinator.Interfaces;
 using PollingOutboxPublisher.Coordinators.MissingCoordinator.Services.Interfaces;
+using PollingOutboxPublisher.Coordinators.Services;
 using PollingOutboxPublisher.Coordinators.Services.Interfaces;
 using PollingOutboxPublisher.Models;
 
@@ -15,37 +16,46 @@ public class MissingEventsCoordinator : IMissingEventsCoordinator
     private readonly IPollingMissingQueue _pollingMissingQueue;
     private readonly IMissingEventCleaner _missingEventCleaner;
     private readonly IMasterPodChecker _masterPodChecker;
+    private readonly ICircuitBreaker _circuitBreaker;
 
     public MissingEventsCoordinator(IOutboxDispatcher outboxDispatcher, IPollingMissingQueue pollingMissingQueue,
-        IMissingEventCleaner missingEventCleaner, IMasterPodChecker masterPodChecker)
+        IMissingEventCleaner missingEventCleaner, IMasterPodChecker masterPodChecker, ICircuitBreaker circuitBreaker)
     {
         _outboxDispatcher = outboxDispatcher;
         _pollingMissingQueue = pollingMissingQueue;
         _missingEventCleaner = missingEventCleaner;
         _masterPodChecker = masterPodChecker;
+        _circuitBreaker = circuitBreaker;
     }
 
     /**
-     * Because of our outbox algorithm and MsSql async transactions, there may be some times when we process last message which
-     * row id: 500, product-api still may working on the transaction it occurred for id: 450 and not commit it yet. In our EventWorker
-     * when we mark "500" as last id we processed, we are missing 450 which are committed after we processed 500. For this reason we are dedecting
-     * these missing ids while processing them in EventWorker and later recover them here. Since event orders are not important for our domain,
-     * we don't block bulk of messages because there is still missing an event.
-     * Here is the algorithm;
-     * 1. EventWorker detects gap in its batch process, e.g: for events (1,2,5,6) missing events are (3,4)
-     * 2. Missing events are written to MissingOutboxEvents table with the date it is missed
-     * 3. MissingEventsWorker reads missing events from MissingOutboxEvents table with configured batchSize
-     * 4. Filter out events which retry time is exceed. Max retry duration is configurable from application.properties
-     * 5. For each retry time exceed event:
-     * 5.1. Delete events from MissingOutboxEvents table
-     * 5.2. Publish event to earth.product.pim.product-event-publisher.missing-events.0 topic for logging purpose
-     * 6. For each retryable events which retry time is not exceed:
-     * 6.1. Publish event to corresponding outbox topic
-     * 6.2. Delete events from MissingOutboxEvents table
+     * Due to the nature of asynchronous transactions in the outbox pattern, gaps in event sequences can occur.
+     * For example: When processing event ID 500, event ID 450 might still be in an uncommitted transaction.
+     * If we mark ID 500 as processed, we could miss ID 450 when it commits later.
+     *
+     * Since event ordering is not critical for our domain, we handle these gaps asynchronously:
+     *
+     * 1. EventWorker detects sequence gaps during batch processing
+     *    Example: For events [1,2,5,6], missing events are [3,4]
+     *
+     * 2. Missing events are recorded in MissingOutboxEvents table with timestamp
+     *
+     * 3. MissingEventsWorker polls MissingOutboxEvents table using configured batch size
+     *
+     * 4. Events exceeding retry duration (configured in application.properties) are filtered
+     *
+     * 5. For expired events:
+     *    - Remove from MissingOutboxEvents table
+     *    - Log to 'missing-events' topic for auditing
+     *
+     * 6. For the valid event that has a matching outbox event:
+     *    - Publish to original outbox topic
+     *    - Remove from MissingOutboxEvents table
      */
     public async Task StartAsync(CancellationToken cancellationToken)
     {
-        while (!cancellationToken.IsCancellationRequested && await _masterPodChecker.IsMasterPodAsync(cancellationToken))
+        while (!cancellationToken.IsCancellationRequested &&
+               await _masterPodChecker.IsMasterPodAsync(cancellationToken))
         {
             var (retryableMissingEvents, missingEvents, mappedMissingEvents, outboxEvents) =
                 await _pollingMissingQueue.DequeueAsync(cancellationToken);
@@ -62,7 +72,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
                 await _missingEventCleaner.CleanMissingEventsHaveOutboxEventAsync(missingEvents,
                     resultOfMappedMissingEvents.ToList());
             }
-
+
             // move missing events that don't have match
             if (missingEvents is not null && outboxEvents is not null)
             {
@@ -75,6 +85,9 @@ await _missingEventCleaner.CleanMissingEventsHaveOutboxEventAsync(missingEvents,
             {
                 await _missingEventCleaner.CleanMissingEventsNotHaveOutboxEventAsync(missingEvents);
             }
+
+            // Reset circuit breaker after successful processing
+            _circuitBreaker.Reset();
         }
     }

src/Coordinators/OutboxCoordinator/OutboxCoordinator.cs

Lines changed: 7 additions & 1 deletion
@@ -3,6 +3,7 @@
 using System.Threading.Tasks;
 using PollingOutboxPublisher.Coordinators.OutboxCoordinator.Interfaces;
 using PollingOutboxPublisher.Coordinators.OutboxCoordinator.Services.Interfaces;
+using PollingOutboxPublisher.Coordinators.Services;
 using PollingOutboxPublisher.Coordinators.Services.Interfaces;
 
 namespace PollingOutboxPublisher.Coordinators.OutboxCoordinator;
@@ -13,14 +14,17 @@ public class OutboxCoordinator : IOutboxCoordinator
     private readonly IPollingQueue _pollingQueue;
     private readonly IOffsetSetter _offsetSetter;
     private readonly IMasterPodChecker _masterPodChecker;
+    private readonly ICircuitBreaker _circuitBreaker;
 
     public OutboxCoordinator(IOutboxDispatcher outboxDispatcher, IPollingQueue pollingQueue,
-        IOffsetSetter offsetSetter, IMasterPodChecker masterPodChecker)
+        IOffsetSetter offsetSetter, IMasterPodChecker masterPodChecker,
+        ICircuitBreaker circuitBreaker)
     {
         _outboxDispatcher = outboxDispatcher;
         _pollingQueue = pollingQueue;
         _offsetSetter = offsetSetter;
         _masterPodChecker = masterPodChecker;
+        _circuitBreaker = circuitBreaker;
     }
 
     public async Task StartAsync(CancellationToken cancellationToken)
@@ -34,6 +38,8 @@ public async Task StartAsync(CancellationToken cancellationToken)
             await Task.WhenAll(taskToAwait);
 
             await _offsetSetter.SetLatestOffset(outboxEvents);
+
+            _circuitBreaker.Reset();
         }
     }
 }

src/Coordinators/OutboxCoordinator/Services/OffsetSetter.cs

Lines changed: 13 additions & 3 deletions
@@ -1,3 +1,4 @@
+using System;
 using System.Diagnostics.CodeAnalysis;
 using System.Linq;
 using System.Threading.Tasks;
@@ -6,6 +7,7 @@
 using PollingOutboxPublisher.Coordinators.OutboxCoordinator.Services.Interfaces;
 using PollingOutboxPublisher.Database.Repositories.Interfaces;
 using PollingOutboxPublisher.Models;
+using PollingOutboxPublisher.Exceptions;
 
 namespace PollingOutboxPublisher.Coordinators.OutboxCoordinator.Services;
 
@@ -24,8 +26,16 @@ public OffsetSetter(ILogger<OffsetSetter> logger, IOutboxOffsetRepository outbox
     [Trace]
     public async Task SetLatestOffset(OutboxEvent[] items)
     {
-        var latestOffSet = items.Max(row => row.Id); // or newestEventId
-        await _outboxOffsetRepository.UpdateOffsetAsync(latestOffSet);
-        _logger.LogInformation("LatestOffSet: {LatestOffSet}", latestOffSet);
+        try
+        {
+            var latestOffSet = items.Max(row => row.Id);
+            await _outboxOffsetRepository.UpdateOffsetAsync(latestOffSet);
+            _logger.LogInformation("LatestOffSet: {LatestOffSet}", latestOffSet);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Failed to update offset in database");
+            throw new DatabaseOperationException("Failed to update offset", ex);
+        }
     }
 }
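
The definition of `DatabaseOperationException` is not included in this excerpt. Judging only from the call `new DatabaseOperationException("Failed to update offset", ex)`, it accepts a message and an inner exception. The sketch below assumes that shape and shows the wrap-and-rethrow pattern in isolation; `OffsetUpdateSketch` and its `UpdateAsync` method are hypothetical stand-ins, not code from the repository.

```csharp
using System;
using System.Threading.Tasks;

// Assumed shape: the real class lives in PollingOutboxPublisher.Exceptions and is not shown here.
public class DatabaseOperationException : Exception
{
    public DatabaseOperationException(string message, Exception innerException)
        : base(message, innerException) { }
}

public static class OffsetUpdateSketch
{
    // Runs the supplied update and wraps any failure, keeping the original
    // exception available through InnerException for logging and diagnosis.
    public static async Task UpdateAsync(Func<Task> updateOffset)
    {
        try
        {
            await updateOffset();
        }
        catch (Exception ex)
        {
            throw new DatabaseOperationException("Failed to update offset", ex);
        }
    }
}
```

One consequence of catching `Exception` this broadly is that non-database failures inside the `try` block are also reported as database failures. For example, `Enumerable.Max` throws `InvalidOperationException` on an empty sequence of non-nullable values, so an empty `items` array would surface as a `DatabaseOperationException`.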
