42 changes: 36 additions & 6 deletions CLAUDE.md
@@ -17,13 +17,16 @@ This application showcases how to build a production-ready message streaming ser
- Comprehensive error handling with `Result` types (no `unwrap()`/`expect()` in production code)
- **Zero clippy warnings** - strict lints enforced, no `#[allow(...)]` in production code
- **Connection resilience** with automatic reconnection and exponential backoff
- **Circuit breaker** pattern for fail-fast during outages
- **Rate limiting** with token bucket algorithm (configurable RPS and burst)
- **API key authentication** with constant-time comparison (timing attack resistant)
- **Request ID propagation** for distributed tracing
- **Request timeout propagation** via `X-Request-Timeout` header
- **Configurable CORS** with origin whitelist support
- **Background stats caching** to avoid expensive queries on each request
- **Structured concurrency** with proper task lifecycle management
- **Background health checks** for early connection issue detection
- **Prometheus metrics** export for observability
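
The rate-limiting bullet above names the token bucket algorithm. The project delegates this to the `governor` crate, so the following is only a std-only sketch of the idea, not the project's code. The names `TokenBucket`, `refill`, and `try_acquire` are hypothetical, and elapsed time is passed in explicitly so the behavior is deterministic.

```rust
use std::time::Duration;

/// Hypothetical token bucket for illustration; the real service uses `governor`.
#[derive(Debug)]
pub struct TokenBucket {
    capacity: f64,       // maximum burst size
    tokens: f64,         // tokens currently available
    refill_per_sec: f64, // sustained rate (requests per second)
}

impl TokenBucket {
    /// Start with a full bucket so an initial burst is allowed.
    pub fn new(rps: u32, burst: u32) -> Self {
        Self {
            capacity: f64::from(burst),
            tokens: f64::from(burst),
            refill_per_sec: f64::from(rps),
        }
    }

    /// Credit tokens for the time that has passed, capped at the burst size.
    pub fn refill(&mut self, elapsed: Duration) {
        let added = elapsed.as_secs_f64() * self.refill_per_sec;
        self.tokens = (self.tokens + added).min(self.capacity);
    }

    /// Take one token if available; `false` means the request is rate limited.
    pub fn try_acquire(&mut self) -> bool {
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

With `TokenBucket::new(10, 2)`, two requests pass immediately, the third is rejected, and after one second of refill the bucket is full again (capped at the burst of 2).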

## Architecture

@@ -35,6 +38,7 @@ This application showcases how to build a production-ready message streaming ser
│ Middleware Stack (src/middleware/) │
│ - rate_limit.rs: Token bucket rate limiting │
│ - auth.rs: API key authentication │
│ - timeout.rs: Request timeout propagation │
│ - request_id.rs: Request ID propagation │
│ + tower_http: Tracing, CORS │
├─────────────────────────────────────────────────────────────┤
@@ -50,6 +54,7 @@ This application showcases how to build a production-ready message streaming ser
├─────────────────────────────────────────────────────────────┤
│ IggyClientWrapper (src/iggy_client.rs) │
│ High-level wrapper with automatic reconnection │
│ + Circuit breaker for fail-fast during outages │
│ + PollParams builder for cleaner polling API │
├─────────────────────────────────────────────────────────────┤
│ Background Tasks (managed by TaskTracker) │
@@ -91,15 +96,23 @@ src/
├── lib.rs # Library exports
├── config.rs # Configuration from environment
├── error.rs # Error types with HTTP status codes
├── metrics.rs # Prometheus metrics export
├── state.rs # Shared application state with stats caching
├── routes.rs # Route definitions and middleware stack
├── iggy_client.rs # Iggy SDK wrapper with auto-reconnection
├── iggy_client/ # Iggy SDK wrapper module
│ ├── mod.rs # Client wrapper with auto-reconnection
│ ├── circuit_breaker.rs # Circuit breaker pattern implementation
│ ├── connection.rs # Connection state management
│ ├── helpers.rs # Utility functions
│ ├── params.rs # PollParams builder
│ └── scopeguard.rs # Scope guard utilities
├── validation.rs # Input validation utilities
├── middleware/
│ ├── mod.rs # Middleware exports
│ ├── ip.rs # Client IP extraction (shared by rate_limit and auth)
│ ├── rate_limit.rs # Token bucket rate limiting (Governor)
│ ├── auth.rs # API key authentication
│ ├── timeout.rs # Request timeout propagation
│ └── request_id.rs # Request ID propagation
├── models/
│ ├── mod.rs # Model exports
@@ -197,6 +210,13 @@ Environment variables (see `.env.example`):
| `HEALTH_CHECK_INTERVAL_SECS` | `30` | Connection health check interval |
| `OPERATION_TIMEOUT_SECS` | `30` | Timeout for Iggy operations |

### Circuit Breaker
| Variable | Default | Description |
|----------|---------|-------------|
| `CIRCUIT_BREAKER_FAILURE_THRESHOLD` | `5` | Failures before opening circuit |
| `CIRCUIT_BREAKER_SUCCESS_THRESHOLD` | `2` | Successes in half-open to close |
| `CIRCUIT_BREAKER_OPEN_DURATION_SECS` | `30` | How long circuit stays open |
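
The three settings in the table map onto the usual closed / open / half-open state machine. The sketch below is one way that machine could look, assuming consecutive-failure counting as described in `src/config.rs`; it is not the contents of `circuit_breaker.rs`, which this diff does not show. `CircuitBreaker`, `CircuitState`, `allow`, `record_success`, and `record_failure` are hypothetical names, and the current time is passed in so the transitions can be checked deterministically.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    Closed,
    Open { since: Instant },
    HalfOpen,
}

/// Hypothetical circuit breaker driven by the three configured values.
pub struct CircuitBreaker {
    state: CircuitState,
    failures: u32,  // consecutive failures while closed
    successes: u32, // consecutive successes while half-open
    failure_threshold: u32,
    success_threshold: u32,
    open_duration: Duration,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, success_threshold: u32, open_duration: Duration) -> Self {
        Self {
            state: CircuitState::Closed,
            failures: 0,
            successes: 0,
            failure_threshold,
            success_threshold,
            open_duration,
        }
    }

    pub fn state(&self) -> CircuitState {
        self.state
    }

    /// Returns `true` if a call may proceed at `now`.
    /// An open circuit moves to half-open once `open_duration` has elapsed.
    pub fn allow(&mut self, now: Instant) -> bool {
        if let CircuitState::Open { since } = self.state {
            if now.duration_since(since) < self.open_duration {
                return false; // fail fast
            }
            self.state = CircuitState::HalfOpen;
            self.successes = 0;
        }
        true
    }

    pub fn record_success(&mut self) {
        match self.state {
            CircuitState::Closed => self.failures = 0,
            CircuitState::HalfOpen => {
                self.successes += 1;
                if self.successes >= self.success_threshold {
                    self.state = CircuitState::Closed;
                    self.failures = 0;
                }
            }
            CircuitState::Open { .. } => {}
        }
    }

    pub fn record_failure(&mut self, now: Instant) {
        match self.state {
            CircuitState::Closed => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.trip(now);
                }
            }
            // Any failure during the half-open probe reopens the circuit.
            CircuitState::HalfOpen => self.trip(now),
            CircuitState::Open { .. } => {}
        }
    }

    fn trip(&mut self, now: Instant) {
        self.state = CircuitState::Open { since: now };
        self.failures = 0;
        self.successes = 0;
    }
}
```

With the defaults (5, 2, 30s), five consecutive failures open the circuit, calls are rejected for 30 seconds, and two consecutive successful probes close it again.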

### Rate Limiting
| Variable | Default | Description |
|----------|---------|-------------|
@@ -246,6 +266,7 @@ When empty (default), all X-Forwarded-For headers are trusted. **This is not rec
| Variable | Default | Description |
|----------|---------|-------------|
| `STATS_CACHE_TTL_SECS` | `5` | Stats cache refresh interval |
| `METRICS_PORT` | `9090` | Prometheus metrics port (0 = disabled) |

#### Log Levels

@@ -394,7 +415,7 @@ environment:
### Running Tests

```bash
# Unit tests (93 tests)
# Unit tests (130 tests)
cargo test --lib

# Integration tests (24 tests, requires Docker for testcontainers)
@@ -481,6 +502,7 @@ Error types and HTTP status codes:
- `connection_failed` (503): Initial connection to Iggy server failed
- `disconnected` (503): Lost connection during operation
- `connection_reset` (503): Connection was reset by peer
- `circuit_open` (503): Circuit breaker is open, failing fast
- `stream_error` (500): Stream operation failed
- `topic_error` (500): Topic operation failed
- `send_error` (500): Message send failed
@@ -533,7 +555,7 @@ Iggy uses **0-indexed partitions**:
## Dependencies

Key dependencies (see `Cargo.toml`):
- `iggy 0.8.0-edge.6`: Iggy Rust SDK (edge version for latest server features)
- `iggy 0.8.0`: Iggy Rust SDK
- `axum 0.8`: Web framework
- `tokio 1.48`: Async runtime
- `tokio-util 0.7`: Task tracking and cancellation tokens
@@ -543,8 +565,10 @@ Key dependencies (see `Cargo.toml`):
- `governor 0.8`: Rate limiting with token bucket algorithm
- `subtle 2.6`: Constant-time comparison for security
- `tower-http 0.6`: HTTP middleware (CORS, tracing, request ID)
- `rust_decimal 1.37`: Exact decimal arithmetic for monetary values
- `testcontainers 0.24`: Integration testing with containerized Iggy
- `rust_decimal 1.39`: Exact decimal arithmetic for monetary values
- `metrics 0.24`: Application metrics
- `metrics-exporter-prometheus 0.16`: Prometheus metrics export
- `testcontainers 0.26`: Integration testing with containerized Iggy

## Structured Concurrency

@@ -625,7 +649,7 @@ let messages = client.poll_with_params("stream", "topic", params).await?;

Request flow (applied in order):
```
Request → Rate Limit → Auth → Request ID → Tracing → CORS → Handler
Request → Rate Limit → Auth → Timeout → Request ID → Tracing → CORS → Handler
```

### Client IP Extraction (`src/middleware/ip.rs`)
@@ -648,6 +672,12 @@ Request → Rate Limit → Auth → Request ID → Tracing → CORS → Handler
- Accepts key via `X-API-Key` header or `api_key` query parameter
- Bypasses `/health` and `/ready` for health checks (exact path matching)

### Request Timeout (`src/middleware/timeout.rs`)
- Clients can specify `X-Request-Timeout: <milliseconds>` header
- Bounded: 100ms minimum, 5 minutes maximum
- Stored in request extensions for handler use
- `RequestTimeoutExt` trait for easy extraction in handlers
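
The bounds in the list above can be illustrated with a small parser. This is a sketch, not the code in `timeout.rs`: `parse_request_timeout`, `MIN_TIMEOUT`, and `MAX_TIMEOUT` are hypothetical names, and it assumes out-of-range values are clamped to the bounds rather than rejected, which the documentation does not specify.

```rust
use std::time::Duration;

/// Lower bound from the documentation: 100ms.
pub const MIN_TIMEOUT: Duration = Duration::from_millis(100);
/// Upper bound from the documentation: 5 minutes.
pub const MAX_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Parse an `X-Request-Timeout` value given in milliseconds.
///
/// Returns `None` when the value is not an unsigned integer.
/// Assumption: out-of-range values are clamped, not rejected.
pub fn parse_request_timeout(header_value: &str) -> Option<Duration> {
    let millis: u64 = header_value.trim().parse().ok()?;
    Some(Duration::from_millis(millis).clamp(MIN_TIMEOUT, MAX_TIMEOUT))
}
```

Under these assumptions, `"2500"` yields 2.5 seconds, `"5"` is raised to 100ms, and a non-numeric value yields `None` so the handler can fall back to its default timeout.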

## Deployment Security

### Reverse Proxy Configuration (Required)
80 changes: 79 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -60,6 +60,10 @@ subtle = "2.6"
# Exit codes (BSD sysexits compatible)
exitcode = "1.1"

# Metrics for Prometheus
metrics = "0.24"
metrics-exporter-prometheus = { version = "0.16", default-features = false, features = ["http-listener"] }

[dev-dependencies]
reqwest = { version = "0.12", features = ["json"] }
testcontainers = "0.26"
54 changes: 54 additions & 0 deletions src/config.rs
@@ -77,6 +77,18 @@ pub struct Config {
    /// Prevents operations from hanging indefinitely on network issues
    pub operation_timeout: Duration,

    // =========================================================================
    // Circuit Breaker Configuration
    // =========================================================================
    /// Number of consecutive failures before opening the circuit (default: 5)
    pub circuit_breaker_failure_threshold: u32,

    /// Number of consecutive successes in half-open state to close circuit (default: 2)
    pub circuit_breaker_success_threshold: u32,

    /// How long the circuit stays open before transitioning to half-open (default: 30s)
    pub circuit_breaker_open_duration: Duration,

    // =========================================================================
    // Rate Limiting Configuration
    // =========================================================================
@@ -139,6 +151,9 @@ pub struct Config {

    /// Interval for background stats cache refresh (default: 5 seconds)
    pub stats_cache_ttl: Duration,

    /// Port for Prometheus metrics endpoint (default: 9090, 0 = disabled)
    pub metrics_port: u16,
}

impl Config {
@@ -180,6 +195,20 @@ impl Config {
            )?),
            operation_timeout: Duration::from_secs(Self::parse_env("OPERATION_TIMEOUT_SECS", 30)?),

            // Circuit breaker
            circuit_breaker_failure_threshold: Self::parse_env(
                "CIRCUIT_BREAKER_FAILURE_THRESHOLD",
                5,
            )?,
            circuit_breaker_success_threshold: Self::parse_env(
                "CIRCUIT_BREAKER_SUCCESS_THRESHOLD",
                2,
            )?,
            circuit_breaker_open_duration: Duration::from_secs(Self::parse_env(
                "CIRCUIT_BREAKER_OPEN_DURATION_SECS",
                30,
            )?),

            // Rate limiting
            rate_limit_rps: Self::parse_env("RATE_LIMIT_RPS", 100)?,
            rate_limit_burst: Self::parse_env("RATE_LIMIT_BURST", 50)?,
@@ -198,6 +227,7 @@ impl Config {
            // Observability
            log_level: env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
            stats_cache_ttl: Duration::from_secs(Self::parse_env("STATS_CACHE_TTL_SECS", 5)?),
            metrics_port: Self::parse_env("METRICS_PORT", 9090)?,
        };

        // Validate configuration before returning
@@ -266,6 +296,25 @@ impl Config {
        !self.trusted_proxies.is_empty()
    }

    /// Check if Prometheus metrics export is enabled.
    pub fn metrics_enabled(&self) -> bool {
        self.metrics_port > 0
    }

    /// Get the metrics endpoint address.
    ///
    /// Returns `None` if metrics are disabled (port = 0).
    pub fn metrics_addr(&self) -> Option<std::net::SocketAddr> {
        if self.metrics_enabled() {
            Some(std::net::SocketAddr::from((
                [0, 0, 0, 0],
                self.metrics_port,
            )))
        } else {
            None
        }
    }

    /// Parse an environment variable into the specified type with a default value.
    fn parse_env<T>(name: &str, default: T) -> AppResult<T>
    where
@@ -344,6 +393,10 @@ impl Default for Config {
            reconnect_max_delay: Duration::from_secs(30),
            health_check_interval: Duration::from_secs(30),
            operation_timeout: Duration::from_secs(30),
            // Circuit breaker
            circuit_breaker_failure_threshold: 5,
            circuit_breaker_success_threshold: 2,
            circuit_breaker_open_duration: Duration::from_secs(30),
            // Rate limiting
            rate_limit_rps: 100,
            rate_limit_burst: 50,
@@ -359,6 +412,7 @@ impl Default for Config {
            // Observability
            log_level: "info".to_string(),
            stats_cache_ttl: Duration::from_secs(5),
            metrics_port: 9090,
        }
    }
}
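
The config changes above rely on two small behaviors: falling back to a default when a variable is unset, and treating port `0` as "metrics disabled". The body of `parse_env` is not shown in this diff, so the sketch below is an assumption about its general shape, written against a plain `Option<&str>` instead of the process environment and returning `Result<T, String>` instead of the project's `AppResult`. `parse_or_default` is a hypothetical name; `metrics_addr` mirrors the method added in this change as a free function.

```rust
use std::fmt::Display;
use std::net::SocketAddr;
use std::str::FromStr;

/// Hypothetical stand-in for `Config::parse_env`: use `default` when the
/// variable is unset, otherwise parse it and report a readable error.
pub fn parse_or_default<T>(raw: Option<&str>, default: T) -> Result<T, String>
where
    T: FromStr,
    T::Err: Display,
{
    match raw {
        None => Ok(default),
        Some(value) => value
            .trim()
            .parse::<T>()
            .map_err(|e| format!("invalid value {value:?}: {e}")),
    }
}

/// Free-function version of `Config::metrics_addr`: port 0 disables metrics,
/// any other port binds on all interfaces (0.0.0.0).
pub fn metrics_addr(port: u16) -> Option<SocketAddr> {
    (port > 0).then(|| SocketAddr::from(([0, 0, 0, 0], port)))
}
```

Because the address is `0.0.0.0`, the metrics listener is reachable on every network interface of the host, which is worth keeping in mind when deciding whether to expose `METRICS_PORT` outside the deployment network.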