Skip to content

Commit ed4133f

Browse files
authored
Merge branch 'master' into add-security-gates
2 parents da8e85a + d989d9f commit ed4133f

File tree

24 files changed

+494
-294
lines changed

24 files changed

+494
-294
lines changed

.github/workflows/scorecard.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,17 @@ jobs:
4747
results_format: sarif
4848
publish_results: true
4949

50-
- name: "Upload artifact"
51-
uses: actions/upload-artifact@97a0fba1372883ab732affbe8f94b823f91727db
50+
- name: "Upload artifact"
51+
uses: actions/upload-artifact@65c4c4a1ddee5b72f698fdd19549f0f0fb45cf08 # v4.6.0
5252
with:
5353
name: SARIF file
5454
path: results.sarif
5555
retention-days: 5
5656

57+
# Upload the results to GitHub's code scanning dashboard (optional).
58+
# Commenting out will disable upload of results to your repo's Code Scanning dashboard
5759
- name: "Upload to code-scanning"
58-
uses: github/codeql-action/upload-sarif@v3
60+
uses: github/codeql-action/upload-sarif@f6091c0113d1dcf9b98e269ee48e8a7e51b7bdd4 # v3.28.5
5961
with:
6062
sarif_file: results.sarif
6163

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
## v1.5.0 (March 6, 2025)
2+
3+
### Added:
4+
- Circuit breaker implementation for handling consecutive database failures
5+
- New configuration section `CircuitBreakerSettings` with the following options:
6+
- `IsEnabled`: Controls circuit breaker functionality
7+
- `Threshold`: Maximum number of consecutive failures
8+
- `DurationSc`: Duration in seconds for circuit open state
9+
- `HalfOpenMaxAttempts`: Maximum attempts in half-open state
10+
11+
### Changed:
12+
- Enhanced error handling in offset setting operations
13+
- Improved logging for database operation failures
14+
115
## v1.4.0 (December 17, 2024)
216

317
### Added:

PollingOutboxPublisher.sln

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ EndProject
77
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "SolutionItems", "SolutionItems", "{BCF067D7-9518-4177-AD14-C854D96ED0F0}"
88
ProjectSection(SolutionItems) = preProject
99
Dockerfile = Dockerfile
10-
.gitlab-ci.yml = .gitlab-ci.yml
1110
EndProjectSection
1211
EndProject
1312
Global

README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,15 @@ value).
108108
> If there will be multiple instances, the `MasterPodSettings.IsActive` should be set to `true`. Otherwise, messages can
109109
> be duplicated or not published.
110110
111+
### Circuit Breaker
112+
113+
The application uses circuit breakers to handle database operation failures gracefully. Each daemon (OutboxEventsDaemon and MissingEventsDaemon) has its own circuit breaker that:
114+
115+
1. Opens after `Threshold` consecutive database failures (default: 3)
116+
2. Stays open for `DurationSc` (default: 600 seconds)
117+
3. Allows `HalfOpenMaxAttempts` (default: 1) operations when half-open
118+
4. Automatically closes if operations succeed in half-open state
119+
111120
## Configuration
112121

113122
The application can be configured using the `config.json` and `secret.json` files. Here are the configurations you can
@@ -121,7 +130,7 @@ set:
121130
122131
| **Key** | **Type** | **Description** |
123132
|----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
124-
| `Kafka.ReloadOnChange` | bool | The flag indicating whether the Kafka configuration should be reloaded when the configuration file changes. |
133+
| `Kafka.ReloadOnChange` | bool | The flag indicating whether the Kafka configuration should be reloaded when the configuration file changes. |
125134
| `Kafka.SaslUsername` | string | The username for the SASL authentication of the Kafka cluster. |
126135
| `Kafka.Brokers` | string | The addresses of the Kafka brokers. |
127136
| `Kafka.SaslPassword` | string | The password for the SASL authentication of the Kafka cluster. |
@@ -164,6 +173,10 @@ set:
164173
| `Redis.Config` | string | The configuration for the Redis instance. |
165174
| `Redis.Password` | string | The password for the Redis instance. |
166175
| `Serilog` | object | The configuration for Serilog. |
176+
| `CircuitBreakerSettings.IsEnabled` | bool | Enable/disable the circuit breaker feature. When enabled, protects against database operation failures. |
177+
| `CircuitBreakerSettings.Threshold` | int | Number of consecutive database operation failures before the circuit breaker opens. |
178+
| `CircuitBreakerSettings.DurationSc` | int | Duration in seconds to keep the circuit breaker open before attempting to half-open. |
179+
| `CircuitBreakerSettings.HalfOpenMaxAttempts` | int | Maximum number of attempts allowed when the circuit breaker is in half-open state. |
167180

168181
## EXAMPLE
169182

example/Couchbase/config/config.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,11 @@
6363
"Endpoints": "",
6464
"DefaultDatabase": 1,
6565
"Config": ""
66+
},
67+
"CircuitBreakerSettings": {
68+
"IsEnabled": true,
69+
"Threshold": 3,
70+
"DurationSc": 60,
71+
"HalfOpenMaxAttempts": 1
6672
}
6773
}

example/MsSql/config/config.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,11 @@
5757
"Endpoints": "",
5858
"DefaultDatabase": 1,
5959
"Config": ""
60+
},
61+
"CircuitBreakerSettings": {
62+
"IsEnabled": true,
63+
"Threshold": 3,
64+
"DurationSc": 60,
65+
"HalfOpenMaxAttempts": 1
6066
}
6167
}

example/Postgres/config/config.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,11 @@
5757
"Endpoints": "",
5858
"DefaultDatabase": 1,
5959
"Config": ""
60+
},
61+
"CircuitBreakerSettings": {
62+
"IsEnabled": true,
63+
"Threshold": 3,
64+
"DurationSc": 60,
65+
"HalfOpenMaxAttempts": 1
6066
}
6167
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System.Diagnostics.CodeAnalysis;
2+
3+
namespace PollingOutboxPublisher.ConfigOptions;
4+
5+
[ExcludeFromCodeCoverage]
6+
public class CircuitBreakerSettings
7+
{
8+
public bool IsEnabled { get; set; } = false;
9+
public int Threshold { get; set; } = 3;
10+
public int DurationSc { get; set; } = 600;
11+
public int HalfOpenMaxAttempts { get; set; } = 1;
12+
}

src/Coordinators/MissingCoordinator/MissingEventsCoordinator.cs

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Threading.Tasks;
55
using PollingOutboxPublisher.Coordinators.MissingCoordinator.Interfaces;
66
using PollingOutboxPublisher.Coordinators.MissingCoordinator.Services.Interfaces;
7+
using PollingOutboxPublisher.Coordinators.Services;
78
using PollingOutboxPublisher.Coordinators.Services.Interfaces;
89
using PollingOutboxPublisher.Models;
910

@@ -15,37 +16,46 @@ public class MissingEventsCoordinator : IMissingEventsCoordinator
1516
private readonly IPollingMissingQueue _pollingMissingQueue;
1617
private readonly IMissingEventCleaner _missingEventCleaner;
1718
private readonly IMasterPodChecker _masterPodChecker;
19+
private readonly ICircuitBreaker _circuitBreaker;
1820

1921
public MissingEventsCoordinator(IOutboxDispatcher outboxDispatcher, IPollingMissingQueue pollingMissingQueue,
20-
IMissingEventCleaner missingEventCleaner, IMasterPodChecker masterPodChecker)
22+
IMissingEventCleaner missingEventCleaner, IMasterPodChecker masterPodChecker, ICircuitBreaker circuitBreaker)
2123
{
2224
_outboxDispatcher = outboxDispatcher;
2325
_pollingMissingQueue = pollingMissingQueue;
2426
_missingEventCleaner = missingEventCleaner;
2527
_masterPodChecker = masterPodChecker;
28+
_circuitBreaker = circuitBreaker;
2629
}
2730

2831
/**
29-
* Because of our outbox algorithm and MsSql async transactions, there may be some times when we process last message which
30-
* row id: 500, product-api still may working on the transaction it occurred for id: 450 and not commit it yet. In our EventWorker
31-
* when we mark "500" as last id we processed, we are missing 450 which are committed after we processed 500. For this reason we are dedecting
32-
* these missing ids while processing them in EventWorker and later recover them here. Since event orders are not important for our domain,
33-
* we don't block bulk of messages because there is still missing an event.
34-
* Here is the algorithm;
35-
* 1. EventWorker detects gap in its batch process, e.g: for events (1,2,5,6) missing events are (3,4)
36-
* 2. Missing events are written to MissingOutboxEvents table with the date it is missed
37-
* 3. MissingEventsWorker reads missing events from MissingOutboxEvents table with configured batchSize
38-
* 4. Filter out events which retry time is exceed. Max retry duration is configurable from application.properties
39-
* 5. For each retry time exceed event:
40-
* 5.1. Delete events from MissingOutboxEvents table
41-
* 5.2. Publish event to earth.product.pim.product-event-publisher.missing-events.0 topic for logging purpose
42-
* 6. For each retryable events which retry time is not exceed:
43-
* 6.1. Publish event to corresponding outbox topic
44-
* 6.2. Delete events from MissingOutboxEvents table
32+
* Due to the nature of asynchronous transactions in the outbox pattern, gaps in event sequences can occur.
33+
* For example: When processing event ID 500, event ID 450 might still be in an uncommitted transaction.
34+
* If we mark ID 500 as processed, we could miss ID 450 when it commits later.
35+
*
36+
* Since event ordering is not critical for our domain, we handle these gaps asynchronously:
37+
*
38+
* 1. EventWorker detects sequence gaps during batch processing
39+
* Example: For events [1,2,5,6], missing events are [3,4]
40+
*
41+
* 2. Missing events are recorded in MissingOutboxEvents table with timestamp
42+
*
43+
* 3. MissingEventsWorker polls MissingOutboxEvents table using configured batch size
44+
*
45+
* 4. Events exceeding retry duration (configured in application.properties) are filtered
46+
*
47+
* 5. For expired events:
48+
* - Remove from MissingOutboxEvents table
49+
* - Log to 'missing-events' topic for auditing
50+
*
51+
* 6. For the valid event that has a matching outbox event:
52+
* - Publish to original outbox topic
53+
* - Remove from MissingOutboxEvents table
4554
*/
4655
public async Task StartAsync(CancellationToken cancellationToken)
4756
{
48-
while (!cancellationToken.IsCancellationRequested && await _masterPodChecker.IsMasterPodAsync(cancellationToken))
57+
while (!cancellationToken.IsCancellationRequested &&
58+
await _masterPodChecker.IsMasterPodAsync(cancellationToken))
4959
{
5060
var (retryableMissingEvents, missingEvents, mappedMissingEvents, outboxEvents) =
5161
await _pollingMissingQueue.DequeueAsync(cancellationToken);
@@ -62,7 +72,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
6272
await _missingEventCleaner.CleanMissingEventsHaveOutboxEventAsync(missingEvents,
6373
resultOfMappedMissingEvents.ToList());
6474
}
65-
75+
6676
// move missing events that don't have match
6777
if (missingEvents is not null && outboxEvents is not null)
6878
{
@@ -75,6 +85,9 @@ await _missingEventCleaner.CleanMissingEventsHaveOutboxEventAsync(missingEvents,
7585
{
7686
await _missingEventCleaner.CleanMissingEventsNotHaveOutboxEventAsync(missingEvents);
7787
}
88+
89+
// Reset circuit breaker after successful processing
90+
_circuitBreaker.Reset();
7891
}
7992
}
8093

src/Coordinators/OutboxCoordinator/OutboxCoordinator.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Threading.Tasks;
44
using PollingOutboxPublisher.Coordinators.OutboxCoordinator.Interfaces;
55
using PollingOutboxPublisher.Coordinators.OutboxCoordinator.Services.Interfaces;
6+
using PollingOutboxPublisher.Coordinators.Services;
67
using PollingOutboxPublisher.Coordinators.Services.Interfaces;
78

89
namespace PollingOutboxPublisher.Coordinators.OutboxCoordinator;
@@ -13,14 +14,17 @@ public class OutboxCoordinator : IOutboxCoordinator
1314
private readonly IPollingQueue _pollingQueue;
1415
private readonly IOffsetSetter _offsetSetter;
1516
private readonly IMasterPodChecker _masterPodChecker;
17+
private readonly ICircuitBreaker _circuitBreaker;
1618

1719
public OutboxCoordinator(IOutboxDispatcher outboxDispatcher, IPollingQueue pollingQueue,
18-
IOffsetSetter offsetSetter, IMasterPodChecker masterPodChecker)
20+
IOffsetSetter offsetSetter, IMasterPodChecker masterPodChecker,
21+
ICircuitBreaker circuitBreaker)
1922
{
2023
_outboxDispatcher = outboxDispatcher;
2124
_pollingQueue = pollingQueue;
2225
_offsetSetter = offsetSetter;
2326
_masterPodChecker = masterPodChecker;
27+
_circuitBreaker = circuitBreaker;
2428
}
2529

2630
public async Task StartAsync(CancellationToken cancellationToken)
@@ -34,6 +38,8 @@ public async Task StartAsync(CancellationToken cancellationToken)
3438
await Task.WhenAll(taskToAwait);
3539

3640
await _offsetSetter.SetLatestOffset(outboxEvents);
41+
42+
_circuitBreaker.Reset();
3743
}
3844
}
3945
}

0 commit comments

Comments
 (0)