diff --git a/.gitignore b/.gitignore index 79b421b..aece483 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,6 @@ fuzz/target/ fuzz/corpus/ fuzz/artifacts/ fuzz/Cargo.lock + +# local settings +.claude/settings.local.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 8491f92..bdb4084 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **Observability Stack**: Complete Grafana-based monitoring setup + - Prometheus metrics collection (port 9090) with 15-day retention + - Grafana dashboards (port 3001) with pre-configured Prometheus datasource + - Iggy Web UI integration (port 3050) for stream/topic/message management + - Pre-built Iggy Overview dashboard (server status, request rates, throughput, latency) +- **Documentation Guides**: + - Event-driven architecture guide (`docs/guide.md`): streams/topics/partitions, consumer groups, error handling patterns, production patterns (outbox, saga, idempotency) + - Partitioning guide (`docs/partitioning-guide.md`): partition keys, ordering guarantees, selection strategies + - Durable storage guide (`docs/durable-storage-guide.md`): storage architecture, fsync configuration, S3 backup/archiving, recovery procedures + - Documentation index (`docs/README.md`) with topic-based navigation + +### Changed + +- Updated `docker-compose.yaml` with full observability stack configuration +- Simplified documentation section in README.md to reference `docs/` directory + ## [0.1.0] - 2024-12-01 ### Added diff --git a/CLAUDE.md b/CLAUDE.md index de3b980..a5021aa 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -75,6 +75,17 @@ This application showcases how to build a production-ready message streaming ser .commitlintrc.json # Conventional commits configuration +observability/ +├── prometheus/ +│ └── prometheus.yml # Prometheus scrape configuration +└── grafana/ + └── provisioning/ + ├── datasources/ + │ └── datasources.yml 
# Prometheus datasource + └── dashboards/ + ├── dashboards.yml # Dashboard provisioning + └── iggy-overview.json # Pre-built Iggy dashboard + src/ ├── main.rs # Application entry point ├── lib.rs # Library exports @@ -117,8 +128,19 @@ fuzz/ └── fuzz_validation.rs # Validation function fuzz tests deny.toml # License and security policy for cargo-deny + +docs/ # Documentation and guides +├── README.md # Guide index and navigation +├── guide.md # Event-driven architecture guide +├── partitioning-guide.md # Partitioning strategies +├── durable-storage-guide.md # Storage and durability configuration +└── structured-concurrency.md # Task lifecycle management ``` +## Documentation + +See the [docs/](docs/) directory for comprehensive guides covering event-driven architecture, partitioning strategies, durable storage configuration, and more. + ## API Endpoints ### Health & Status @@ -247,6 +269,81 @@ RUST_LOG=debug RUST_LOG=trace ``` +## Observability Stack + +The project includes a complete Grafana-based observability stack for monitoring Iggy and the sample application. 
+ +### Components + +| Service | Port | Description | +|---------|------|-------------| +| **Iggy** | 3000 | Message streaming server (also serves `/metrics`) | +| **Iggy Web UI** | 3050 | Dashboard for streams, topics, messages, and users | +| **Prometheus** | 9090 | Metrics collection and storage | +| **Grafana** | 3001 | Visualization and dashboards | + +### Quick Start with Observability + +```bash +# Start the full stack (Iggy + Prometheus + Grafana) +docker-compose up -d + +# Access the services: +# - Iggy HTTP API: http://localhost:3000 +# - Iggy Web UI: http://localhost:3050 (iggy/iggy) +# - Prometheus: http://localhost:9090 +# - Grafana: http://localhost:3001 (admin/admin) +``` + +### Iggy Web UI + +The Iggy Web UI provides a comprehensive dashboard for managing the Iggy server: + +- **Streams & Topics**: Create, browse, and delete streams and topics +- **Messages**: Browse and inspect messages in topics +- **Users**: Manage users and permissions +- **Server Health**: Monitor server status and connections + +Access at http://localhost:3050 with credentials `iggy/iggy`. + +### Grafana Dashboards + +Pre-configured dashboards are automatically provisioned: + +- **Iggy Overview**: Server status, request rates, message throughput, latency percentiles + +### Prometheus Metrics + +Iggy exposes Prometheus-compatible metrics at `/metrics`: + +```bash +# View raw metrics +curl http://localhost:3000/metrics +``` + +### Customizing Dashboards + +1. Log into Grafana at http://localhost:3001 +2. Navigate to Dashboards → Iggy folder +3. Edit existing dashboards or create new ones +4. Export JSON and save to `observability/grafana/provisioning/dashboards/` + +### Adding OpenTelemetry (Optional) + +Iggy supports OpenTelemetry for distributed tracing. 
Add to docker-compose: + +```yaml +environment: + - IGGY_TELEMETRY_ENABLED=true + - IGGY_TELEMETRY_SERVICE_NAME=iggy + - IGGY_TELEMETRY_LOGS_TRANSPORT=grpc + - IGGY_TELEMETRY_LOGS_ENDPOINT=http://otel-collector:4317 + - IGGY_TELEMETRY_TRACES_TRANSPORT=grpc + - IGGY_TELEMETRY_TRACES_ENDPOINT=http://otel-collector:4317 +``` + +--- + ## Development ### Prerequisites @@ -255,9 +352,9 @@ RUST_LOG=trace ### Quick Start -1. Start Iggy server: +1. Start Iggy server (with observability): ```bash - docker-compose up -d iggy + docker-compose up -d iggy prometheus grafana ``` 2. Run the application: diff --git a/Cargo.lock b/Cargo.lock index 7128870..e703b6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -590,9 +590,9 @@ checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" -version = "1.2.48" +version = "1.2.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c481bdbf0ed3b892f6f806287d72acd515b352a4ec27a208489b8c1bc839633a" +checksum = "90583009037521a116abf44494efecd645ba48b6622457080f080b85544e2215" dependencies = [ "find-msvc-tools", "jobserver", @@ -730,9 +730,9 @@ dependencies = [ [[package]] name = "convert_case" -version = "0.7.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" dependencies = [ "unicode-segmentation", ] @@ -910,22 +910,23 @@ dependencies = [ [[package]] name = "derive_more" -version = "2.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618" dependencies = [ "derive_more-impl", ] [[package]] name = "derive_more-impl" -version = "2.0.1" +version = "2.1.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +checksum = "6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b" dependencies = [ "convert_case", "proc-macro2", "quote", + "rustc_version", "syn 2.0.111", "unicode-xid", ] @@ -1573,9 +1574,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ "base64 0.22.1", "bytes", @@ -2009,9 +2010,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.177" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libdbus-sys" @@ -2069,9 +2070,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.28" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "lru-slab" @@ -2108,9 +2109,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "mio" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" +checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", "wasi", @@ -2549,7 +2550,7 @@ version = "3.4.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" dependencies = [ - "toml_edit 0.23.7", + "toml_edit 0.23.9", ] [[package]] @@ -3910,9 +3911,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.23.7" +version = "0.23.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" +checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832" dependencies = [ "indexmap 2.12.1", "toml_datetime 0.7.3", @@ -4273,13 +4274,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ "getrandom 0.3.4", "js-sys", - "serde", + "serde_core", "wasm-bindgen", ] diff --git a/README.md b/README.md index d78c3f4..fba2ccd 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ Apache Iggy is capable of processing millions of messages per second with ultra- ``` ┌─────────────────────────────────────────────────────────────┐ │ Axum HTTP Server │ -│ (Port 3000) │ +│ (Port 8000) │ ├─────────────────────────────────────────────────────────────┤ │ Middleware Stack │ │ Rate Limit → Auth → Request ID → Tracing → CORS │ @@ -68,8 +68,11 @@ Apache Iggy is capable of processing millions of messages per second with ultra- │ IggyClientWrapper (with auto-reconnection) │ │ High-level wrapper around Iggy SDK │ ├─────────────────────────────────────────────────────────────┤ +│ Observability Stack │ +│ Prometheus (9090) → Grafana (3001) + Iggy Web UI (3050) │ +├─────────────────────────────────────────────────────────────┤ │ Apache Iggy Server │ -│ TCP (8090) / QUIC (8080) / HTTP 
(3000) │ +│ TCP (8090) / QUIC (8080) / HTTP (3000) / Metrics (/metrics)│ └─────────────────────────────────────────────────────────────┘ ``` @@ -88,24 +91,31 @@ cd iggy_sample cp .env.example .env ``` -### 2. Start Iggy Server +### 2. Start the Full Stack ```bash -docker-compose up -d iggy +# Start Iggy server with observability stack +docker-compose up -d ``` +This starts: +- **Iggy Server** - Message streaming (ports 8090, 8080, 3000) +- **Iggy Web UI** - Dashboard for managing streams/topics (port 3050) +- **Prometheus** - Metrics collection (port 9090) +- **Grafana** - Visualization dashboards (port 3001) + ### 3. Run the Application ```bash cargo run ``` -The server will start on `http://localhost:3000` (or the port specified in `.env`). +The server will start on `http://localhost:8000` (or the port specified in `.env`). ### 4. Verify It's Working ```bash -curl http://localhost:3000/health +curl http://localhost:8000/health ``` Expected response: @@ -118,6 +128,16 @@ Expected response: } ``` +### 5. Access the Dashboards + +| Service | URL | Credentials | +|---------|-----|-------------| +| Sample App API | http://localhost:8000 | - | +| Iggy HTTP API | http://localhost:3000 | - | +| Iggy Web UI | http://localhost:3050 | iggy / iggy | +| Prometheus | http://localhost:9090 | - | +| Grafana | http://localhost:3001 | admin / admin | + ## API Reference ### Health & Status @@ -455,6 +475,80 @@ docker build -t iggy-sample . docker run -p 8000:8000 -e IGGY_CONNECTION_STRING=iggy://iggy:iggy@host.docker.internal:8090 iggy-sample ``` +## Observability Stack + +The project includes a complete observability stack for monitoring and managing Iggy. 
+ +### Components + +| Component | Port | Description | +|-----------|------|-------------| +| **Iggy Server** | 3000 | HTTP API + Prometheus metrics at `/metrics` | +| **Iggy Web UI** | 3050 | Dashboard for streams, topics, messages, users | +| **Prometheus** | 9090 | Metrics collection with 15-day retention | +| **Grafana** | 3001 | Pre-configured dashboards for visualization | + +### Iggy Web UI + +The Iggy Web UI provides a comprehensive dashboard: + +- **Streams & Topics**: Create, browse, and delete streams and topics +- **Messages**: Browse and inspect messages in real-time +- **Users**: Manage users and permissions +- **Server Health**: Monitor connections and server status + +Access at http://localhost:3050 with credentials `iggy/iggy`. + +### Grafana Dashboards + +Pre-configured dashboards are automatically provisioned: + +- **Iggy Overview**: Server status, request rates, message throughput, latency percentiles + +Access at http://localhost:3001 with credentials `admin/admin`. 
+ +### Prometheus Metrics + +Iggy exposes Prometheus-compatible metrics: + +```bash +# View raw metrics from Iggy +curl http://localhost:3000/metrics + +# Query via Prometheus +curl 'http://localhost:9090/api/v1/query?query=up{job="iggy"}' +``` + +### Configuration Files + +``` +observability/ +├── prometheus/ +│ └── prometheus.yml # Scrape configuration +└── grafana/ + └── provisioning/ + ├── datasources/ + │ └── datasources.yml # Prometheus datasource + └── dashboards/ + ├── dashboards.yml # Dashboard provisioning + └── iggy-overview.json +``` + +### Adding OpenTelemetry (Optional) + +Iggy supports OpenTelemetry for distributed tracing: + +```yaml +# Add to iggy service in docker-compose.yaml +environment: + - IGGY_TELEMETRY_ENABLED=true + - IGGY_TELEMETRY_SERVICE_NAME=iggy + - IGGY_TELEMETRY_LOGS_TRANSPORT=grpc + - IGGY_TELEMETRY_LOGS_ENDPOINT=http://otel-collector:4317 + - IGGY_TELEMETRY_TRACES_TRANSPORT=grpc + - IGGY_TELEMETRY_TRACES_ENDPOINT=http://otel-collector:4317 +``` + ## Event Schema Events follow a structured format with type-safe payloads: @@ -594,8 +688,7 @@ Automatically creates PRs for: ## Documentation -- [architecture.md](architecture.md) - Detailed architecture documentation -- [CLAUDE.md](CLAUDE.md) - Project documentation for AI assistants +See the [docs/](docs/) directory for comprehensive guides covering event-driven architecture, partitioning strategies, durable storage configuration, and more. 
## Resources diff --git a/docker-compose.yaml b/docker-compose.yaml index 496e500..34eb747 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,58 +1,138 @@ services: - # Apache Iggy message streaming server (edge with io_uring shared-nothing architecture) - iggy: - image: apache/iggy:latest - privileged: true - container_name: iggy-server - ports: - # TCP transport (default) - - "8090:8090" - # QUIC transport - - "8080:8080" - # HTTP transport - - "3000:3000" - volumes: - - iggy_data:/iggy/data - environment: - # Root user credentials - - IGGY_ROOT_USERNAME=iggy - - IGGY_ROOT_PASSWORD=iggy - # Server configuration - - IGGY_HTTP_ENABLED=true - - IGGY_HTTP_ADDRESS=0.0.0.0:3000 - - IGGY_TCP_ENABLED=true - - IGGY_TCP_ADDRESS=0.0.0.0:8090 - - IGGY_QUIC_ENABLED=true - - IGGY_QUIC_ADDRESS=0.0.0.0:8080 - healthcheck: - test: ["CMD", "iggy", "ping"] - interval: 10s - timeout: 5s - retries: 5 - start_period: 10s - restart: unless-stopped + # Apache Iggy message streaming server + iggy: + image: apache/iggy:latest + privileged: true + container_name: iggy-server + ports: + # TCP transport (default) + - "8090:8090" + # QUIC transport + - "8080:8080" + # HTTP transport (also serves /metrics for Prometheus) + - "3000:3000" + volumes: + - iggy_data:/iggy/data + environment: + # Root user credentials + - IGGY_ROOT_USERNAME=iggy + - IGGY_ROOT_PASSWORD=iggy + # Server configuration + - IGGY_HTTP_ENABLED=true + - IGGY_HTTP_ADDRESS=0.0.0.0:3000 + - IGGY_TCP_ENABLED=true + - IGGY_TCP_ADDRESS=0.0.0.0:8090 + - IGGY_QUIC_ENABLED=true + - IGGY_QUIC_ADDRESS=0.0.0.0:8080 + # Metrics endpoint (Prometheus-compatible) + - IGGY_HTTP_METRICS_ENABLED=true + - IGGY_HTTP_METRICS_ENDPOINT=/metrics + healthcheck: + test: ["CMD", "iggy", "ping"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + networks: + - iggy-network - # Sample application - app: - build: - context: . 
- dockerfile: Dockerfile - container_name: iggy-sample-app - ports: - - "8000:8000" - environment: - - HOST=0.0.0.0 - - PORT=8000 - - IGGY_CONNECTION_STRING=iggy://iggy:iggy@iggy:8090 - - IGGY_STREAM=sample-stream - - IGGY_TOPIC=events - - IGGY_PARTITIONS=3 - - RUST_LOG=info,iggy_sample=debug - depends_on: - iggy: - condition: service_healthy - restart: unless-stopped + # Sample application + app: + build: + context: . + dockerfile: Dockerfile + container_name: iggy-sample-app + ports: + - "8000:8000" + environment: + - HOST=0.0.0.0 + - PORT=8000 + - IGGY_CONNECTION_STRING=iggy://iggy:iggy@iggy:8090 + - IGGY_STREAM=sample-stream + - IGGY_TOPIC=events + - IGGY_PARTITIONS=3 + - RUST_LOG=info,iggy_sample=debug + depends_on: + iggy: + condition: service_healthy + restart: unless-stopped + networks: + - iggy-network + + # Prometheus metrics collection + prometheus: + image: prom/prometheus:latest + container_name: iggy-prometheus + ports: + - "9090:9090" + volumes: + - ./observability/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prometheus_data:/prometheus + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--storage.tsdb.retention.time=15d" + - "--web.enable-lifecycle" + - "--web.console.libraries=/usr/share/prometheus/console_libraries" + - "--web.console.templates=/usr/share/prometheus/consoles" + depends_on: + iggy: + condition: service_healthy + restart: unless-stopped + networks: + - iggy-network + + # Grafana visualization + grafana: + image: grafana/grafana:latest + container_name: iggy-grafana + ports: + - "3001:3000" + volumes: + - ./observability/grafana/provisioning:/etc/grafana/provisioning:ro + - grafana_data:/var/lib/grafana + environment: + # Default admin credentials (change in production) + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + # Allow anonymous access for local development + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Viewer + # 
Server settings + - GF_SERVER_ROOT_URL=http://localhost:3001 + # Provisioning + - GF_PATHS_PROVISIONING=/etc/grafana/provisioning + depends_on: + - prometheus + restart: unless-stopped + networks: + - iggy-network + + # Iggy Web UI - Dashboard for managing streams, topics, and messages + iggy-web-ui: + image: apache/iggy-web-ui:latest + container_name: iggy-web-ui + ports: + - "3050:3050" + environment: + # Connect to Iggy server HTTP API + - PUBLIC_IGGY_API_URL=http://iggy:3000 + depends_on: + iggy: + condition: service_healthy + restart: unless-stopped + networks: + - iggy-network volumes: - iggy_data: - driver: local + iggy_data: + driver: local + prometheus_data: + driver: local + grafana_data: + driver: local + +networks: + iggy-network: + driver: bridge diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..633c5e7 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,60 @@ +# Documentation and Guides + +This directory contains comprehensive guides for understanding and working with Apache Iggy and event-driven architecture patterns. + +## Available Guides + +| Guide | Description | +|-------|-------------| +| [Event-Driven Architecture Guide](guide.md) | Core concepts, streams/topics/partitions, consumer groups, error handling patterns, and production patterns (outbox, saga, idempotency) | +| [Partitioning Guide](partitioning-guide.md) | Deep dive into partition keys, ordering guarantees, and partition selection strategies | +| [Durable Storage Guide](durable-storage-guide.md) | Storage architecture, fsync configuration, backup/archiving to S3, recovery procedures, and production recommendations | +| [Structured Concurrency](structured-concurrency.md) | Task lifecycle management, cancellation tokens, and graceful shutdown patterns | + +## Quick Navigation + +### Getting Started +- New to Iggy? Start with the [Event-Driven Architecture Guide](guide.md) +- Setting up production? 
See [Durable Storage Guide](durable-storage-guide.md) + +### By Topic + +**Architecture & Design** +- [Core Concepts](guide.md#core-concepts) - EDA fundamentals, Iggy overview +- [Streams, Topics, and Partitions](guide.md#streams-topics-and-partitions) - Data organization +- [Message Flow Patterns](guide.md#message-flow-patterns) - Pub/sub, work queues, event sourcing + +**Data Durability** +- [Durability Configuration](durable-storage-guide.md#durability-configuration) - fsync settings and trade-offs +- [Backup and Archiving](durable-storage-guide.md#backup-and-archiving) - S3 and disk archival +- [Recovery](durable-storage-guide.md#recovery-and-data-integrity) - Disaster recovery procedures + +**Message Ordering** +- [Partition Keys](partitioning-guide.md) - Ensuring message ordering +- [Consumer Groups](guide.md#consumer-groups) - Parallel processing with ordering + +**Production Patterns** +- [Error Handling](guide.md#error-handling-strategies) - At-least-once, idempotency, DLQ +- [Outbox Pattern](guide.md#pattern-outbox-for-reliable-publishing) - Reliable event publishing +- [Production Recommendations](durable-storage-guide.md#production-recommendations) - Config templates + +**Application Internals** +- [Structured Concurrency](structured-concurrency.md) - Background task management +- [Graceful Shutdown](structured-concurrency.md) - Clean shutdown procedures + +## External Resources + +### Official Iggy Documentation +- [Apache Iggy GitHub](https://github.com/apache/iggy) +- [Iggy Documentation](https://iggy.apache.org/docs/) +- [Iggy Architecture](https://iggy.apache.org/docs/introduction/architecture/) +- [Iggy Server Configuration](https://iggy.apache.org/docs/server/configuration) + +### Related Reading +- [Event-Driven Architecture Patterns](https://martinfowler.com/articles/201701-event-driven.html) - Martin Fowler +- [Designing Data-Intensive Applications](https://dataintensive.net/) - Martin Kleppmann +- [Enterprise Integration 
Patterns](https://www.enterpriseintegrationpatterns.com/) + +--- + +*Last updated: December 2025* diff --git a/docs/durable-storage-guide.md b/docs/durable-storage-guide.md new file mode 100644 index 0000000..d966442 --- /dev/null +++ b/docs/durable-storage-guide.md @@ -0,0 +1,722 @@ +# Apache Iggy Durable Storage Guide + +A comprehensive guide to configuring Apache Iggy for durable, production-ready message storage. + +--- + +## Table of Contents + +1. [Storage Architecture Overview](#storage-architecture-overview) +2. [Durability Configuration](#durability-configuration) +3. [Segment Management](#segment-management) +4. [Data Retention Policies](#data-retention-policies) +5. [Backup and Archiving](#backup-and-archiving) +6. [Recovery and Data Integrity](#recovery-and-data-integrity) +7. [Performance vs Durability Trade-offs](#performance-vs-durability-trade-offs) +8. [Production Recommendations](#production-recommendations) +9. [Current Limitations](#current-limitations) +10. [Configuration Reference](#configuration-reference) + +--- + +## Storage Architecture Overview + +### Append-Only Log Model + +Iggy uses an **append-only log** as its core storage abstraction. This is the same fundamental design used by Kafka, Apache BookKeeper, and other high-performance streaming systems. + +``` +Topic: "orders" (3 partitions) +├── Partition 0/ +│ ├── segment-0000000000.log (messages 0-999,999) +│ ├── segment-0000000000.idx (offset index) +│ ├── segment-0000000000.tidx (time index) +│ ├── segment-0001000000.log (messages 1,000,000-1,999,999) +│ └── ... +├── Partition 1/ +│ └── ... +└── Partition 2/ + └── ... 
+``` + +### Key Characteristics + +| Property | Description | +|----------|-------------| +| **Immutable records** | Messages cannot be modified once written | +| **Sequential writes** | Optimized for append operations | +| **Indexed access** | Fast lookups by offset and timestamp | +| **Segment-based** | Data split into manageable chunks | + +### Storage Location + +By default, all data is persisted under the `local_data` directory. Configure the path in `server.toml`: + +```toml +[system] +path = "local_data" # Change to your preferred location +``` + +Or via environment variable: + +```bash +export IGGY_SYSTEM_PATH=/var/lib/iggy/data +``` + +--- + +## Durability Configuration + +Durability in Iggy is controlled by **fsync** settings. fsync forces the operating system to flush data from kernel buffers to the physical storage device, ensuring data survives crashes. + +### The fsync Trade-off + +``` +Without fsync (default): +┌─────────┐ ┌─────────────┐ ┌──────────┐ +│ Iggy │ → │ OS Buffer │ → │ Disk │ +│ Server │ │ Cache │ │ │ +└─────────┘ └─────────────┘ └──────────┘ + ↑ + Data lives here temporarily. + Lost on power failure or crash. + +With fsync enabled: +┌─────────┐ ┌─────────────┐ ┌──────────┐ +│ Iggy │ → │ OS Buffer │ ═══│ Disk │ +│ Server │ │ Cache │ ↑ │ │ +└─────────┘ └─────────────┘ │ └──────────┘ + │ + fsync() forces write to disk + before acknowledging message. +``` + +### Core Durability Settings + +#### 1. Message Saver Configuration + +The `message_saver` section controls background persistence: + +```toml +[system.partition.message_saver] +# Enable background message saving +enabled = true + +# Force synchronous writes to disk +# true = durable but slower (data guaranteed on disk before ack) +# false = faster but data may be lost on crash +enforce_fsync = true + +# Interval between save operations (when not using fsync per message) +interval = "1s" +``` + +#### 2. 
Per-Message fsync (Maximum Durability) + +For the strongest durability guarantee, force fsync after every message: + +```toml +[system.partition] +# Force fsync on every partition write +enforce_fsync = true + +# Flush after each message (set to 1 for maximum durability) +messages_required_to_save = 1 +``` + +Or via environment variables: + +```bash +export IGGY_SYSTEM_PARTITION_ENFORCE_FSYNC=true +export IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE=1 +``` + +### Durability Levels + +| Level | Configuration | Latency | Data Loss Risk | +|-------|---------------|---------|----------------| +| **Maximum** | `enforce_fsync=true`, `messages_required_to_save=1` | ~9ms P99 | None (survives power loss) | +| **High** | `enforce_fsync=true`, `messages_required_to_save=100` | ~1-2ms P99 | Up to 100 messages | +| **Balanced** | `enforce_fsync=true`, `interval="1s"` | ~0.5ms P99 | Up to 1 second of data | +| **Performance** | `enforce_fsync=false` | ~0.1ms P99 | Unbounded (OS-dependent) | + +### Performance Benchmarks + +With fsync-per-message enabled, Iggy achieves impressive durability performance: + +| Metric | Value | Notes | +|--------|-------|-------| +| P99 Producer Latency | ~9.48ms | With `enforce_fsync=true`, `messages_required_to_save=1` | +| P9999 Producer Latency | ~9.48ms | Consistent even at tail | +| P99 Consumer Latency | <3ms | Reading from durable storage | + +Single-digit millisecond latencies at P9999 for a fully durable workload is excellent performance, competitive with or exceeding many other streaming platforms. + +--- + +## Segment Management + +Iggy stores messages in **segments**, which are physical files on disk. 
+ +### Segment Configuration + +```toml +[system.segment] +# Maximum segment size before rolling to new segment +# Larger = fewer files, potentially slower recovery +# Smaller = more files, faster recovery, more overhead +size = "1 GB" + +# Cache offset indexes in memory (faster reads) +cache_indexes = true + +# Cache time-based indexes in memory +cache_time_indexes = true + +# Validate checksums when loading segments (integrity check) +validate_checksum = true +``` + +### Segment Lifecycle + +``` +1. ACTIVE: Current segment receiving writes + └── segment-0000000000.log (< size limit) + +2. ROLLED: Size limit reached, new segment created + ├── segment-0000000000.log (closed, read-only) + └── segment-0001000000.log (now active) + +3. EXPIRED: All messages in segment past retention + └── segment-0000000000.log (eligible for deletion/archival) + +4. ARCHIVED/DELETED: Based on configuration + └── (moved to archive location or deleted) +``` + +### Index Files + +Each segment has associated index files: + +| File | Purpose | +|------|---------| +| `.log` | Message data (binary format) | +| `.idx` | Offset index (find message by offset) | +| `.tidx` | Time index (find message by timestamp) | + +### Checksum Validation + +Enable checksum validation to detect data corruption: + +```toml +[system.segment] +validate_checksum = true # Validates on segment load +``` + +This adds overhead during segment loading but catches corruption early. + +--- + +## Data Retention Policies + +Iggy supports both time-based and size-based retention. + +### Time-Based Retention (Message Expiry) + +Messages are deleted after a specified duration: + +```toml +[system.segment] +# Human-readable format +message_expiry = "7 days" # 7 days +message_expiry = "2 days 4 hours" # 2 days + 4 hours +message_expiry = "none" # Never expire (default) +``` + +Expiry is evaluated per-segment. The entire segment is removed when all messages within it have expired. 
+ +### Size-Based Retention (Max Topic Size) + +Limit the total size of a topic: + +```toml +[topic] +# Maximum topic size across all partitions +max_topic_size = "10 GB" + +# Delete oldest segments when limit reached +delete_oldest_segments = true +``` + +When the limit is reached, the oldest segments are deleted in whole-segment increments to make room for new messages. + +### Retention Strategy by Use Case + +| Use Case | Time Retention | Size Limit | Rationale | +|----------|----------------|------------|-----------| +| Real-time processing | 1-7 days | 10-50 GB | Data processed quickly, no need to keep | +| Audit logs | 1-7 years | Unlimited | Compliance requirements | +| Event sourcing | Never expire | Unlimited | Events are source of truth | +| Metrics/telemetry | 30-90 days | 100 GB | Balance history vs storage cost | +| Dev/test environments | 1 day | 1 GB | Quick turnover, limited resources | + +### Configuring Per-Topic Retention + +When creating topics via the SDK: + +```rust +use iggy::prelude::*; + +// 7-day retention +client.create_topic( + "mystream", + "events", + 3, // partitions + None, + None, + Some(IggyExpiry::ExpireDuration(IggyDuration::from_str("7days")?)), + Some(IggyByteSize::from_str("10GB")?), // max size +).await?; + +// Never expire (event sourcing) +client.create_topic( + "mystream", + "event-store", + 10, + None, + None, + Some(IggyExpiry::NeverExpire), + None, // no size limit +).await?; +``` + +--- + +## Backup and Archiving + +Iggy supports archiving expired segments before deletion, either to local disk or S3-compatible cloud storage. 
+ +### Disk Archiving + +Archive segments to a local directory: + +```toml +[data_maintenance.archiver] +enabled = true +kind = "disk" + +# Archive expired segments before deletion +archive_expired = true + +# Local path for archived data +path = "/var/lib/iggy/archive" +``` + +### S3-Compatible Cloud Storage + +Archive to AWS S3, MinIO, or any S3-compatible storage: + +```toml +[data_maintenance.archiver] +enabled = true +kind = "s3" + +# Archive expired segments before deletion +archive_expired = true + +# S3 configuration +[data_maintenance.archiver.s3] +# AWS credentials (or use IAM roles/environment) +access_key_id = "your-access-key" +secret_access_key = "your-secret-key" + +# S3 bucket and optional prefix +bucket = "iggy-backups" +key_prefix = "production/iggy/" + +# Region +region = "us-east-1" + +# For non-AWS S3 (MinIO, etc.) +endpoint = "https://minio.example.com" + +# Temporary staging directory for uploads +tmp_upload_dir = "/tmp/iggy-upload" +``` + +### Archive Workflow + +``` +1. Segment expires (all messages past retention) + └── segment-0000000000.log + +2. If archiver enabled: + ├── Upload to S3: s3://bucket/prefix/stream/topic/partition/segment-0000000000.log + └── Verify upload success + +3. After successful archive: + └── Delete local segment (if archive_expired = true) + +4. 
If archiver disabled: + └── Delete segment immediately +``` + +### MinIO Example (Self-Hosted S3) + +For on-premises deployments using MinIO: + +```toml +[data_maintenance.archiver] +enabled = true +kind = "s3" +archive_expired = true + +[data_maintenance.archiver.s3] +access_key_id = "minioadmin" +secret_access_key = "minioadmin" +bucket = "iggy-archive" +endpoint = "http://minio:9000" +region = "us-east-1" # Required even for MinIO +tmp_upload_dir = "/tmp/iggy-s3-staging" +``` + +--- + +## Recovery and Data Integrity + +### Automatic State Recovery + +If metadata becomes corrupted but segment files are intact, Iggy can rebuild: + +```toml +[system.recovery] +# Recreate stream/topic/partition metadata from existing data files +recreate_missing_state = true +``` + +This scans the data directory and rebuilds the logical structure from physical segment files. + +### Manual Recovery Steps + +1. **Identify corruption**: Check logs for checksum errors or missing segments + +2. **Stop the server**: Ensure no writes during recovery + +3. **Verify segment files**: Check for truncated or zero-byte files + ```bash + find /var/lib/iggy/data -name "*.log" -size 0 + ``` + +4. **Restore from backup** (if using archiver): + ```bash + aws s3 sync s3://iggy-backups/production/iggy/ /var/lib/iggy/data/ + ``` + +5. 
**Restart with recovery enabled**: + ```bash + export IGGY_SYSTEM_RECOVERY_RECREATE_MISSING_STATE=true + iggy-server + ``` + +### Preventing Data Corruption + +| Measure | Configuration | Purpose | +|---------|---------------|---------| +| Checksums | `validate_checksum = true` | Detect corruption on read | +| fsync | `enforce_fsync = true` | Prevent partial writes | +| S3 backup | `archiver.enabled = true` | Off-site redundancy | +| EBS snapshots | (infrastructure) | Point-in-time recovery | + +--- + +## Performance vs Durability Trade-offs + +### Understanding the Trade-off + +Every durability feature has a performance cost: + +``` +Performance ←────────────────────────────────→ Durability + +[No fsync] [Batched fsync] [Per-message fsync] + ~0.1ms ~1ms ~9ms + Data at risk Some data risk No data loss +``` + +### Choosing Your Configuration + +#### High-Throughput, Lower Durability (Metrics, Logs) + +```toml +[system.partition] +enforce_fsync = false + +[system.partition.message_saver] +enabled = true +enforce_fsync = false +interval = "5s" +``` + +- Throughput: Millions of messages/second +- Latency: Sub-millisecond +- Risk: May lose several seconds of data on crash + +#### Balanced (Most Applications) + +```toml +[system.partition] +enforce_fsync = true +messages_required_to_save = 100 + +[system.partition.message_saver] +enabled = true +enforce_fsync = true +interval = "1s" +``` + +- Throughput: Hundreds of thousands of messages/second +- Latency: 1-2ms +- Risk: May lose up to 100 messages on crash + +#### Maximum Durability (Financial, Critical Systems) + +```toml +[system.partition] +enforce_fsync = true +messages_required_to_save = 1 + +[system.partition.message_saver] +enabled = true +enforce_fsync = true +``` + +- Throughput: Tens of thousands of messages/second +- Latency: ~9ms +- Risk: No data loss (survives power failure) + +### Hardware Considerations + +fsync performance depends heavily on storage: + +| Storage Type | fsync Latency | Recommendation | 
+|--------------|---------------|----------------| +| NVMe SSD | 0.1-1ms | Best for durable workloads | +| SATA SSD | 1-5ms | Good for balanced workloads | +| HDD | 5-15ms | Avoid for high-durability needs | +| Network (EBS gp3) | 1-3ms | Good, use provisioned IOPS | +| Network (EBS io2) | 0.5-1ms | Best for cloud deployments | + +--- + +## Production Recommendations + +### Recommended Production Configuration + +```toml +# server.toml - Production Configuration + +[system] +path = "/var/lib/iggy/data" + +[system.partition] +enforce_fsync = true +messages_required_to_save = 100 # Balance: durability with reasonable latency + +[system.partition.message_saver] +enabled = true +enforce_fsync = true +interval = "1s" + +[system.segment] +size = "1 GB" +cache_indexes = true +cache_time_indexes = true +validate_checksum = true +message_expiry = "7 days" # Adjust per your needs + +[system.recovery] +recreate_missing_state = true + +[data_maintenance.archiver] +enabled = true +kind = "s3" +archive_expired = true + +[data_maintenance.archiver.s3] +bucket = "your-iggy-backups" +region = "us-east-1" +key_prefix = "production/" +tmp_upload_dir = "/tmp/iggy-upload" +``` + +### Infrastructure Checklist + +- [ ] **Storage**: Use NVMe SSD or provisioned IOPS EBS +- [ ] **Filesystem**: ext4 or XFS with `noatime` mount option +- [ ] **Backup**: Enable S3 archiver or EBS snapshots +- [ ] **Monitoring**: Alert on disk space, segment count, backup failures +- [ ] **Capacity**: Plan for 2x expected data volume +- [ ] **Recovery testing**: Regularly test restore from backups + +### Monitoring Metrics + +Key metrics to monitor for storage health: + +| Metric | Alert Threshold | Action | +|--------|-----------------|--------| +| Disk usage | >80% | Add storage or reduce retention | +| Segment count | >1000 per partition | Review segment size settings | +| Backup failures | Any | Check S3 credentials/connectivity | +| fsync latency | >50ms | Check storage health | +| Checksum errors | Any | 
Investigate corruption source | + +--- + +## Current Limitations + +As of Iggy v0.6.0 (December 2025): + +### Single-Node Architecture + +Iggy currently runs as a **single node only**. There is no built-in replication or clustering. + +**Implications:** +- No automatic failover +- Single point of failure for availability +- Rely on S3 archiving + infrastructure (EBS snapshots) for redundancy + +**Roadmap:** +- Clustering via Viewstamped Replication is planned for future releases +- Will provide automatic failover and data replication + +### Mitigation Strategies + +Until clustering is available: + +1. **S3 Archiving**: Archive all data to durable cloud storage +2. **EBS Snapshots**: If on AWS, use automated EBS snapshots +3. **Cold standby**: Keep a replica populated from S3 archives +4. **Infrastructure HA**: Use container orchestration for process restarts + +``` +Primary ──writes──→ Iggy ──archives──→ S3 + │ + └── EBS Snapshot (every hour) + +On failure: +1. Launch new instance +2. Attach latest EBS snapshot (or restore from S3) +3. Start Iggy with recreate_missing_state = true +4. Resume operations (some data loss possible) +``` + +--- + +## Configuration Reference + +### Environment Variables + +All configuration can be set via environment variables using the pattern `IGGY_
_`: + +| Variable | Default | Description | +|----------|---------|-------------| +| `IGGY_SYSTEM_PATH` | `local_data` | Data directory | +| `IGGY_SYSTEM_PARTITION_ENFORCE_FSYNC` | `false` | Enable fsync on writes | +| `IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE` | `10000` | Messages before flush | +| `IGGY_SYSTEM_SEGMENT_SIZE` | `1073741824` | Segment size in bytes (1GB) | +| `IGGY_SYSTEM_SEGMENT_CACHE_INDEXES` | `true` | Cache offset indexes | +| `IGGY_SYSTEM_SEGMENT_VALIDATE_CHECKSUM` | `false` | Validate on load | +| `IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY` | `none` | Default message TTL | +| `IGGY_SYSTEM_RECOVERY_RECREATE_MISSING_STATE` | `false` | Auto-rebuild metadata | +| `IGGY_DATA_MAINTENANCE_ARCHIVER_ENABLED` | `false` | Enable archiving | +| `IGGY_DATA_MAINTENANCE_ARCHIVER_KIND` | `disk` | `disk` or `s3` | + +### Complete server.toml Template + +```toml +# Apache Iggy Server Configuration +# Full reference for durable storage settings + +[system] +# Base path for all data +path = "local_data" + +[system.partition] +# Synchronous writes for durability +enforce_fsync = true + +# Messages to buffer before flushing (1 = immediate, higher = batched) +messages_required_to_save = 100 + +[system.partition.message_saver] +# Background persistence +enabled = true +enforce_fsync = true +interval = "1s" + +[system.segment] +# Max segment size (bytes or human-readable) +size = "1 GB" + +# Index caching for performance +cache_indexes = true +cache_time_indexes = true + +# Data integrity +validate_checksum = true + +# Default message retention (overridable per-topic) +message_expiry = "7 days" + +[system.recovery] +# Rebuild metadata from segment files if missing +recreate_missing_state = true + +[data_maintenance.archiver] +# Enable archiving of expired segments +enabled = true + +# "disk" or "s3" +kind = "s3" + +# Archive before deleting expired segments +archive_expired = true + +# Disk archiver settings (if kind = "disk") +[data_maintenance.archiver.disk] +path = 
"/var/lib/iggy/archive" + +# S3 archiver settings (if kind = "s3") +[data_maintenance.archiver.s3] +access_key_id = "" # Or use IAM roles +secret_access_key = "" # Or use IAM roles +bucket = "iggy-backups" +key_prefix = "production/" +region = "us-east-1" +endpoint = "" # Custom endpoint for MinIO, etc. +tmp_upload_dir = "/tmp/iggy-upload" +``` + +--- + +## Sources and Further Reading + +### Official Documentation +- [Apache Iggy GitHub Repository](https://github.com/apache/iggy) +- [Iggy Architecture Documentation](https://iggy.apache.org/docs/introduction/architecture/) +- [Iggy Server Configuration](https://iggy.apache.org/docs/server/configuration) +- [Iggy Official Website](https://iggy.apache.org/) + +### Related Concepts +- [Apache Kafka Storage Internals](https://kafka.apache.org/documentation/#design_filesystem) - Similar log-based architecture +- [fsync and Data Durability](https://lwn.net/Articles/457667/) - Linux kernel perspective +- [Write-Ahead Logging](https://en.wikipedia.org/wiki/Write-ahead_logging) - Foundational concept + +### Performance References +- [Iggy Benchmarks](https://github.com/apache/iggy/tree/master/bench) - Official benchmark suite +- [io_uring and Modern I/O](https://iggy.apache.org/blogs/2025/11/17/websocket-io-uring/) - Iggy's I/O architecture + +--- + +*Last updated: December 2025* +*Iggy version: 0.6.0* diff --git a/docs/guide.md b/docs/guide.md new file mode 100644 index 0000000..dd3c573 --- /dev/null +++ b/docs/guide.md @@ -0,0 +1,1002 @@ +# Event-Driven Architecture with Apache Iggy + +A comprehensive guide to understanding and implementing event-driven applications using Apache Iggy message streaming. + +--- + +## Table of Contents + +1. [Core Concepts](#core-concepts) +2. [Streams, Topics, and Partitions](#streams-topics-and-partitions) +3. [Message Flow Patterns](#message-flow-patterns) +4. [Partition Keys and Ordering](#partition-keys-and-ordering) +5. [Consumer Groups](#consumer-groups) +6. 
[Message Retention and Expiry](#message-retention-and-expiry) +7. [Error Handling Strategies](#error-handling-strategies) +8. [Production Patterns](#production-patterns) +9. [Code Examples](#code-examples) +10. [Quick Reference](#quick-reference) + +--- + +## Core Concepts + +### What is Event-Driven Architecture? + +Event-driven architecture (EDA) is a design pattern where the flow of the program is determined by events - significant changes in state that the system needs to react to. + +``` +Traditional Request/Response: +Client → Request → Server → Response → Client (synchronous, blocking) + +Event-Driven: +Producer → Event → Message Broker → Consumer(s) (asynchronous, decoupled) +``` + +### Key Benefits + +| Benefit | Description | +|---------|-------------| +| **Decoupling** | Producers don't need to know about consumers | +| **Scalability** | Add consumers independently to handle load | +| **Resilience** | Events persist; consumers can recover and replay | +| **Flexibility** | Multiple consumers can react to same event differently | + +### Apache Iggy Overview + +Apache Iggy is a persistent message streaming platform written in Rust, designed for: +- **High throughput**: Millions of messages/second +- **Low latency**: Sub-millisecond tail latencies +- **Durability**: Persistent append-only log with configurable retention +- **Multiple protocols**: TCP, QUIC, HTTP, WebSocket + +--- + +## Streams, Topics, and Partitions + +### Hierarchy + +``` +Iggy Server +└── Stream (logical grouping, like a database) + └── Topic (event category, like a table) + └── Partition (parallelism unit, like a shard) + └── Messages (ordered within partition) +``` + +### Streams + +A **stream** is a top-level container for related topics. 
Use streams for: +- Multi-tenancy (one stream per tenant) +- Environment separation (dev, staging, prod) +- Domain boundaries (ecommerce, analytics, notifications) + +```rust +// Creating a stream +client.create_stream("ecommerce", Some(1)).await?; +``` + +### Topics + +A **topic** is a named channel for a specific category of events. Think of it as: +- A "table" in database terms +- A "queue" in messaging terms +- A "channel" in pub/sub terms + +```rust +// Creating a topic with 3 partitions, 7-day retention +client.create_topic( + "ecommerce", // stream + "orders", // topic name + 3, // partitions + None, // compression (None = default) + None, // replication (future) + Some(IggyExpiry::ExpireDuration(IggyDuration::from_str("7days")?)), + None, // max topic size +).await?; +``` + +**Topic Design Guidelines:** + +| Guideline | Example | +|-----------|---------| +| One topic per event category | `orders`, `users`, `payments` | +| Verb-noun naming | `order-created`, `user-updated` | +| Don't over-partition | Start with 3-10, scale as needed | + +### Partitions + +**Partitions are NOT for sub-categorization.** They exist for: + +1. **Parallelism** - Multiple consumers can read simultaneously +2. 
**Ordering** - Messages within a partition are strictly ordered + +``` +Topic: "orders" (3 partitions) +├── Partition 0 → Consumer A (processes ~33% of orders) +├── Partition 1 → Consumer B (processes ~33% of orders) +└── Partition 2 → Consumer C (processes ~33% of orders) +``` + +**Critical Understanding:** + +``` +❌ WRONG: Partitions as categories + Partition 0 = "pending orders" + Partition 1 = "shipped orders" + Partition 2 = "cancelled orders" + +✅ CORRECT: Partitions as parallel lanes + Partition 0 = orders hashing to 0 (any status) + Partition 1 = orders hashing to 1 (any status) + Partition 2 = orders hashing to 2 (any status) +``` + +### Choosing Partition Count + +| Scenario | Partitions | Reasoning | +|----------|------------|-----------| +| Development/Testing | 1-3 | Simplicity | +| Small production | 3-10 | Moderate parallelism | +| High throughput | 10-50 | High parallelism | +| Massive scale | 50-100+ | Maximum parallelism | + +**Rule**: Partitions >= maximum number of parallel consumers you'll need. + +--- + +## Message Flow Patterns + +### Pattern 1: Simple Pub/Sub + +One producer, multiple independent consumers. + +``` +Producer → Topic → Consumer A (notifications) + → Consumer B (analytics) + → Consumer C (audit log) +``` + +Each consumer maintains its own offset and processes all messages. + +### Pattern 2: Competing Consumers (Work Queue) + +Multiple consumers in a group share the workload. + +``` +Producer → Topic (3 partitions) → Consumer Group "workers" + ├── Worker A (partition 0) + ├── Worker B (partition 1) + └── Worker C (partition 2) +``` + +Each message is processed by exactly one consumer in the group. + +### Pattern 3: Event Sourcing + +Events as the source of truth; state derived from event replay. + +``` +Commands → Event Store (Iggy) → Event Handlers → Read Models + ↓ + Event Replay → Rebuild State +``` + +### Pattern 4: CQRS (Command Query Responsibility Segregation) + +Separate write and read paths. 
+ +``` +Write Path: Command → Validate → Store Event → Publish to Iggy +Read Path: Iggy → Consumer → Update Read Database → Query API +``` + +--- + +## Partition Keys and Ordering + +### The Ordering Problem + +``` +Without partition keys: +Message 1: "Order 123 created" → Partition 0 +Message 2: "Order 123 paid" → Partition 2 ← Different partition! +Message 3: "Order 123 shipped" → Partition 1 ← Different partition! + +Consumer might process "shipped" before "created" = BUG +``` + +### Solution: Partition Keys + +A partition key ensures related messages go to the same partition: + +```rust +// All events for order-123 go to same partition +let partition_key = format!("order-{}", order_id); +producer.send_with_key(partition_key, message).await?; +``` + +**How it works:** + +``` +partition_key = "order-123" +partition_index = hash("order-123") % num_partitions + = 2847623847 % 3 + = 1 + +All "order-123" events → Partition 1 → Processed in order +``` + +### Choosing Partition Keys + +| Domain | Partition Key | Reasoning | +|--------|---------------|-----------| +| Orders | `order_id` | All events for an order stay ordered | +| Users | `user_id` | User actions processed in sequence | +| Sessions | `session_id` | Session events stay together | +| Devices | `device_id` | Device telemetry ordered | +| Transactions | `txn_id` | Financial events strictly ordered | + +### Code Example + +```rust +use iggy::prelude::*; + +// Publishing with partition key +async fn publish_order_event( + producer: &IggyProducer, + order_id: Uuid, + event: OrderEvent, +) -> Result<(), Error> { + let message = serde_json::to_vec(&event)?; + + // Use order_id as partition key - ensures ordering + let partition_key = order_id.to_string(); + + producer + .send_with_key(partition_key, message) + .await?; + + Ok(()) +} + +// All these events for order-123 will be processed in order: +publish_order_event(&producer, order_123, OrderEvent::Created { .. 
}).await?; +publish_order_event(&producer, order_123, OrderEvent::Paid { .. }).await?; +publish_order_event(&producer, order_123, OrderEvent::Shipped { .. }).await?; +``` + +--- + +## Consumer Groups + +### What is a Consumer Group? + +A consumer group is a set of consumers that cooperatively consume a topic. The server automatically assigns partitions to consumers in the group. + +``` +Topic (6 partitions) + Consumer Group "processors" (3 consumers) + +Partition 0 ─┐ +Partition 1 ─┴─→ Consumer A + +Partition 2 ─┐ +Partition 3 ─┴─→ Consumer B + +Partition 4 ─┐ +Partition 5 ─┴─→ Consumer C +``` + +### Consumer Group Guarantees + +| Guarantee | Description | +|-----------|-------------| +| **Each message processed once** | Within a group, no duplicates | +| **Partition exclusivity** | One consumer owns a partition at a time | +| **Automatic rebalancing** | If consumer dies, partitions reassigned | +| **Offset tracking** | Server tracks progress per consumer | + +### Creating a Consumer Group + +```rust +use iggy::prelude::*; + +let mut consumer = client + .consumer_group("ecommerce", "orders", "order-processors")? 
+ .auto_commit(AutoCommit::IntervalOrWhen( + IggyDuration::from_str("5s")?, + AutoCommitWhen::ConsumingAllMessages, + )) + .create_consumer_group_if_not_exists() + .auto_join_consumer_group() + .polling_strategy(PollingStrategy::next()) + .batch_size(100) + .build(); + +consumer.init().await?; + +while let Some(message) = consumer.next().await { + process_message(message)?; + // Offset auto-committed every 5s or when batch complete +} +``` + +### Scaling with Consumer Groups + +``` +Initial: 1 consumer, 3 partitions +┌─────────────────────────────────┐ +│ Consumer A: P0, P1, P2 │ +└─────────────────────────────────┘ + +Scale up: 3 consumers, 3 partitions (rebalance) +┌───────────┬───────────┬───────────┐ +│ Consumer A│ Consumer B│ Consumer C│ +│ P0 │ P1 │ P2 │ +└───────────┴───────────┴───────────┘ + +Over-scaled: 4 consumers, 3 partitions (one idle) +┌───────────┬───────────┬───────────┬───────────┐ +│ Consumer A│ Consumer B│ Consumer C│ Consumer D│ +│ P0 │ P1 │ P2 │ IDLE │ +└───────────┴───────────┴───────────┴───────────┘ +``` + +**Key insight**: You can't have more active consumers than partitions. + +--- + +## Message Retention and Expiry + +### Retention Policies + +Iggy supports time-based message expiry: + +```rust +// Topic with 7-day retention +IggyExpiry::ExpireDuration(IggyDuration::from_str("7days")?) + +// Topic with 1-year retention (audit logs) +IggyExpiry::ExpireDuration(IggyDuration::from_str("365days")?) 
+ +// Never expire (permanent event store) +IggyExpiry::NeverExpire + +// Use server default +IggyExpiry::ServerDefault +``` + +### Retention Strategy by Use Case + +| Use Case | Retention | Reasoning | +|----------|-----------|-----------| +| Transient events | 1-7 days | Processed and done | +| Audit logs | 1-7 years | Compliance requirements | +| Event sourcing | Forever | Events are source of truth | +| Metrics/telemetry | 30-90 days | Trend analysis window | + +### Example: Multi-Topic Retention + +```rust +// Transient processing queue - 7 days +client.create_topic( + "myapp", "task-queue", 3, None, None, + Some(IggyExpiry::ExpireDuration(IggyDuration::from_str("7days")?)), + None, +).await?; + +// Audit log - 1 year +client.create_topic( + "myapp", "audit-log", 1, None, None, + Some(IggyExpiry::ExpireDuration(IggyDuration::from_str("365days")?)), + None, +).await?; + +// Event store - never expire +client.create_topic( + "myapp", "event-store", 10, None, None, + Some(IggyExpiry::NeverExpire), + None, +).await?; +``` + +--- + +## Error Handling Strategies + +### At-Most-Once Delivery + +Process before committing offset. May lose messages on failure. + +```rust +while let Some(message) = consumer.next().await { + // Offset committed immediately (auto-commit) + // If process_message fails, message is lost + process_message(message)?; +} +``` + +**Use when**: Losing occasional messages is acceptable (metrics, logs). + +### At-Least-Once Delivery + +Commit offset only after successful processing. May reprocess on failure. + +```rust +let mut consumer = client + .consumer_group("stream", "topic", "group")? 
+ .auto_commit(AutoCommit::Disabled) // Manual commit + .build(); + +while let Some(message) = consumer.next().await { + match process_message(&message) { + Ok(_) => { + consumer.commit_offset().await?; // Commit after success + } + Err(e) => { + log::error!("Processing failed: {}, will retry", e); + // Don't commit - message will be redelivered + } + } +} +``` + +**Use when**: Every message must be processed (orders, payments). + +### Dead Letter Queue (DLQ) + +Move poison messages aside after retry exhaustion. + +```rust +const MAX_RETRIES: u32 = 3; + +async fn process_with_dlq( + message: &Message, + dlq_producer: &IggyProducer, +) -> Result<(), Error> { + let retry_count = get_retry_count(message); + + match process_message(message).await { + Ok(_) => Ok(()), + Err(e) if retry_count < MAX_RETRIES => { + // Requeue with incremented retry count + requeue_with_retry(message, retry_count + 1).await + } + Err(e) => { + // Max retries exceeded - send to DLQ + log::error!("Moving to DLQ after {} retries: {}", retry_count, e); + dlq_producer.send(message.to_dlq_format()).await?; + Ok(()) // Commit original message + } + } +} +``` + +### Idempotency + +Design consumers to handle duplicate messages safely. + +**Why idempotency matters:** In at-least-once delivery, the same message may be processed multiple times (network retries, consumer restarts). Your processing logic must handle this gracefully. + +**The Pattern:** Store processed event IDs in a database. Before processing, check if the event was already handled. + +```rust +/// Database represents your application's persistent storage. +/// This could be PostgreSQL, MySQL, MongoDB, Redis, or any database. +/// +/// The key requirement: it must support atomic transactions so we can +/// record the event ID and apply changes together. 
+///
+/// Example implementations:
+/// - PostgreSQL with sqlx: `sqlx::PgPool`
+/// - MongoDB: `mongodb::Client`
+/// - Redis: `redis::Client` (for simple cases)
+struct Database {
+    pool: sqlx::PgPool, // Example: PostgreSQL connection pool
+}
+
+impl Database {
+    /// Check if we've already processed this event.
+    /// This prevents duplicate processing in at-least-once delivery.
+    async fn event_exists(&self, event_id: Uuid) -> Result<bool, Error> {
+        let result = sqlx::query!(
+            "SELECT 1 FROM processed_events WHERE event_id = $1",
+            event_id
+        )
+        .fetch_optional(&self.pool)
+        .await?;
+
+        Ok(result.is_some())
+    }
+
+    /// Execute multiple operations atomically.
+    /// Either all succeed or all fail - no partial updates.
+    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T, Error>
+    where
+        F: FnOnce(&mut Transaction) -> Fut,
+        Fut: Future<Output = Result<T, Error>>,
+    {
+        let mut tx = self.pool.begin().await?;
+        let result = f(&mut tx).await?;
+        tx.commit().await?;
+        Ok(result)
+    }
+}
+
+/// Idempotent event processing: safe to call multiple times with same event.
+async fn process_order_idempotently(
+    event: &OrderEvent,
+    db: &Database,
+) -> Result<(), Error> {
+    // Step 1: Check if already processed using event ID
+    // This is the idempotency check - critical for at-least-once delivery
+    if db.event_exists(event.id).await? {
+        log::info!("Event {} already processed, skipping", event.id);
+        return Ok(()); // Safe to return - we already handled this
+    }
+
+    // Step 2: Process and record the event ID atomically
+    // The transaction ensures: either BOTH happen or NEITHER happens
+    // This prevents the bug where we process but don't record (causing reprocess)
+    db.transaction(|tx| async {
+        // Apply the business logic (update order status, send email, etc.)
+        apply_order_event(tx, event).await?;
+
+        // Record that we processed this event ID
+        tx.record_event_id(event.id).await?;
+
+        Ok(())
+    }).await
+}
+
+/// Apply the actual business logic for an order event.
+/// This is where you update your domain models. 
+async fn apply_order_event(tx: &mut Transaction, event: &OrderEvent) -> Result<(), Error> { + match &event.data { + OrderData::Created { items, total } => { + // Insert new order into database + sqlx::query!( + "INSERT INTO orders (id, items, total, status) VALUES ($1, $2, $3, 'pending')", + event.order_id, items, total + ).execute(tx).await?; + } + OrderData::Paid { amount } => { + // Update order status to paid + sqlx::query!( + "UPDATE orders SET status = 'paid', paid_amount = $1 WHERE id = $2", + amount, event.order_id + ).execute(tx).await?; + } + // ... handle other event types + } + Ok(()) +} +``` + +**Database Schema for Idempotency:** + +```sql +-- Table to track processed events (for idempotency) +CREATE TABLE processed_events ( + event_id UUID PRIMARY KEY, + processed_at TIMESTAMP DEFAULT NOW(), + -- Optional: store event type for debugging + event_type VARCHAR(100) +); + +-- Index for fast lookups +CREATE INDEX idx_processed_events_id ON processed_events(event_id); + +-- Optional: clean up old records (events older than retention period) +-- Run periodically via cron or scheduled job +DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL '30 days'; +``` + +--- + +## Production Patterns + +### Pattern: Outbox for Reliable Publishing + +**The Problem:** How do you ensure a database change and an event publication happen together? If you do them separately, you risk inconsistency. + +```rust +// THE PROBLEM: Two separate operations that can partially fail + +async fn create_order_naive(order: &Order, producer: &IggyProducer) -> Result<(), Error> { + // Step 1: Save to database + db.save_order(&order).await?; + + // Step 2: Publish event + producer.send(order_created_event).await?; // What if this fails? 
+ + // If Step 1 succeeds but Step 2 fails: + // - Order exists in database + // - But no event was published + // - Other services never learn about the order + // - System is now INCONSISTENT + + Ok(()) +} +``` + +**The Solution: Outbox Pattern** + +Instead of publishing directly, write the event to an "outbox" table in the SAME database transaction as your business data. A separate process reads the outbox and publishes to the message broker. + +```rust +/// The Outbox Pattern: Atomic database + event publishing +/// +/// How it works: +/// 1. Business operation + event insert in ONE transaction +/// 2. Separate "publisher" process reads outbox table +/// 3. Publisher sends events to message broker +/// 4. Publisher marks events as published +/// +/// Benefits: +/// - Atomicity: If transaction fails, neither data nor event is saved +/// - Reliability: Events are persisted; can retry publishing +/// - Ordering: Events published in order they were created + +// Step 1: Write business data AND event in same transaction +async fn create_order_with_outbox( + db: &Database, + order: &Order, +) -> Result<(), Error> { + db.transaction(|tx| async { + // Save the order (your business data) + tx.save_order(&order).await?; + + // Insert event into outbox table (same transaction!) + let event = OrderCreatedEvent { + order_id: order.id, + user_id: order.user_id, + total: order.total, + created_at: Utc::now(), + }; + tx.insert_outbox_event("order.created", &event).await?; + + // Both succeed or both fail - atomic! 
+ Ok(()) + }).await +} + +// Step 2: Separate background process that publishes events +async fn outbox_publisher(db: &Database, producer: &IggyProducer) { + loop { + // Fetch unpublished events from outbox table + let events = db.fetch_unpublished_events(100).await?; + + for event in events { + // Publish to Iggy + match producer.send(event.payload.clone()).await { + Ok(_) => { + // Mark as published so we don't send again + db.mark_event_published(event.id).await?; + } + Err(e) => { + // Log error, will retry on next iteration + log::error!("Failed to publish event {}: {}", event.id, e); + } + } + } + + // Small delay to avoid busy-looping + tokio::time::sleep(Duration::from_millis(100)).await; + } +} +``` + +**Outbox Database Schema:** + +```sql +-- Outbox table: stores events to be published +CREATE TABLE outbox ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + event_type VARCHAR(100) NOT NULL, -- e.g., "order.created" + payload JSONB NOT NULL, -- The event data + created_at TIMESTAMP DEFAULT NOW(), + published_at TIMESTAMP NULL, -- NULL = not yet published + + -- Optional: for ordering and debugging + aggregate_type VARCHAR(100), -- e.g., "Order" + aggregate_id UUID -- e.g., the order ID +); + +-- Index for the publisher query (find unpublished events) +CREATE INDEX idx_outbox_unpublished ON outbox(created_at) + WHERE published_at IS NULL; + +-- Publisher query +SELECT * FROM outbox +WHERE published_at IS NULL +ORDER BY created_at +LIMIT 100; +``` + +**Further Reading:** +- [Microservices Patterns: Transactional Outbox](https://microservices.io/patterns/data/transactional-outbox.html) +- [Debezium CDC](https://debezium.io/) - Alternative approach using Change Data Capture + +### Pattern: Saga for Distributed Transactions + +Coordinate multi-service operations with compensating actions. + +``` +Order Saga: +1. CreateOrder → OrderCreated +2. ReserveInventory → InventoryReserved (or InventoryFailed → CompensateOrder) +3. 
ProcessPayment → PaymentProcessed (or PaymentFailed → ReleaseInventory, CompensateOrder)
+4. ShipOrder → OrderShipped
+
+Each step publishes events; failures trigger compensating events.
+```
+
+### Pattern: Event Versioning
+
+Handle schema evolution gracefully.
+
+```rust
+#[derive(Serialize, Deserialize)]
+#[serde(tag = "version")]
+enum OrderCreatedEvent {
+    #[serde(rename = "1")]
+    V1 {
+        order_id: Uuid,
+        user_id: Uuid,
+        total: f64, // Old: floating point
+    },
+    #[serde(rename = "2")]
+    V2 {
+        order_id: Uuid,
+        user_id: Uuid,
+        total: Decimal, // New: exact decimal
+        currency: String, // New field
+    },
+}
+
+fn process_order_created(event: OrderCreatedEvent) {
+    match event {
+        OrderCreatedEvent::V1 { order_id, user_id, total } => {
+            // Handle legacy format
+            let total = Decimal::from_f64_retain(total).unwrap();
+            process_order(order_id, user_id, total, "USD")
+        }
+        OrderCreatedEvent::V2 { order_id, user_id, total, currency } => {
+            process_order(order_id, user_id, total, &currency)
+        }
+    }
+}
+```
+
+---
+
+## Code Examples
+
+### Complete Producer Example
+
+```rust
+use iggy::prelude::*;
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+#[derive(Debug, Serialize, Deserialize)]
+struct OrderEvent {
+    id: Uuid,
+    order_id: Uuid,
+    event_type: String,
+    timestamp: chrono::DateTime<chrono::Utc>,
+    data: serde_json::Value,
+}
+
+async fn run_producer() -> Result<(), Box<dyn std::error::Error>> {
+    // Connect to Iggy
+    let client = IggyClient::builder()
+        .with_tcp()
+        .with_server_address("127.0.0.1:8090")
+        .build()?;
+
+    client.connect().await?;
+    client.login_user("iggy", "iggy").await?;
+
+    // Ensure stream and topic exist
+    let _ = client.create_stream("ecommerce", Some(1)).await;
+    let _ = client.create_topic(
+        "ecommerce",
+        "orders",
+        3, // partitions
+        None,
+        None,
+        Some(IggyExpiry::ExpireDuration(IggyDuration::from_str("7days")?)),
+        None,
+    ).await;
+
+    // Create producer
+    let producer = client
+        .producer("ecommerce", "orders")? 
+        .batch_size(100)
+        .send_interval(IggyDuration::from_str("100ms")?)
+        .build();
+
+    producer.init().await?;
+
+    // Send events
+    let order_id = Uuid::new_v4();
+    let event = OrderEvent {
+        id: Uuid::new_v4(),
+        order_id,
+        event_type: "order.created".to_string(),
+        timestamp: chrono::Utc::now(),
+        data: serde_json::json!({
+            "items": [{"product_id": "123", "quantity": 2}],
+            "total": "59.98"
+        }),
+    };
+
+    let payload = serde_json::to_vec(&event)?;
+
+    // Use order_id as partition key for ordering
+    producer
+        .send_with_key(order_id.to_string(), payload)
+        .await?;
+
+    println!("Sent event: {:?}", event.id);
+
+    Ok(())
+}
+```
+
+### Complete Consumer Example
+
+```rust
+use iggy::prelude::*;
+
+async fn run_consumer() -> Result<(), Box<dyn std::error::Error>> {
+    let client = IggyClient::builder()
+        .with_tcp()
+        .with_server_address("127.0.0.1:8090")
+        .build()?;
+
+    client.connect().await?;
+    client.login_user("iggy", "iggy").await?;
+
+    // Create consumer with consumer group
+    let mut consumer = client
+        .consumer_group("ecommerce", "orders", "order-processors")?
+        .auto_commit(AutoCommit::IntervalOrWhen(
+            IggyDuration::from_str("5s")?,
+            AutoCommitWhen::ConsumingAllMessages,
+        ))
+        .create_consumer_group_if_not_exists()
+        .auto_join_consumer_group()
+        .polling_strategy(PollingStrategy::next())
+        .poll_interval(IggyDuration::from_str("100ms")?)
+ .batch_size(100) + .build(); + + consumer.init().await?; + + println!("Consumer started, waiting for messages..."); + + while let Some(message) = consumer.next().await { + let event: OrderEvent = serde_json::from_slice(&message.payload)?; + + println!( + "Received: {} - {} (partition {})", + event.event_type, + event.order_id, + message.partition_id + ); + + // Process the event + match process_order_event(&event).await { + Ok(_) => println!("Processed successfully"), + Err(e) => eprintln!("Processing error: {}", e), + } + } + + Ok(()) +} + +async fn process_order_event(event: &OrderEvent) -> Result<(), Box> { + match event.event_type.as_str() { + "order.created" => handle_order_created(event).await, + "order.paid" => handle_order_paid(event).await, + "order.shipped" => handle_order_shipped(event).await, + _ => { + println!("Unknown event type: {}", event.event_type); + Ok(()) + } + } +} +``` + +--- + +## Quick Reference + +### CLI Commands + +```bash +# View messages in a topic +iggy -u iggy -p iggy message poll --offset 0 --message-count 100 stream topic 1 + +# Send a message +iggy -u iggy -p iggy message send --partition-id 1 stream topic "hello world" + +# List streams +iggy -u iggy -p iggy stream list + +# List topics in a stream +iggy -u iggy -p iggy topic list stream + +# Get topic details +iggy -u iggy -p iggy topic get stream topic +``` + +### HTTP API + +```bash +# Health check +curl http://localhost:3000/ + +# Create stream +curl -X POST http://localhost:3000/streams \ + -H "Content-Type: application/json" \ + -d '{"stream_id": 1, "name": "my-stream"}' + +# Create topic +curl -X POST http://localhost:3000/streams/my-stream/topics \ + -H "Content-Type: application/json" \ + -d '{"topic_id": 1, "name": "events", "partitions_count": 3}' + +# Send message +curl -X POST http://localhost:3000/streams/my-stream/topics/events/messages \ + -H "Content-Type: application/json" \ + -d '{"partitioning": {"kind": "partition_id", "value": 1}, "messages": [{"payload": 
"aGVsbG8="}]}' + +# Poll messages +curl "http://localhost:3000/streams/my-stream/topics/events/messages?consumer_id=1&partition_id=1&count=10" +``` + +### Environment Variables (iggy_sample) + +```bash +# Connection +IGGY_CONNECTION_STRING=iggy://user:pass@host:8090 + +# Defaults +IGGY_STREAM=sample-stream +IGGY_TOPIC=events +IGGY_PARTITIONS=3 + +# Rate limiting +RATE_LIMIT_RPS=100 +RATE_LIMIT_BURST=50 + +# Message limits +BATCH_MAX_SIZE=1000 +POLL_MAX_COUNT=100 +``` + +### Observability URLs + +| Service | URL | Credentials | +|---------|-----|-------------| +| Iggy HTTP API | http://localhost:3000 | iggy/iggy | +| Iggy Web UI | http://localhost:3050 | iggy/iggy | +| Prometheus | http://localhost:9090 | - | +| Grafana | http://localhost:3001 | admin/admin | +| Sample App | http://localhost:8000 | - | + +--- + +## Further Reading + +- [Apache Iggy Documentation](https://iggy.apache.org/docs/) +- [Event-Driven Architecture Patterns](https://martinfowler.com/articles/201701-event-driven.html) +- [Designing Data-Intensive Applications](https://dataintensive.net/) - Martin Kleppmann +- [Enterprise Integration Patterns](https://www.enterpriseintegrationpatterns.com/) + +--- + +*Last updated: December 2025* diff --git a/docs/partitioning-guide.md b/docs/partitioning-guide.md new file mode 100644 index 0000000..7e157c8 --- /dev/null +++ b/docs/partitioning-guide.md @@ -0,0 +1,1447 @@ +# Understanding Partitions in Message Streaming + +A deep-dive guide into partitioning strategies, partition keys, and ordering guarantees in event-driven systems, with specific focus on Apache Iggy. + +--- + +## Table of Contents + +1. [What is a Partition?](#what-is-a-partition) +2. [Why Partitions Exist](#why-partitions-exist) +3. [The Ordering Problem](#the-ordering-problem) +4. [Partition Keys Explained](#partition-keys-explained) +5. [Iggy Partitioning Strategies](#iggy-partitioning-strategies) +6. [Choosing Partition Keys](#choosing-partition-keys) +7. 
[Common Partitioning Mistakes](#common-partitioning-mistakes) +8. [Partition Count Guidelines](#partition-count-guidelines) +9. [Iggy Partition Storage Architecture](#iggy-partition-storage-architecture) +10. [Iggy Server Configuration](#iggy-server-configuration) +11. [Rebalancing and Consumer Groups](#rebalancing-and-consumer-groups) +12. [Custom Partitioners](#custom-partitioners) +13. [Real-World Scenarios](#real-world-scenarios) +14. [Code Examples](#code-examples) +15. [Troubleshooting](#troubleshooting) + +--- + +## What is a Partition? + +A **partition** is a physical subdivision of a topic. Think of it as a separate, ordered log file within a topic. + +### Visual Model + +``` +Topic: "orders" (3 partitions) + +Partition 0: [msg1] → [msg4] → [msg7] → [msg10] → ... +Partition 1: [msg2] → [msg5] → [msg8] → [msg11] → ... +Partition 2: [msg3] → [msg6] → [msg9] → [msg12] → ... + +Each partition: +- Is an independent, ordered sequence +- Can be read by one consumer (within a consumer group) +- Has its own offset counter +- Is stored separately on disk +``` + +### Key Properties + +| Property | Description | +|----------|-------------| +| **Ordered** | Messages within a partition maintain strict order | +| **Independent** | Partitions don't share state or ordering with each other | +| **Parallelizable** | Different partitions can be processed simultaneously | +| **Persistent** | Each partition is an append-only log on disk | + +--- + +## Why Partitions Exist + +Partitions solve two fundamental problems in distributed systems: + +### Problem 1: Scalability + +Without partitions, a single consumer must process ALL messages: + +``` +Topic (no partitions): +[msg1] → [msg2] → [msg3] → [msg4] → [msg5] → ... → Consumer A (bottleneck!) + +With partitions: +Partition 0: [msg1] → [msg4] → ... → Consumer A +Partition 1: [msg2] → [msg5] → ... → Consumer B +Partition 2: [msg3] → [msg6] → ... 
→ Consumer C + +Throughput: 3x improvement +``` + +### Problem 2: Locality + +Related data can be colocated for efficient processing: + +``` +Without partition keys: +Order 123 events scattered randomly: + P0: [Order 123 Created] + P2: [Order 123 Paid] ← Different partition! + P1: [Order 123 Shipped] ← Different partition! + +With partition key = order_id: +All Order 123 events together: + P1: [Order 123 Created] → [Order 123 Paid] → [Order 123 Shipped] +``` + +### The Tradeoff + +``` +More Partitions = More Parallelism = Less Global Ordering + +┌─────────────────────────────────────────────────────────────────┐ +│ 1 Partition │ Perfect global order, no parallelism │ +├───────────────────────┼─────────────────────────────────────────┤ +│ 3 Partitions │ Good balance for most applications │ +├───────────────────────┼─────────────────────────────────────────┤ +│ 100 Partitions │ High parallelism, complex coordination │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## The Ordering Problem + +This is the most critical concept to understand. + +### Scenario: E-commerce Order Processing + +```rust +// These events MUST be processed in order for correctness +let events = vec![ + OrderEvent::Created { order_id: "123", total: 100.00 }, + OrderEvent::PaymentReceived { order_id: "123", amount: 100.00 }, + OrderEvent::Shipped { order_id: "123", tracking: "UPS123" }, + OrderEvent::Delivered { order_id: "123" }, +]; +``` + +### What Happens Without Partition Keys? 
+ +``` +Producer sends events (no partition key): + +Event 1: Created → hash(random) % 3 = 0 → Partition 0 +Event 2: Paid → hash(random) % 3 = 2 → Partition 2 +Event 3: Shipped → hash(random) % 3 = 1 → Partition 1 +Event 4: Delivered → hash(random) % 3 = 0 → Partition 0 + +Consumer processing order (parallel consumption): + +Consumer A (P0): Created → Delivered +Consumer B (P1): Shipped +Consumer C (P2): Paid + +Actual processing order: Created → Shipped → Paid → Delivered + ^^^^^^^^^^^^^^^^ + OUT OF ORDER! BUG! +``` + +### The Bug in Action + +```rust +// Consumer processes "Shipped" before "Paid" +fn process_order_shipped(order_id: &str) { + let order = db.get_order(order_id); + + if order.status != OrderStatus::Paid { + // ERROR: "Cannot ship unpaid order!" + // But the payment DID happen - we just processed out of order + } +} +``` + +### Solution: Partition Keys + +``` +Producer sends events WITH partition key = order_id: + +Event 1: Created → hash("123") % 3 = 1 → Partition 1 +Event 2: Paid → hash("123") % 3 = 1 → Partition 1 +Event 3: Shipped → hash("123") % 3 = 1 → Partition 1 +Event 4: Delivered → hash("123") % 3 = 1 → Partition 1 + +All events for order "123" go to Partition 1. +Single consumer processes them in exact order. 
+``` + +--- + +## Partition Keys Explained + +### How Partition Keys Work + +``` +partition_index = hash(partition_key) % number_of_partitions +``` + +Step by step: + +``` +partition_key = "order-123" + +Step 1: Hash the key + hash("order-123") = 2847623847 (example hash value) + +Step 2: Modulo by partition count + 2847623847 % 3 = 1 + +Step 3: Route to partition + Message → Partition 1 +``` + +### Consistent Hashing Guarantee + +The same key ALWAYS maps to the same partition (given fixed partition count): + +``` +hash("order-123") % 3 = 1 ← Always 1 +hash("order-123") % 3 = 1 ← Always 1 +hash("order-123") % 3 = 1 ← Always 1 + +hash("order-456") % 3 = 2 ← Always 2 +hash("order-456") % 3 = 2 ← Always 2 +``` + +### What Makes a Good Partition Key? + +| Characteristic | Good | Bad | +|----------------|------|-----| +| **Cardinality** | High (many unique values) | Low (few values) | +| **Distribution** | Even spread across values | Skewed (hot keys) | +| **Stability** | Doesn't change for entity | Changes over time | +| **Business meaning** | Represents ordering boundary | Arbitrary | + +--- + +## Iggy Partitioning Strategies + +Apache Iggy provides three built-in partitioning strategies through the `Partitioning` struct and `PartitioningKind` enum. + +### Strategy 1: Balanced (Round-Robin) + +The server automatically distributes messages across partitions using round-robin: + +```rust +use iggy::prelude::*; + +// Messages distributed: P0 → P1 → P2 → P0 → P1 → P2 → ... +let partitioning = Partitioning::balanced(); +``` + +**When to use:** +- Order of events doesn't matter +- Maximum throughput distribution +- Stateless event processing (logs, metrics) + +**Trade-off:** No ordering guarantees across messages. 
+ +### Strategy 2: Partition ID (Direct Assignment) + +Explicitly specify the target partition: + +```rust +use iggy::prelude::*; + +// Always send to partition 2 +let partitioning = Partitioning::partition_id(2); +``` + +**When to use:** +- Priority queues (partition 0 = high priority) +- Manual load balancing +- Testing specific partitions +- Sticky routing based on external logic + +**Trade-off:** You manage distribution manually. + +### Strategy 3: Messages Key (Hash-Based) + +The server calculates partition using **MurmurHash3** of your key: + +```rust +use iggy::prelude::*; + +// All messages with same key go to same partition +// partition = murmur3(key) % partition_count + +// String keys +let partitioning = Partitioning::messages_key_str("order-123")?; + +// Byte slice keys +let partitioning = Partitioning::messages_key(b"order-123")?; + +// Numeric keys (efficient - no string conversion) +let partitioning = Partitioning::messages_key_u64(order_id); +let partitioning = Partitioning::messages_key_u128(uuid_as_u128); +``` + +**Key constraints:** +- Maximum key length: **255 bytes** +- Supports: `&[u8]`, `&str`, `u32`, `u64`, `u128` + +**When to use:** +- Ordering required for related events +- Entity-based processing (orders, users, accounts) +- Session affinity + +### Partitioning Strategy Comparison + +| Strategy | Ordering | Distribution | Use Case | +|----------|----------|--------------|----------| +| `balanced()` | None | Even (round-robin) | Logs, metrics, stateless | +| `partition_id(n)` | Per-partition | Manual | Priority queues, testing | +| `messages_key(k)` | Per-key | Hash-based | Entity workflows, sessions | + +### Understanding MurmurHash3 + +Iggy uses MurmurHash3 for key-based partitioning: + +``` +partition_id = murmur3_hash(key_bytes) % partition_count +``` + +**Properties of MurmurHash3:** +- Non-cryptographic (fast, not secure) +- Excellent distribution (minimal collisions) +- Deterministic (same key = same hash) +- Used by Kafka, 
Cassandra, and other distributed systems + +**Why it matters:** +``` +murmur3("order-123") = 0x8A3F2B1C (example) +murmur3("order-124") = 0x2D7E9F8A (completely different) + +Even sequential keys distribute evenly across partitions. +``` + +--- + +## Choosing Partition Keys + +### Decision Framework + +Ask yourself: **"Which events MUST be processed in order?"** + +``` +If events for Entity X must be ordered: + partition_key = Entity X's identifier +``` + +### Domain-Specific Examples + +#### E-commerce + +``` +┌─────────────────┬─────────────────┬──────────────────────────────┐ +│ Event Type │ Partition Key │ Reasoning │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Order events │ order_id │ Order lifecycle must be │ +│ │ │ processed sequentially │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Cart events │ user_id │ User's cart modifications │ +│ │ │ must be ordered │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Inventory │ product_id │ Stock changes for a product │ +│ │ │ must be sequential │ +└─────────────────┴─────────────────┴──────────────────────────────┘ +``` + +#### Financial Services + +``` +┌─────────────────┬─────────────────┬──────────────────────────────┐ +│ Event Type │ Partition Key │ Reasoning │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Transactions │ account_id │ Balance updates must be │ +│ │ │ strictly ordered │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Trades │ symbol │ Price updates per symbol │ +│ │ │ must be sequential │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Audit logs │ transaction_id │ Audit trail for each │ +│ │ │ transaction stays together │ +└─────────────────┴─────────────────┴──────────────────────────────┘ +``` + +#### IoT / Telemetry + +``` +┌─────────────────┬─────────────────┬──────────────────────────────┐ +│ Event Type │ Partition Key │ Reasoning │ 
+├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Sensor readings │ device_id │ Time-series per device │ +│ │ │ must be ordered │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ GPS locations │ vehicle_id │ Route reconstruction │ +│ │ │ needs ordered points │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Metrics │ host_id │ Per-host metrics stay │ +│ │ │ together for analysis │ +└─────────────────┴─────────────────┴──────────────────────────────┘ +``` + +#### User Activity + +``` +┌─────────────────┬─────────────────┬──────────────────────────────┐ +│ Event Type │ Partition Key │ Reasoning │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Clickstream │ session_id │ User journey must be │ +│ │ │ reconstructable in order │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ User actions │ user_id │ User's actions stay │ +│ │ │ together │ +├─────────────────┼─────────────────┼──────────────────────────────┤ +│ Game events │ match_id │ Game state changes must │ +│ │ │ be ordered per match │ +└─────────────────┴─────────────────┴──────────────────────────────┘ +``` + +--- + +## Common Partitioning Mistakes + +### Mistake 1: Using Partitions as Categories + +``` +❌ WRONG: Treating partitions like separate queues + +Topic: "events" (3 partitions) + Partition 0 = "user events" ← WRONG! + Partition 1 = "order events" ← WRONG! + Partition 2 = "system events" ← WRONG! + +✅ CORRECT: Use separate topics for different event types + +Topic: "user-events" (3 partitions) +Topic: "order-events" (3 partitions) +Topic: "system-events" (3 partitions) +``` + +**Why it's wrong:** Partitions should distribute load, not categorize data. Use topics for categorization. 
+ +### Mistake 2: Too Few Partitions + +``` +❌ WRONG: 1 partition for high-throughput topic + +Topic: "transactions" (1 partition) + → Max 1 consumer + → Bottleneck at ~10K msg/sec + → Cannot scale horizontally + +✅ CORRECT: Plan for growth + +Topic: "transactions" (10 partitions) + → Up to 10 consumers + → Can handle 100K+ msg/sec + → Room to grow +``` + +### Mistake 3: Too Many Partitions + +``` +❌ WRONG: 1000 partitions "just in case" + +Topic: "notifications" (1000 partitions) + → Memory overhead for each partition + → Increased leader election time + → Most partitions sit idle + → Harder to rebalance + +✅ CORRECT: Start reasonable, increase as needed + +Topic: "notifications" (10 partitions) + → Monitor throughput + → Add partitions when needed +``` + +### Mistake 4: Low-Cardinality Partition Keys + +``` +❌ WRONG: Using status as partition key + +partition_key = order.status // Only 5 possible values! + +Distribution: + Partition for "pending": 90% of messages ← HOT! + Partition for "shipped": 5% of messages + Partition for "delivered": 3% of messages + Partition for "cancelled": 2% of messages + +✅ CORRECT: Use high-cardinality key + +partition_key = order.id // Millions of unique values + +Distribution: + Partition 0: ~33% of messages + Partition 1: ~33% of messages + Partition 2: ~33% of messages +``` + +### Mistake 5: Changing Partition Count + +``` +❌ DANGEROUS: Adding partitions to existing topic + +Before: 3 partitions + hash("order-123") % 3 = 1 → Partition 1 + +After: 5 partitions + hash("order-123") % 5 = 2 → Partition 2 ← DIFFERENT! + +Old events for "order-123" in Partition 1 +New events for "order-123" in Partition 2 +Ordering broken! + +✅ SAFE APPROACH: +1. Create new topic with desired partition count +2. Migrate consumers to new topic +3. Dual-write during transition +4. 
Deprecate old topic +``` + +### Mistake 6: No Partition Key at All + +``` +❌ WRONG: Random distribution (no key) + +producer.send(message); // No partition key + +Events randomly distributed: + Order 123 Created → Partition 0 + Order 123 Paid → Partition 2 ← Out of order risk! + Order 123 Shipped → Partition 1 ← Out of order risk! + +✅ CORRECT: Always use meaningful partition key + +producer.send_with_key(order_id, message); + +Events consistently routed: + Order 123 Created → Partition 1 + Order 123 Paid → Partition 1 ← Same partition + Order 123 Shipped → Partition 1 ← Same partition +``` + +--- + +## Partition Count Guidelines + +### Factors to Consider + +``` +Partition Count = max( + expected_throughput / throughput_per_partition, + max_consumer_parallelism, + minimum_for_availability +) +``` + +### Rules of Thumb + +| Throughput | Partitions | Notes | +|------------|------------|-------| +| < 1K msg/sec | 1-3 | Development, low-volume | +| 1K-10K msg/sec | 3-10 | Small production | +| 10K-100K msg/sec | 10-30 | Medium production | +| 100K+ msg/sec | 30-100+ | High-scale production | + +### Capacity Planning Formula + +``` +Required Partitions = Peak Messages Per Second / Messages Per Partition Per Second + +Example: + Peak throughput: 50,000 msg/sec + Per-partition throughput: 5,000 msg/sec (conservative estimate) + Required: 50,000 / 5,000 = 10 partitions + + Add buffer: 10 * 1.5 = 15 partitions +``` + +### Starting Recommendations + +| Use Case | Starting Partitions | Scale Trigger | +|----------|---------------------|---------------| +| Prototype/Dev | 1-3 | N/A | +| Small app | 3-6 | Consumer lag > 1 min | +| Medium app | 6-12 | Consumer lag > 30 sec | +| Large app | 12-30 | Consumer lag > 10 sec | +| Mission-critical | 30+ | Any lag increase | + +--- + +## Iggy Partition Storage Architecture + +Understanding how Iggy stores partition data helps you make better configuration decisions. 
+ +### Physical Storage Layout + +``` +data/ # Base data directory +└── streams/ + └── {stream_id}/ + └── topics/ + └── {topic_id}/ + └── partitions/ + ├── 0/ # Partition 0 + │ ├── 00000000000000000001/ # Segment 1 + │ │ ├── .log # Message data + │ │ ├── .index # Offset index + │ │ └── .timeindex # Timestamp index + │ └── 00000000000000001001/ # Segment 2 + │ ├── .log + │ ├── .index + │ └── .timeindex + ├── 1/ # Partition 1 + └── 2/ # Partition 2 +``` + +### Segments + +Each partition is divided into **segments** - fixed-size files that make up the append-only log: + +``` +Partition 0: +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ Segment 1 │ → │ Segment 2 │ → │ Segment 3 │ → (active) +│ (closed) │ │ (closed) │ │ (writing) │ +│ 1 GiB │ │ 1 GiB │ │ 0.3 GiB │ +└─────────────┘ └─────────────┘ └─────────────┘ + offset offset offset + 0-999K 1M-1.99M 2M-current +``` + +**Segment benefits:** +- Efficient deletion (drop entire segment files) +- Parallel reads from different segments +- Memory-mapped I/O for performance +- Archival/backup at segment granularity + +### Indexes + +Each segment has two indexes for fast lookups: + +**Offset Index (`.index`):** +``` +Maps message offset → file position +┌────────────┬───────────────┐ +│ Offset │ File Position │ +├────────────┼───────────────┤ +│ 1000000 │ 0 │ +│ 1000100 │ 52480 │ ← Sparse index (every N messages) +│ 1000200 │ 104960 │ +└────────────┴───────────────┘ + +Lookup: offset 1000150 → scan from 52480 +``` + +**Time Index (`.timeindex`):** +``` +Maps timestamp → offset +┌─────────────────────┬────────────┐ +│ Timestamp │ Offset │ +├─────────────────────┼────────────┤ +│ 2024-01-15T10:00:00 │ 1000000 │ +│ 2024-01-15T10:01:00 │ 1000500 │ +│ 2024-01-15T10:02:00 │ 1001000 │ +└─────────────────────┴────────────┘ + +Lookup: "messages after 10:01:30" → start at offset 1000500 +``` + +### Memory and Caching + +Iggy provides configurable index caching: + +| Cache Mode | Behavior | Memory Usage | Read Performance | 
+|------------|----------|--------------|------------------| +| `all` | All indexes in memory | High | Fastest | +| `open_segment` | Only active segment | Medium | Fast for recent | +| `none` | Read from disk | Low | Slower | + +--- + +## Iggy Server Configuration + +These settings in `server.toml` affect partition behavior. + +### Partition Settings + +```toml +[system.partition] +# Directory for partition data (relative to topic.path) +path = "partitions" + +# Force fsync after every write (durability vs performance) +# true = data safe on disk immediately (slower) +# false = OS manages write buffering (faster, small data loss risk on crash) +enforce_fsync = false + +# Validate CRC checksums when loading data +# true = detect corruption on startup (slower startup) +# false = trust data integrity (faster startup) +validate_checksum = false + +# Buffered messages before forced disk write +# Higher = better throughput, more memory, larger data loss window +messages_required_to_save = 1024 # default, minimum: 32 + +# Buffered bytes before forced disk write +# Triggers save when either this OR messages_required_to_save is reached +size_of_messages_required_to_save = "1 MiB" # minimum: 512 B +``` + +### Segment Settings + +```toml +[system.segment] +# Maximum segment file size before rolling to new segment +# Smaller = more files, faster deletion; Larger = fewer files, better sequential I/O +size = "1 GiB" # maximum: 1 GiB + +# Message expiration time (retention policy) +# "none" = keep forever +# Time format: "7 days", "24 hours", "30 minutes" +message_expiry = "none" + +# What happens when segments expire +# true = move to archive directory +# false = delete permanently +archive_expired = false + +# Index caching strategy +# "all" = cache all indexes (fastest reads, most memory) +# "open_segment" = cache only active segment (balanced) +# "none" = no caching (lowest memory, slowest reads) +cache_indexes = "open_segment" + +# File system confirmation behavior +# "wait" 
= block until OS confirms write +# "no_wait" = return immediately (faster, less safe) +server_confirmation = "wait" +``` + +### Topic Settings (Affects All Partitions) + +```toml +[system.topic] +# Maximum total size for all partitions in a topic +# "unlimited" or size like "100 GiB" +max_size = "unlimited" + +# Auto-delete oldest segments when max_size is reached +# Only takes effect if max_size is set +delete_oldest_segments = false +``` + +### Message Deduplication + +```toml +[system.message_deduplication] +# Enable server-side duplicate detection by message ID +enabled = false + +# Maximum message IDs to track +max_entries = 10000 + +# How long to remember message IDs +expiry = "1 m" +``` + +### Configuration Trade-offs + +| Goal | Configuration | +|------|---------------| +| **Maximum durability** | `enforce_fsync = true`, `validate_checksum = true` | +| **Maximum throughput** | `enforce_fsync = false`, `messages_required_to_save = 5000` | +| **Low memory** | `cache_indexes = "none"`, smaller `messages_required_to_save` | +| **Fast reads** | `cache_indexes = "all"` | +| **Auto-cleanup** | `message_expiry = "7 days"`, `delete_oldest_segments = true` | + +--- + +## Rebalancing and Consumer Groups + +### How Consumer Groups Use Partitions + +``` +Topic: "orders" (6 partitions) +Consumer Group: "order-processors" + +Scenario 1: 2 consumers +┌─────────────────────────────────────────┐ +│ Consumer A: P0, P1, P2 │ +│ Consumer B: P3, P4, P5 │ +└─────────────────────────────────────────┘ + +Scenario 2: 3 consumers (rebalance) +┌─────────────────────────────────────────┐ +│ Consumer A: P0, P1 │ +│ Consumer B: P2, P3 │ +│ Consumer C: P4, P5 │ +└─────────────────────────────────────────┘ + +Scenario 3: 6 consumers (perfect distribution) +┌─────────────────────────────────────────┐ +│ Consumer A: P0 │ +│ Consumer B: P1 │ +│ Consumer C: P2 │ +│ Consumer D: P3 │ +│ Consumer E: P4 │ +│ Consumer F: P5 │ +└─────────────────────────────────────────┘ + +Scenario 4: 8 consumers 
(2 idle) +┌─────────────────────────────────────────┐ +│ Consumer A: P0 │ +│ Consumer B: P1 │ +│ Consumer C: P2 │ +│ Consumer D: P3 │ +│ Consumer E: P4 │ +│ Consumer F: P5 │ +│ Consumer G: IDLE (no partition) │ +│ Consumer H: IDLE (no partition) │ +└─────────────────────────────────────────┘ +``` + +### Rebalancing Triggers + +| Event | What Happens | +|-------|--------------| +| Consumer joins | Partitions redistributed | +| Consumer leaves/crashes | Partitions reassigned to remaining consumers | +| Consumer heartbeat timeout | Treated as crash, partitions reassigned | + +### Rebalancing Impact + +``` +During rebalance: +1. All consumers stop processing (brief pause) +2. Coordinator recalculates assignments +3. Consumers receive new assignments +4. Processing resumes + +Duration: Typically 1-30 seconds depending on group size +``` + +### Minimizing Rebalance Impact + +```rust +// Use longer session timeout to prevent spurious rebalances +let consumer = client + .consumer_group("stream", "topic", "group")? + .session_timeout(Duration::from_secs(30)) // Default is often 10s + .heartbeat_interval(Duration::from_secs(10)) + .build(); + +// Process batches, not single messages (reduces commit frequency) +let batch = consumer.poll(100).await?; +for message in batch { + process(message)?; +} +consumer.commit().await?; // One commit per batch +``` + +--- + +## Custom Partitioners + +Iggy supports custom partitioning logic through the `Partitioner` trait. This is useful when built-in strategies don't meet your needs. 
+### The Partitioner Trait
+
+```rust
+use iggy::prelude::*;
+
+/// Custom partitioner must implement this trait
+pub trait Partitioner: Send + Sync + Debug {
+    /// Calculate which partition a message should go to
+    fn calculate_partition_id(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        messages: &[IggyMessage],
+    ) -> Result<u32, IggyError>;
+}
+```
+
+### Example: Weighted Partitioner
+
+Route more traffic to specific partitions (e.g., for heterogeneous hardware):
+
+```rust
+use iggy::prelude::*;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+/// Routes 70% to partitions 0-1, 30% to partitions 2-4
+#[derive(Debug)]
+struct WeightedPartitioner {
+    counter: AtomicU64,
+    high_capacity_partitions: Vec<u32>, // [0, 1] - faster machines
+    low_capacity_partitions: Vec<u32>, // [2, 3, 4] - slower machines
+    high_capacity_weight: u64, // 70
+}
+
+impl Partitioner for WeightedPartitioner {
+    fn calculate_partition_id(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _messages: &[IggyMessage],
+    ) -> Result<u32, IggyError> {
+        let count = self.counter.fetch_add(1, Ordering::Relaxed);
+
+        // 70% to high capacity, 30% to low capacity
+        if count % 100 < self.high_capacity_weight {
+            let idx = (count as usize) % self.high_capacity_partitions.len();
+            Ok(self.high_capacity_partitions[idx])
+        } else {
+            let idx = (count as usize) % self.low_capacity_partitions.len();
+            Ok(self.low_capacity_partitions[idx])
+        }
+    }
+}
+```
+
+### Example: Time-Based Partitioner
+
+Route messages to partitions based on time windows:
+
+```rust
+use iggy::prelude::*;
+use chrono::{Timelike, Utc};
+
+/// Routes to different partitions by hour of day
+/// Useful for time-bucketed analytics
+#[derive(Debug)]
+struct HourlyPartitioner {
+    partitions_count: u32,
+}
+
+impl Partitioner for HourlyPartitioner {
+    fn calculate_partition_id(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _messages: &[IggyMessage],
+    ) -> Result<u32, IggyError> {
+        let hour = Utc::now().hour();
+        Ok(hour % self.partitions_count)
+    }
+}
+```
+
+### Example: Content-Based Partitioner
+
+Route based on message content:
+
+```rust
+use iggy::prelude::*;
+use serde::Deserialize;
+
+#[derive(Deserialize)]
+struct EventEnvelope {
+    priority: String,
+    // ... other fields
+}
+
+/// Routes high-priority messages to partition 0
+#[derive(Debug)]
+struct PriorityPartitioner {
+    high_priority_partition: u32,
+    normal_partitions: Vec<u32>,
+    counter: AtomicU64,
+}
+
+impl Partitioner for PriorityPartitioner {
+    fn calculate_partition_id(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        messages: &[IggyMessage],
+    ) -> Result<u32, IggyError> {
+        // Check first message for priority (batch typically has same priority)
+        if let Some(msg) = messages.first() {
+            if let Ok(envelope) = serde_json::from_slice::<EventEnvelope>(&msg.payload) {
+                if envelope.priority == "high" {
+                    return Ok(self.high_priority_partition);
+                }
+            }
+        }
+
+        // Round-robin for normal priority
+        let count = self.counter.fetch_add(1, Ordering::Relaxed);
+        let idx = (count as usize) % self.normal_partitions.len();
+        Ok(self.normal_partitions[idx])
+    }
+}
+```
+
+### Using a Custom Partitioner
+
+```rust
+use iggy::prelude::*;
+use std::sync::Arc;
+
+// Create your custom partitioner
+let partitioner = Arc::new(WeightedPartitioner {
+    counter: AtomicU64::new(0),
+    high_capacity_partitions: vec![0, 1],
+    low_capacity_partitions: vec![2, 3, 4],
+    high_capacity_weight: 70,
+});
+
+// Use with IggyClient
+let client = IggyClient::builder()
+    .with_tcp()
+    .with_server_address("127.0.0.1:8090")
+    .with_partitioner(partitioner) // Inject custom partitioner
+    .build()?;
+```
+
+### When to Use Custom Partitioners
+
+| Use Case | Built-in Strategy | Custom Partitioner |
+|----------|-------------------|-------------------|
+| Simple key-based | `messages_key()` | Not needed |
+| Round-robin | `balanced()` | Not needed |
+| Weighted distribution | - | Yes |
+| Time-bucketed | - | Yes |
+| Content-based routing | - | Yes |
+| A/B testing | - | Yes |
+| Geographic 
routing | - | Yes | + +--- + +## Real-World Scenarios + +### Scenario 1: Order Processing Pipeline + +``` +Requirements: +- 10,000 orders/day (~0.1 orders/sec average, 10 orders/sec peak) +- Each order has 5-10 events in its lifecycle +- Order events MUST be processed in sequence +- 3 order processors for redundancy + +Solution: +- Topic: "order-events" with 6 partitions +- Partition key: order_id +- Consumer group: "order-processors" with 3 instances + +Why 6 partitions? +- 3 consumers = minimum 3 partitions +- 6 gives room to scale to 6 consumers +- Even distribution: 2 partitions per consumer +``` + +### Scenario 2: High-Volume Clickstream + +``` +Requirements: +- 1,000,000 clicks/hour (~280 clicks/sec) +- Need to reconstruct user sessions +- Session analysis must see events in order +- Scale processing as needed + +Solution: +- Topic: "clickstream" with 20 partitions +- Partition key: session_id +- Consumer group: "session-analyzers" + +Why 20 partitions? +- 280 msg/sec ÷ ~50 msg/sec per consumer = 6 consumers minimum +- 20 partitions allows scaling to 20 consumers +- High cardinality of session_id ensures even distribution +``` + +### Scenario 3: Financial Transactions + +``` +Requirements: +- Account balance must never go negative +- Transactions for same account must be strictly ordered +- Regulatory audit trail required +- Zero message loss tolerance + +Solution: +- Topic: "transactions" with 12 partitions +- Partition key: account_id +- Consumer group: "transaction-processors" +- Replication factor: 3 (when available) +- acks: all (wait for all replicas) + +Why account_id as key? 
+- Balance calculations require strict ordering +- Deposit → Withdrawal → Check must be in order +- Different accounts can be processed in parallel +``` + +### Scenario 4: Multi-Tenant SaaS + +``` +Requirements: +- 500 tenants, varying activity levels +- Tenant data isolation +- Fair resource allocation +- Scale per tenant activity + +Solution: +- Topic: "tenant-events" with 50 partitions +- Partition key: tenant_id +- Consumer group per processing type + +Why tenant_id? +- Natural isolation boundary +- Events for a tenant stay together +- Hot tenants spread across partitions via hashing +``` + +--- + +## Code Examples + +### Basic Partitioned Producer + +```rust +use iggy::prelude::*; +use uuid::Uuid; + +struct OrderEvent { + order_id: Uuid, + event_type: String, + data: serde_json::Value, +} + +async fn publish_order_event( + producer: &IggyProducer, + event: &OrderEvent, +) -> Result<(), Error> { + let payload = serde_json::to_vec(event)?; + + // Use order_id as partition key + // All events for this order go to the same partition + let partition_key = event.order_id.to_string(); + + producer + .send_with_key(partition_key, payload) + .await?; + + Ok(()) +} + +// Usage: All these events will be in the same partition, in order +async fn process_order(producer: &IggyProducer, order_id: Uuid) { + let events = vec![ + OrderEvent { + order_id, + event_type: "created".into(), + data: json!({"items": 3, "total": 99.99}), + }, + OrderEvent { + order_id, + event_type: "paid".into(), + data: json!({"method": "credit_card"}), + }, + OrderEvent { + order_id, + event_type: "shipped".into(), + data: json!({"carrier": "UPS", "tracking": "1Z999"}), + }, + ]; + + for event in events { + publish_order_event(producer, &event).await.unwrap(); + } +} +``` + +### Consumer with Partition Awareness + +```rust +async fn run_consumer() -> Result<(), Error> { + let mut consumer = client + .consumer_group("ecommerce", "order-events", "processors")? 
+        .auto_commit(AutoCommit::IntervalOrWhen(
+            IggyDuration::from_str("5s")?,
+            AutoCommitWhen::ConsumingAllMessages,
+        ))
+        .create_consumer_group_if_not_exists()
+        .auto_join_consumer_group()
+        .build();
+
+    consumer.init().await?;
+
+    while let Some(message) = consumer.next().await {
+        // message.partition_id tells you which partition this came from
+        println!(
+            "Processing message from partition {}: offset {}",
+            message.partition_id,
+            message.offset
+        );
+
+        let event: OrderEvent = serde_json::from_slice(&message.payload)?;
+
+        // All events for the same order_id come from the same partition
+        // So they arrive in order
+        process_order_event(&event).await?;
+    }
+
+    Ok(())
+}
+```
+
+### Manual Partition Selection (Advanced)
+
+```rust
+// Sometimes you need explicit control over partition assignment
+
+async fn send_to_specific_partition(
+    producer: &IggyProducer,
+    partition_id: u32,
+    message: &[u8],
+) -> Result<(), Error> {
+    // Use partition ID directly instead of key-based routing
+    producer
+        .send_to_partition(partition_id, message.to_vec())
+        .await
+}
+
+// Use case: Priority processing
+// High-priority orders go to partition 0 (dedicated fast consumer)
+// Normal orders distributed across partitions 1-5
+
+async fn send_order_by_priority(
+    producer: &IggyProducer,
+    order: &Order,
+) -> Result<(), Error> {
+    let payload = serde_json::to_vec(order)?;
+
+    if order.priority == Priority::High {
+        producer.send_to_partition(0, payload).await
+    } else {
+        // Normal priority: use order_id as key for distribution
+        producer.send_with_key(order.id.to_string(), payload).await
+    }
+}
+```
+
+### Partition-Aware Batch Processing
+
+```rust
+use std::collections::HashMap;
+
+async fn process_batch_by_partition(
+    messages: Vec<ReceivedMessage>,
+) -> Result<(), Error> {
+    // Group messages by partition for efficient processing
+    let mut by_partition: HashMap<u32, Vec<ReceivedMessage>> = HashMap::new();
+
+    for msg in messages {
+        by_partition
+            .entry(msg.partition_id)
+            .or_default()
+            .push(msg);
+    
}
+
+    // Process each partition's messages (they're already ordered within partition)
+    for (partition_id, partition_messages) in by_partition {
+        println!("Processing {} messages from partition {}",
+            partition_messages.len(), partition_id);
+
+        for msg in partition_messages {
+            // Messages within a partition are in order
+            process_message(&msg).await?;
+        }
+    }
+
+    Ok(())
+}
+```
+
+---
+
+## Troubleshooting
+
+### Problem: Uneven Partition Distribution
+
+**Symptoms:**
+- Some partitions have much more data than others
+- Some consumers are idle while others are overloaded
+
+**Diagnosis:**
+```bash
+# Check partition sizes via Iggy CLI
+iggy topic get stream-name topic-name
+
+# Look for skew in message counts per partition
+```
+
+**Solutions:**
+
+1. **Hot key problem**: A few keys produce most messages
+   ```
+   Bad: partition_key = "US" (80% of users)
+   Good: partition_key = user_id (even distribution)
+   ```
+
+2. **Low cardinality**: Too few unique keys
+   ```
+   Bad: partition_key = status (5 values)
+   Good: partition_key = order_id (millions of values)
+   ```
+
+3. **Add salting** for hot keys:
+   ```rust
+   // Instead of just user_id for power users
+   let salt = random::<u32>() % 10;
+   let partition_key = format!("{}-{}", user_id, salt);
+   // Spreads hot user across 10 partitions
+   // Trade-off: events for same user may be out of order
+   ```
+
+### Problem: Consumer Lag Increasing
+
+**Symptoms:**
+- Messages pile up
+- Processing falls behind
+- Offset lag grows
+
+**Solutions:**
+
+1. **Add consumers** (up to partition count)
+   ```
+   Current: 2 consumers, 6 partitions
+   Solution: Scale to 6 consumers
+   ```
+
+2. **Increase partitions** (requires migration)
+   ```
+   Current: 3 partitions, 3 consumers at max
+   Solution: Create new topic with 10 partitions, migrate
+   ```
+
+3. 
**Optimize consumer** + ```rust + // Process larger batches + .batch_size(1000) // Instead of 100 + + // Reduce commit frequency + .auto_commit(AutoCommit::IntervalOrWhen( + IggyDuration::from_str("10s")?, // Less frequent + AutoCommitWhen::ConsumingAllMessages, + )) + ``` + +### Problem: Out-of-Order Processing + +**Symptoms:** +- Events processed in wrong sequence +- State inconsistencies +- "Cannot find entity" errors + +**Diagnosis:** +```rust +// Add logging to track event order +println!( + "Processing event {} for order {} (partition {}, offset {})", + event.event_type, + event.order_id, + message.partition_id, + message.offset +); +``` + +**Solutions:** + +1. **Add partition key** if missing + ```rust + // Before: no key, random partition + producer.send(message).await; + + // After: consistent partitioning + producer.send_with_key(order_id, message).await; + ``` + +2. **Check for multiple producers** using different keys + ``` + Producer A: partition_key = order.id + Producer B: partition_key = order.user_id ← Inconsistent! + + Fix: All producers must use same key strategy + ``` + +3. **Verify consumer group** name is consistent + ``` + Consumer A: group = "processors" + Consumer B: group = "processor" ← Different group! + + Different groups = both receive all messages + ``` + +--- + +## Summary + +### Key Takeaways + +1. **Partitions enable parallelism**, not categorization +2. **Partition keys ensure ordering** for related events +3. **Choose high-cardinality keys** for even distribution +4. **Start with fewer partitions** and scale as needed +5. **Never change partition count** on active topics +6. **Monitor partition lag** and distribution + +### Quick Decision Guide + +``` +Do I need ordering for related events? +├── Yes → Use partition key (entity ID) +└── No → Random distribution is fine + +How many partitions? +├── Low volume (<1K/sec) → 3-6 +├── Medium (<100K/sec) → 6-20 +└── High (>100K/sec) → 20-100+ + +What should my partition key be? 
+└── The ID of the entity whose events must be ordered + (order_id, user_id, account_id, device_id, etc.) +``` + +--- + +## Further Reading + +### Apache Iggy Resources +- [Apache Iggy Documentation](https://iggy.apache.org/docs/) - Official documentation +- [Iggy Getting Started Guide](https://iggy.apache.org/docs/introduction/getting-started) - Partitioning strategies explained +- [Iggy Server Configuration](https://iggy.apache.org/docs/server/configuration) - Partition and segment settings +- [Iggy Rust SDK](https://docs.rs/iggy/latest/iggy/) - API reference for `Partitioning`, `Partitioner` trait + +### Distributed Systems Fundamentals +- [Designing Data-Intensive Applications](https://dataintensive.net/) - Chapter 6: Partitioning (Martin Kleppmann) +- [Jay Kreps: The Log](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying) - Foundational paper on log-based systems +- [Martin Kleppmann's Blog](https://martin.kleppmann.com/) - Distributed systems deep dives + +### Related Technologies +- [Kafka Partitioning](https://kafka.apache.org/documentation/#intro_concepts_and_terms) - Similar concepts (Iggy inspired by Kafka) +- [MurmurHash](https://en.wikipedia.org/wiki/MurmurHash) - Hash algorithm used for partition key routing +- [Cassandra Murmur3Partitioner](https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerM3P.html) - Another system using murmur3 + +### Performance and Benchmarking +- [Iggy Benchmarks](https://benchmarks.iggy.apache.org) - Official performance metrics +- [Transparent Benchmarking Blog Post](https://iggy.apache.org/blogs) - Iggy's approach to benchmarking + +--- + +*Last updated: December 2025* diff --git a/observability/grafana/provisioning/dashboards/dashboards.yml b/observability/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 0000000..fcf3286 --- /dev/null +++ 
b/observability/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,13 @@ +# Grafana dashboard provisioning +apiVersion: 1 + +providers: + - name: 'Iggy Dashboards' + orgId: 1 + folder: 'Iggy' + folderUid: 'iggy' + type: file + disableDeletion: false + editable: true + options: + path: /etc/grafana/provisioning/dashboards diff --git a/observability/grafana/provisioning/dashboards/iggy-overview.json b/observability/grafana/provisioning/dashboards/iggy-overview.json new file mode 100644 index 0000000..5ee2ea3 --- /dev/null +++ b/observability/grafana/provisioning/dashboards/iggy-overview.json @@ -0,0 +1,304 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 80 }, + { "color": "red", "value": 90 } + ] + }, + "unit": "percent" + } + }, + "gridPos": { "h": 6, "w": 6, "x": 0, "y": 0 }, + "id": 1, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "title": "Iggy Server Status", + "type": "stat", + "targets": [ + { + "expr": "up{job=\"iggy\"}", + "legendFormat": "Status", + "refId": "A" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + 
"legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "green", "value": null }] + }, + "unit": "short" + } + }, + "gridPos": { "h": 8, "w": 12, "x": 6, "y": 0 }, + "id": 2, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "title": "HTTP Request Rate", + "type": "timeseries", + "targets": [ + { + "expr": "rate(http_requests_total{job=\"iggy\"}[1m])", + "legendFormat": "{{method}} {{path}}", + "refId": "A" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "green", "value": null }] + }, + "unit": "bytes" + } + }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }, + "id": 3, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { "mode": "multi", "sort": "desc" } + 
}, + "title": "Messages Throughput", + "type": "timeseries", + "targets": [ + { + "expr": "rate(iggy_messages_bytes_total{job=\"iggy\"}[1m])", + "legendFormat": "{{stream}} / {{topic}}", + "refId": "A" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "green", "value": null }] + }, + "unit": "s" + } + }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }, + "id": 4, + "options": { + "legend": { + "calcs": ["mean", "p95", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "title": "Request Latency", + "type": "timeseries", + "targets": [ + { + "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{job=\"iggy\"}[5m]))", + "legendFormat": "p95 {{method}}", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.50, rate(http_request_duration_seconds_bucket{job=\"iggy\"}[5m]))", + "legendFormat": "p50 {{method}}", + "refId": "B" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "green", "value": null }] + }, + "unit": 
"short" + } + }, + "gridPos": { "h": 6, "w": 6, "x": 0, "y": 6 }, + "id": 5, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "title": "Active Streams", + "type": "stat", + "targets": [ + { + "expr": "iggy_streams_count{job=\"iggy\"}", + "legendFormat": "Streams", + "refId": "A" + } + ] + } + ], + "refresh": "5s", + "schemaVersion": 39, + "tags": ["iggy", "messaging"], + "templating": { "list": [] }, + "time": { "from": "now-15m", "to": "now" }, + "timepicker": {}, + "timezone": "browser", + "title": "Iggy Overview", + "uid": "iggy-overview", + "version": 1 +} diff --git a/observability/grafana/provisioning/datasources/datasources.yml b/observability/grafana/provisioning/datasources/datasources.yml new file mode 100644 index 0000000..170333c --- /dev/null +++ b/observability/grafana/provisioning/datasources/datasources.yml @@ -0,0 +1,13 @@ +# Grafana datasource provisioning +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: false + jsonData: + timeInterval: "15s" + httpMethod: POST diff --git a/observability/prometheus/prometheus.yml b/observability/prometheus/prometheus.yml new file mode 100644 index 0000000..9484eb0 --- /dev/null +++ b/observability/prometheus/prometheus.yml @@ -0,0 +1,26 @@ +# Prometheus configuration for Iggy observability stack +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + # Iggy server metrics + - job_name: 'iggy' + static_configs: + - targets: ['iggy:3000'] + metrics_path: '/metrics' + scheme: 'http' + + # Sample application metrics (if exposed) + - job_name: 'iggy-sample-app' + static_configs: + - targets: ['app:8000'] + metrics_path: '/metrics' + scheme: 'http' + # Don't fail if app doesn't expose metrics + scrape_timeout: 5s + + # 
Prometheus self-monitoring + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090']