Skip to content

Commit dbdf949

Browse files
authored
feat: add production resilience features (#9)
* feat: add production resilience features - Add circuit breaker pattern for fail-fast during outages - Configurable failure/success thresholds and open duration - Three states: Closed, Open, HalfOpen - Integrated with IggyClientWrapper reconnection logic - Add Prometheus metrics export (metrics-rs) - Counters: messages sent/polled, reconnects, circuit breaker events - Histograms: request/send/poll duration - Gauges: connection status, circuit breaker state - Configurable via METRICS_PORT (default: 9090, 0 = disabled) - Add request timeout propagation middleware - Clients specify timeout via X-Request-Timeout header (milliseconds) - Bounded: 100ms min, 5min max - RequestTimeoutExt trait for handler extraction - Fix empty X-Forwarded-For header handling - Empty/whitespace-only headers now return "unknown" - Prevents separate rate-limit buckets for empty headers - Add consumer ID upper bound validation - MAX_CONSUMER_ID = 1 billion - Catches likely misconfigurations * fix: add missing Config fields to integration tests
1 parent c39e19d commit dbdf949

File tree

15 files changed

+1301
-41
lines changed

15 files changed

+1301
-41
lines changed

CLAUDE.md

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@ This application showcases how to build a production-ready message streaming ser
1717
- Comprehensive error handling with `Result` types (no `unwrap()`/`expect()` in production code)
1818
- **Zero clippy warnings** - strict lints enforced, no `#[allow(...)]` in production code
1919
- **Connection resilience** with automatic reconnection and exponential backoff
20+
- **Circuit breaker** pattern for fail-fast during outages
2021
- **Rate limiting** with token bucket algorithm (configurable RPS and burst)
2122
- **API key authentication** with constant-time comparison (timing attack resistant)
2223
- **Request ID propagation** for distributed tracing
24+
- **Request timeout propagation** via `X-Request-Timeout` header
2325
- **Configurable CORS** with origin whitelist support
2426
- **Background stats caching** to avoid expensive queries on each request
2527
- **Structured concurrency** with proper task lifecycle management
2628
- **Background health checks** for early connection issue detection
29+
- **Prometheus metrics** export for observability
2730

2831
## Architecture
2932

@@ -35,6 +38,7 @@ This application showcases how to build a production-ready message streaming ser
3538
│ Middleware Stack (src/middleware/) │
3639
│ - rate_limit.rs: Token bucket rate limiting │
3740
│ - auth.rs: API key authentication │
41+
│ - timeout.rs: Request timeout propagation │
3842
│ - request_id.rs: Request ID propagation │
3943
│ + tower_http: Tracing, CORS │
4044
├─────────────────────────────────────────────────────────────┤
@@ -50,6 +54,7 @@ This application showcases how to build a production-ready message streaming ser
5054
├─────────────────────────────────────────────────────────────┤
5155
│ IggyClientWrapper (src/iggy_client.rs) │
5256
│ High-level wrapper with automatic reconnection │
57+
│ + Circuit breaker for fail-fast during outages │
5358
│ + PollParams builder for cleaner polling API │
5459
├─────────────────────────────────────────────────────────────┤
5560
│ Background Tasks (managed by TaskTracker) │
@@ -91,15 +96,23 @@ src/
9196
├── lib.rs # Library exports
9297
├── config.rs # Configuration from environment
9398
├── error.rs # Error types with HTTP status codes
99+
├── metrics.rs # Prometheus metrics export
94100
├── state.rs # Shared application state with stats caching
95101
├── routes.rs # Route definitions and middleware stack
96-
├── iggy_client.rs # Iggy SDK wrapper with auto-reconnection
102+
├── iggy_client/ # Iggy SDK wrapper module
103+
│ ├── mod.rs # Client wrapper with auto-reconnection
104+
│ ├── circuit_breaker.rs # Circuit breaker pattern implementation
105+
│ ├── connection.rs # Connection state management
106+
│ ├── helpers.rs # Utility functions
107+
│ ├── params.rs # PollParams builder
108+
│ └── scopeguard.rs # Scope guard utilities
97109
├── validation.rs # Input validation utilities
98110
├── middleware/
99111
│ ├── mod.rs # Middleware exports
100112
│ ├── ip.rs # Client IP extraction (shared by rate_limit and auth)
101113
│ ├── rate_limit.rs # Token bucket rate limiting (Governor)
102114
│ ├── auth.rs # API key authentication
115+
│ ├── timeout.rs # Request timeout propagation
103116
│ └── request_id.rs # Request ID propagation
104117
├── models/
105118
│ ├── mod.rs # Model exports
@@ -197,6 +210,13 @@ Environment variables (see `.env.example`):
197210
| `HEALTH_CHECK_INTERVAL_SECS` | `30` | Connection health check interval |
198211
| `OPERATION_TIMEOUT_SECS` | `30` | Timeout for Iggy operations |
199212

213+
### Circuit Breaker
214+
| Variable | Default | Description |
215+
|----------|---------|-------------|
216+
| `CIRCUIT_BREAKER_FAILURE_THRESHOLD` | `5` | Failures before opening circuit |
217+
| `CIRCUIT_BREAKER_SUCCESS_THRESHOLD` | `2` | Successes in half-open to close |
218+
| `CIRCUIT_BREAKER_OPEN_DURATION_SECS` | `30` | How long circuit stays open |
219+
200220
### Rate Limiting
201221
| Variable | Default | Description |
202222
|----------|---------|-------------|
@@ -246,6 +266,7 @@ When empty (default), all X-Forwarded-For headers are trusted. **This is not rec
246266
| Variable | Default | Description |
247267
|----------|---------|-------------|
248268
| `STATS_CACHE_TTL_SECS` | `5` | Stats cache refresh interval |
269+
| `METRICS_PORT` | `9090` | Prometheus metrics port (0 = disabled) |
249270

250271
#### Log Levels
251272

@@ -394,7 +415,7 @@ environment:
394415
### Running Tests
395416

396417
```bash
397-
# Unit tests (93 tests)
418+
# Unit tests (130 tests)
398419
cargo test --lib
399420

400421
# Integration tests (24 tests, requires Docker for testcontainers)
@@ -481,6 +502,7 @@ Error types and HTTP status codes:
481502
- `connection_failed` (503): Initial connection to Iggy server failed
482503
- `disconnected` (503): Lost connection during operation
483504
- `connection_reset` (503): Connection was reset by peer
505+
- `circuit_open` (503): Circuit breaker is open, failing fast
484506
- `stream_error` (500): Stream operation failed
485507
- `topic_error` (500): Topic operation failed
486508
- `send_error` (500): Message send failed
@@ -533,7 +555,7 @@ Iggy uses **0-indexed partitions**:
533555
## Dependencies
534556

535557
Key dependencies (see `Cargo.toml`):
536-
- `iggy 0.8.0-edge.6`: Iggy Rust SDK (edge version for latest server features)
558+
- `iggy 0.8.0`: Iggy Rust SDK
537559
- `axum 0.8`: Web framework
538560
- `tokio 1.48`: Async runtime
539561
- `tokio-util 0.7`: Task tracking and cancellation tokens
@@ -543,8 +565,10 @@ Key dependencies (see `Cargo.toml`):
543565
- `governor 0.8`: Rate limiting with token bucket algorithm
544566
- `subtle 2.6`: Constant-time comparison for security
545567
- `tower-http 0.6`: HTTP middleware (CORS, tracing, request ID)
546-
- `rust_decimal 1.37`: Exact decimal arithmetic for monetary values
547-
- `testcontainers 0.24`: Integration testing with containerized Iggy
568+
- `rust_decimal 1.39`: Exact decimal arithmetic for monetary values
569+
- `metrics 0.24`: Application metrics
570+
- `metrics-exporter-prometheus 0.16`: Prometheus metrics export
571+
- `testcontainers 0.26`: Integration testing with containerized Iggy
548572

549573
## Structured Concurrency
550574

@@ -625,7 +649,7 @@ let messages = client.poll_with_params("stream", "topic", params).await?;
625649

626650
Request flow (applied in order):
627651
```
628-
Request → Rate Limit → Auth → Request ID → Tracing → CORS → Handler
652+
Request → Rate Limit → Auth → Timeout → Request ID → Tracing → CORS → Handler
629653
```
630654

631655
### Client IP Extraction (`src/middleware/ip.rs`)
@@ -648,6 +672,12 @@ Request → Rate Limit → Auth → Request ID → Tracing → CORS → Handler
648672
- Accepts key via `X-API-Key` header or `api_key` query parameter
649673
- Bypasses `/health` and `/ready` for health checks (exact path matching)
650674

675+
### Request Timeout (`src/middleware/timeout.rs`)
676+
- Clients can specify `X-Request-Timeout: <milliseconds>` header
677+
- Bounded: 100ms minimum, 5 minutes maximum
678+
- Stored in request extensions for handler use
679+
- `RequestTimeoutExt` trait for easy extraction in handlers
680+
651681
## Deployment Security
652682

653683
### Reverse Proxy Configuration (Required)

Cargo.lock

Lines changed: 79 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ subtle = "2.6"
6060
# Exit codes (BSD sysexits compatible)
6161
exitcode = "1.1"
6262

63+
# Metrics for Prometheus
64+
metrics = "0.24"
65+
metrics-exporter-prometheus = { version = "0.16", default-features = false, features = ["http-listener"] }
66+
6367
[dev-dependencies]
6468
reqwest = { version = "0.12", features = ["json"] }
6569
testcontainers = "0.26"

src/config.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ pub struct Config {
7777
/// Prevents operations from hanging indefinitely on network issues
7878
pub operation_timeout: Duration,
7979

80+
// =========================================================================
81+
// Circuit Breaker Configuration
82+
// =========================================================================
83+
/// Number of consecutive failures before opening the circuit (default: 5)
84+
pub circuit_breaker_failure_threshold: u32,
85+
86+
/// Number of consecutive successes in half-open state to close circuit (default: 2)
87+
pub circuit_breaker_success_threshold: u32,
88+
89+
/// How long the circuit stays open before transitioning to half-open (default: 30s)
90+
pub circuit_breaker_open_duration: Duration,
91+
8092
// =========================================================================
8193
// Rate Limiting Configuration
8294
// =========================================================================
@@ -139,6 +151,9 @@ pub struct Config {
139151

140152
/// Interval for background stats cache refresh (default: 5 seconds)
141153
pub stats_cache_ttl: Duration,
154+
155+
/// Port for Prometheus metrics endpoint (default: 9090, 0 = disabled)
156+
pub metrics_port: u16,
142157
}
143158

144159
impl Config {
@@ -180,6 +195,20 @@ impl Config {
180195
)?),
181196
operation_timeout: Duration::from_secs(Self::parse_env("OPERATION_TIMEOUT_SECS", 30)?),
182197

198+
// Circuit breaker
199+
circuit_breaker_failure_threshold: Self::parse_env(
200+
"CIRCUIT_BREAKER_FAILURE_THRESHOLD",
201+
5,
202+
)?,
203+
circuit_breaker_success_threshold: Self::parse_env(
204+
"CIRCUIT_BREAKER_SUCCESS_THRESHOLD",
205+
2,
206+
)?,
207+
circuit_breaker_open_duration: Duration::from_secs(Self::parse_env(
208+
"CIRCUIT_BREAKER_OPEN_DURATION_SECS",
209+
30,
210+
)?),
211+
183212
// Rate limiting
184213
rate_limit_rps: Self::parse_env("RATE_LIMIT_RPS", 100)?,
185214
rate_limit_burst: Self::parse_env("RATE_LIMIT_BURST", 50)?,
@@ -198,6 +227,7 @@ impl Config {
198227
// Observability
199228
log_level: env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
200229
stats_cache_ttl: Duration::from_secs(Self::parse_env("STATS_CACHE_TTL_SECS", 5)?),
230+
metrics_port: Self::parse_env("METRICS_PORT", 9090)?,
201231
};
202232

203233
// Validate configuration before returning
@@ -266,6 +296,25 @@ impl Config {
266296
!self.trusted_proxies.is_empty()
267297
}
268298

299+
/// Check if Prometheus metrics export is enabled.
300+
pub fn metrics_enabled(&self) -> bool {
301+
self.metrics_port > 0
302+
}
303+
304+
/// Get the metrics endpoint address.
305+
///
306+
/// Returns `None` if metrics are disabled (port = 0).
307+
pub fn metrics_addr(&self) -> Option<std::net::SocketAddr> {
308+
if self.metrics_enabled() {
309+
Some(std::net::SocketAddr::from((
310+
[0, 0, 0, 0],
311+
self.metrics_port,
312+
)))
313+
} else {
314+
None
315+
}
316+
}
317+
269318
/// Parse an environment variable into the specified type with a default value.
270319
fn parse_env<T>(name: &str, default: T) -> AppResult<T>
271320
where
@@ -344,6 +393,10 @@ impl Default for Config {
344393
reconnect_max_delay: Duration::from_secs(30),
345394
health_check_interval: Duration::from_secs(30),
346395
operation_timeout: Duration::from_secs(30),
396+
// Circuit breaker
397+
circuit_breaker_failure_threshold: 5,
398+
circuit_breaker_success_threshold: 2,
399+
circuit_breaker_open_duration: Duration::from_secs(30),
347400
// Rate limiting
348401
rate_limit_rps: 100,
349402
rate_limit_burst: 50,
@@ -359,6 +412,7 @@ impl Default for Config {
359412
// Observability
360413
log_level: "info".to_string(),
361414
stats_cache_ttl: Duration::from_secs(5),
415+
metrics_port: 9090,
362416
}
363417
}
364418
}

0 commit comments

Comments
 (0)