Skip to content

Commit 22bf6f5

Browse files
committed
PRR improvements: Throttling, resilience, circuit breaker
1 parent a53c327 commit 22bf6f5

File tree

15 files changed

+523
-86
lines changed

15 files changed

+523
-86
lines changed

src/KeeperData.Bridge.PerformanceTests/PerformanceTests.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,6 @@
3030

3131
namespace KeeperData.Bridge.PerformanceTests;
3232

33-
/// <summary>
34-
/// Performance tests for the complete import pipeline.
35-
/// WARNING: These tests are resource-intensive and designed for local execution only.
36-
/// They should NOT run in CI/CD pipelines.
37-
/// </summary>
3833
[Trait("testtype", "performance")]
3934
public class PerformanceTests : IAsyncLifetime
4035
{
@@ -315,7 +310,11 @@ private ServiceProvider ConfigureServices(DataSetDefinition dataSetDefinition)
315310
services.AddSingleton<IOptions<IDatabaseConfig>>(Options.Create<IDatabaseConfig>(mongoConfig));
316311
services.AddSingleton(_mongoClient!);
317312

318-
// Dataset definitions
313+
var resilenceSection = configuration.GetSection("MongoResilience");
314+
services.Configure<KeeperData.Core.Database.Configuration.MongoResilienceConfig>(resilenceSection);
315+
316+
services.AddSingleton<KeeperData.Core.Database.Resilience.ResilientMongoOperations>();
317+
319318
var dataSetDefinitions = new DataSetDefinitions
320319
{
321320
SamCPHHolding = dataSetDefinition,
@@ -328,7 +327,6 @@ private ServiceProvider ConfigureServices(DataSetDefinition dataSetDefinition)
328327
};
329328
services.AddSingleton<Core.ETL.Abstract.IDataSetDefinitions>(dataSetDefinitions);
330329

331-
// Storage configuration
332330
var storageConfig = new KeeperData.Infrastructure.Storage.Configuration.StorageConfiguration
333331
{
334332
ExternalStorage = new KeeperData.Infrastructure.Storage.Configuration.StorageWithCredentialsConfiguration

src/KeeperData.Bridge/Controllers/ImportController.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,7 @@ public async Task<IActionResult> StartBulkImport([FromQuery] string sourceType =
9696
[HttpGet]
9797
[ProducesResponseType(typeof(ImportSummariesResponse), StatusCodes.Status200OK)]
9898
[ProducesResponseType(StatusCodes.Status400BadRequest)]
99-
public async Task<IActionResult> GetImportSummaries(
100-
[FromQuery] int skip = 0,
101-
[FromQuery] int top = 10,
102-
CancellationToken cancellationToken = default)
99+
public async Task<IActionResult> GetImportSummaries([FromQuery] int skip = 0, [FromQuery] int top = 10, CancellationToken cancellationToken = default)
103100
{
104101
logger.LogInformation("Received request to get import summaries with skip={skip}, top={top}", skip, top);
105102

src/KeeperData.Bridge/Setup/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public static void ConfigureApi(this IServiceCollection services, IConfiguration
4848

4949
services.AddStorageDependencies(configuration);
5050

51-
services.AddEtlDependencies();
51+
services.AddEtlDependencies(configuration);
5252

5353
services.AddCrypto(configuration);
5454

src/KeeperData.Bridge/appsettings.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,16 @@
55
"EnableTransactions": false,
66
"HealthcheckEnabled": true
77
},
8+
"MongoResilience": {
9+
"MaxRetryAttempts": 3,
10+
"InitialDelayMs": 500,
11+
"TimeoutSeconds": 30,
12+
"EnableCircuitBreaker": true,
13+
"CircuitBreakerFailureRatio": 0.5,
14+
"CircuitBreakerMinimumThroughput": 10,
15+
"CircuitBreakerBreakDurationSeconds": 30,
16+
"UseJitter": true
17+
},
818
"AllowedHosts": "*",
919
"TraceHeader": "x-cdp-request-id",
1020
"Serilog": {
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
namespace KeeperData.Core.Database.Configuration;
2+
3+
/// <summary>
/// Tunable settings for the retry, timeout and circuit-breaker behaviour applied to MongoDB operations.
/// Every setting has a built-in default, so an absent or partial configuration section still yields a usable instance.
/// </summary>
public record MongoResilienceConfig
{
    // Built-in defaults, used whenever a setting is not supplied.
    private const int DefaultMaxRetryAttempts = 3;
    private const int DefaultInitialDelayMs = 500;
    private const int DefaultTimeoutSeconds = 30;
    private const bool DefaultEnableCircuitBreaker = true;
    private const double DefaultCircuitBreakerFailureRatio = 0.5;
    private const int DefaultCircuitBreakerMinimumThroughput = 10;
    private const int DefaultCircuitBreakerBreakDurationSeconds = 30;
    private const bool DefaultUseJitter = true;

    /// <summary>
    /// How many times a failed MongoDB operation is retried. Defaults to 3.
    /// </summary>
    public int MaxRetryAttempts { get; init; } = DefaultMaxRetryAttempts;

    /// <summary>
    /// Base delay, in milliseconds, applied before the first retry. Defaults to 500.
    /// </summary>
    public int InitialDelayMs { get; init; } = DefaultInitialDelayMs;

    /// <summary>
    /// Timeout, in seconds, for an individual MongoDB operation. Defaults to 30.
    /// </summary>
    public int TimeoutSeconds { get; init; } = DefaultTimeoutSeconds;

    /// <summary>
    /// Whether a circuit breaker is used to stop calls while failures are piling up. Defaults to <c>true</c>.
    /// </summary>
    public bool EnableCircuitBreaker { get; init; } = DefaultEnableCircuitBreaker;

    /// <summary>
    /// Fraction of failing requests (0.0 to 1.0) at which the circuit opens. Defaults to 0.5, i.e. 50%.
    /// </summary>
    public double CircuitBreakerFailureRatio { get; init; } = DefaultCircuitBreakerFailureRatio;

    /// <summary>
    /// Number of requests that must be observed before the failure ratio is evaluated. Defaults to 10.
    /// </summary>
    public int CircuitBreakerMinimumThroughput { get; init; } = DefaultCircuitBreakerMinimumThroughput;

    /// <summary>
    /// How long, in seconds, the circuit remains open before a recovery probe is allowed. Defaults to 30.
    /// </summary>
    public int CircuitBreakerBreakDurationSeconds { get; init; } = DefaultCircuitBreakerBreakDurationSeconds;

    /// <summary>
    /// Whether retry delays are randomised so that concurrent callers do not retry in lock-step. Defaults to <c>true</c>.
    /// </summary>
    public bool UseJitter { get; init; } = DefaultUseJitter;
}
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
using Microsoft.Extensions.Logging;
2+
using MongoDB.Driver;
3+
using Polly;
4+
using Polly.CircuitBreaker;
5+
using Polly.Retry;
6+
using Polly.Timeout;
7+
8+
namespace KeeperData.Core.Database.Resilience;
9+
10+
/// <summary>
11+
/// Factory for creating Polly resilience pipelines for MongoDB operations.
12+
/// Handles retry logic, circuit breaker, and timeout policies for transient failures.
13+
/// </summary>
14+
public static class MongoResiliencePipelineFactory
15+
{
16+
/// <summary>
17+
/// Creates a resilience pipeline for MongoDB operations with retry, circuit breaker, and timeout.
18+
/// </summary>
19+
/// <typeparam name="T">The result type of the MongoDB operation</typeparam>
20+
/// <param name="config">Resilience configuration settings</param>
21+
/// <param name="logger">Logger for diagnostics</param>
22+
/// <param name="operationName">Name of the operation for logging context</param>
23+
/// <returns>Configured resilience pipeline</returns>
24+
public static ResiliencePipeline<T> Create<T>(
25+
Configuration.MongoResilienceConfig config,
26+
ILogger logger,
27+
string operationName)
28+
{
29+
var pipelineBuilder = new ResiliencePipelineBuilder<T>();
30+
31+
// Add retry strategy for transient failures
32+
pipelineBuilder.AddRetry(new RetryStrategyOptions<T>
33+
{
34+
MaxRetryAttempts = config.MaxRetryAttempts,
35+
Delay = TimeSpan.FromMilliseconds(config.InitialDelayMs),
36+
BackoffType = DelayBackoffType.Exponential,
37+
UseJitter = config.UseJitter,
38+
ShouldHandle = new PredicateBuilder<T>()
39+
.Handle<MongoConnectionException>()
40+
.Handle<MongoExecutionTimeoutException>()
41+
.Handle<TimeoutException>()
42+
.Handle<MongoException>(ex =>
43+
// Retry on network-related errors
44+
ex.Message.Contains("network", StringComparison.OrdinalIgnoreCase) ||
45+
ex.Message.Contains("connection pool", StringComparison.OrdinalIgnoreCase) ||
46+
ex.InnerException is System.Net.Sockets.SocketException),
47+
OnRetry = args =>
48+
{
49+
// Log at Debug level to avoid log spam, Warning only on final failure
50+
var logLevel = args.AttemptNumber < config.MaxRetryAttempts ? LogLevel.Debug : LogLevel.Warning;
51+
52+
if (logLevel == LogLevel.Debug)
53+
{
54+
logger.LogDebug(
55+
args.Outcome.Exception,
56+
"[MongoDB Resilience] {OperationName} failed (attempt {AttemptNumber}/{MaxRetryAttempts}). Retrying after {RetryDelay}ms",
57+
operationName,
58+
args.AttemptNumber,
59+
config.MaxRetryAttempts,
60+
args.RetryDelay.TotalMilliseconds);
61+
}
62+
else
63+
{
64+
logger.LogWarning(
65+
args.Outcome.Exception,
66+
"[MongoDB Resilience] {OperationName} failed (attempt {AttemptNumber}/{MaxRetryAttempts}). Retrying after {RetryDelay}ms",
67+
operationName,
68+
args.AttemptNumber,
69+
config.MaxRetryAttempts,
70+
args.RetryDelay.TotalMilliseconds);
71+
}
72+
73+
return ValueTask.CompletedTask;
74+
}
75+
});
76+
77+
// Add circuit breaker if enabled
78+
if (config.EnableCircuitBreaker)
79+
{
80+
pipelineBuilder.AddCircuitBreaker(new CircuitBreakerStrategyOptions<T>
81+
{
82+
FailureRatio = config.CircuitBreakerFailureRatio,
83+
MinimumThroughput = config.CircuitBreakerMinimumThroughput,
84+
SamplingDuration = TimeSpan.FromSeconds(config.CircuitBreakerBreakDurationSeconds),
85+
BreakDuration = TimeSpan.FromSeconds(config.CircuitBreakerBreakDurationSeconds),
86+
ShouldHandle = new PredicateBuilder<T>()
87+
.Handle<MongoConnectionException>()
88+
.Handle<MongoExecutionTimeoutException>()
89+
.Handle<TimeoutException>(),
90+
OnOpened = args =>
91+
{
92+
logger.LogError(
93+
"[MongoDB Resilience] Circuit breaker OPENED for {OperationName}. " +
94+
"Too many failures detected. Breaking for {BreakDuration}s",
95+
operationName,
96+
config.CircuitBreakerBreakDurationSeconds);
97+
return ValueTask.CompletedTask;
98+
},
99+
OnClosed = args =>
100+
{
101+
logger.LogInformation(
102+
"[MongoDB Resilience] Circuit breaker CLOSED for {OperationName}. Resuming normal operations",
103+
operationName);
104+
return ValueTask.CompletedTask;
105+
},
106+
OnHalfOpened = args =>
107+
{
108+
logger.LogInformation(
109+
"[MongoDB Resilience] Circuit breaker HALF-OPEN for {OperationName}. Testing if service recovered",
110+
operationName);
111+
return ValueTask.CompletedTask;
112+
}
113+
});
114+
}
115+
116+
// Add timeout as the outer layer
117+
pipelineBuilder.AddTimeout(TimeSpan.FromSeconds(config.TimeoutSeconds));
118+
119+
return pipelineBuilder.Build();
120+
}
121+
122+
/// <summary>
123+
/// Creates a resilience pipeline for void/async Task MongoDB operations.
124+
/// </summary>
125+
public static ResiliencePipeline CreateForVoid(
126+
Configuration.MongoResilienceConfig config,
127+
ILogger logger,
128+
string operationName)
129+
{
130+
var pipelineBuilder = new ResiliencePipelineBuilder();
131+
132+
// Add retry strategy
133+
pipelineBuilder.AddRetry(new RetryStrategyOptions
134+
{
135+
MaxRetryAttempts = config.MaxRetryAttempts,
136+
Delay = TimeSpan.FromMilliseconds(config.InitialDelayMs),
137+
BackoffType = DelayBackoffType.Exponential,
138+
UseJitter = config.UseJitter,
139+
ShouldHandle = new PredicateBuilder()
140+
.Handle<MongoConnectionException>()
141+
.Handle<MongoExecutionTimeoutException>()
142+
.Handle<TimeoutException>()
143+
.Handle<MongoException>(ex =>
144+
ex.Message.Contains("network", StringComparison.OrdinalIgnoreCase) ||
145+
ex.Message.Contains("connection pool", StringComparison.OrdinalIgnoreCase) ||
146+
ex.InnerException is System.Net.Sockets.SocketException),
147+
OnRetry = args =>
148+
{
149+
var logLevel = args.AttemptNumber < config.MaxRetryAttempts ? LogLevel.Debug : LogLevel.Warning;
150+
151+
if (logLevel == LogLevel.Debug)
152+
{
153+
logger.LogDebug(
154+
args.Outcome.Exception,
155+
"[MongoDB Resilience] {OperationName} failed (attempt {AttemptNumber}/{MaxRetryAttempts}). Retrying after {RetryDelay}ms",
156+
operationName,
157+
args.AttemptNumber,
158+
config.MaxRetryAttempts,
159+
args.RetryDelay.TotalMilliseconds);
160+
}
161+
else
162+
{
163+
logger.LogWarning(
164+
args.Outcome.Exception,
165+
"[MongoDB Resilience] {OperationName} failed (attempt {AttemptNumber}/{MaxRetryAttempts}). Retrying after {RetryDelay}ms",
166+
operationName,
167+
args.AttemptNumber,
168+
config.MaxRetryAttempts,
169+
args.RetryDelay.TotalMilliseconds);
170+
}
171+
172+
return ValueTask.CompletedTask;
173+
}
174+
});
175+
176+
// Add circuit breaker if enabled
177+
if (config.EnableCircuitBreaker)
178+
{
179+
pipelineBuilder.AddCircuitBreaker(new CircuitBreakerStrategyOptions
180+
{
181+
FailureRatio = config.CircuitBreakerFailureRatio,
182+
MinimumThroughput = config.CircuitBreakerMinimumThroughput,
183+
SamplingDuration = TimeSpan.FromSeconds(config.CircuitBreakerBreakDurationSeconds),
184+
BreakDuration = TimeSpan.FromSeconds(config.CircuitBreakerBreakDurationSeconds),
185+
ShouldHandle = new PredicateBuilder()
186+
.Handle<MongoConnectionException>()
187+
.Handle<MongoExecutionTimeoutException>()
188+
.Handle<TimeoutException>(),
189+
OnOpened = args =>
190+
{
191+
logger.LogError(
192+
"[MongoDB Resilience] Circuit breaker OPENED for {OperationName}. Breaking for {BreakDuration}s",
193+
operationName,
194+
config.CircuitBreakerBreakDurationSeconds);
195+
return ValueTask.CompletedTask;
196+
},
197+
OnClosed = args =>
198+
{
199+
logger.LogInformation(
200+
"[MongoDB Resilience] Circuit breaker CLOSED for {OperationName}",
201+
operationName);
202+
return ValueTask.CompletedTask;
203+
},
204+
OnHalfOpened = args =>
205+
{
206+
logger.LogInformation(
207+
"[MongoDB Resilience] Circuit breaker HALF-OPEN for {OperationName}",
208+
operationName);
209+
return ValueTask.CompletedTask;
210+
}
211+
});
212+
}
213+
214+
pipelineBuilder.AddTimeout(TimeSpan.FromSeconds(config.TimeoutSeconds));
215+
216+
return pipelineBuilder.Build();
217+
}
218+
}

0 commit comments

Comments
 (0)