Skip to content

Cloud-native distributed message queue with storage-compute separation architecture. Licensed under BDL 1.0 (academic use permitted, commercial use requires license).

License

Notifications You must be signed in to change notification settings

cloudymoma/rustmq

Repository files navigation

RustMQ: Cloud-Native Distributed Message Queue System

Build Status License: BDL 1.0 Rust Version Version

RustMQ is a next-generation, cloud-native distributed message queue system that combines the high-performance characteristics of Apache Kafka with the cost-effectiveness and operational simplicity of modern cloud architectures. Built from the ground up in Rust, RustMQ leverages a shared-storage architecture that decouples compute from storage, enabling unprecedented elasticity, cost savings, and operational efficiency.

Optimized for Google Cloud Platform: RustMQ is designed with Google Cloud services as the default target, leveraging Google Cloud Storage for cost-effective object storage and Google Kubernetes Engine for orchestration, with all configurations defaulting to the us-central1 region for optimal performance and cost efficiency.

πŸš€ Quick Start

Development Cluster (Single Zone)

# Build images locally
cd docker/ && ./quick-deploy.sh dev-build

# Deploy development cluster  
cd ../gke/ && ./deploy-rustmq-gke.sh deploy --environment development

Production Cluster (Single Zone)

# Build and push production images
cd docker/ && PROJECT_ID=your-project ./quick-deploy.sh production-images

# Deploy production cluster
cd ../gke/ && PROJECT_ID=your-project ./deploy-rustmq-gke.sh deploy --environment production

For detailed setup, see GKE Deployment Guide.

πŸš€ Key Features

  • 10x Cost Reduction: 90% storage cost savings through single-copy storage in Google Cloud Storage
  • 100x Elasticity: Instant scaling with stateless brokers and metadata-only operations
  • Single-Digit Millisecond Latency: Optimized write path with local NVMe WAL and zero-copy data movement
  • Sub-Microsecond Security: Enterprise-grade security with 547ns authorization decisions and 2M+ ops/sec
  • QUIC/HTTP3 Protocol: Reduced connection overhead and head-of-line blocking elimination
  • WebAssembly ETL: Real-time data processing with secure sandboxing and smart filtering
  • Auto-Balancing: Continuous load distribution optimization
  • Google Cloud Native: Default configurations optimized for GCP services

πŸ—οΈ Architecture Overview

RustMQ implements a storage-compute separation architecture with stateless brokers and shared cloud storage for unprecedented elasticity and cost efficiency.

RustMQ Architecture

Click to view the interactive architecture diagram showing RustMQ's innovative storage-compute separation design

Key Architectural Principles

  1. Storage-Compute Separation: Brokers are stateless; all persistent data in shared object storage
  2. Intelligent Tiered Storage: Hot data in WAL/cache, cold data in object storage
  3. Replication Without Data Movement: Shared storage enables instant failover
  4. QUIC/HTTP3 Protocol: Modern transport for reduced latency and head-of-line blocking elimination
  5. Raft Consensus: Distributed coordination for metadata and cluster management
  6. Auto-scaling & Operations: Cloud-native operational capabilities with Kubernetes integration

Architecture Layers

The diagram above illustrates RustMQ's enhanced layered architecture with enterprise security:

  • πŸ”΅ Client Layer - Production-ready SDKs (Rust, Go) with mTLS support and comprehensive admin CLI with complete security management suite
  • 🟑 Enterprise Security Layer - Zero Trust architecture with mTLS authentication, multi-level ACL cache (547ns/1310ns/754ns), certificate management, and 2M+ ops/sec authorization capacity
  • 🟒 Broker Cluster - Stateless compute nodes with MessageBrokerCore, enhanced QUIC/gRPC servers featuring circuit breaker patterns, connection pooling, and real-time health monitoring
  • 🟠 Tiered Storage - Intelligent WAL with upload triggers, workload-isolated caching (hot/cold), and optimized object storage with bandwidth limiting
  • 🟣 Controller Cluster - Raft consensus with distributed ACL storage, metadata management, cluster coordination, and comprehensive admin REST API with advanced rate limiting
  • πŸ”΄ Operational Layer - Production-ready operations with zero-downtime rolling upgrades, automated scaling with partition rebalancing, and Kubernetes integration with volume recovery
  • 🟦 Integration Layer - WebAssembly ETL processing with sandboxing, BigQuery streaming with schema mapping, and comprehensive monitoring infrastructure

Data Flow Patterns

Core Message Flows

  • Write Path: Client β†’ QUIC β†’ Broker β†’ WAL β†’ Cache β†’ Object Storage
  • Read Path: Client ← QUIC ← Broker ← Cache ← Object Storage (if cache miss)
  • Replication: Leader β†’ Followers (Metadata Only) - no data movement due to shared storage

Security & Authentication Flows

  • Client Authentication: Client β†’ mTLS/QUIC β†’ Broker β†’ Certificate Validation β†’ Principal Extraction
  • Authorization: Broker β†’ Multi-Level Cache (L1/L2/L3) β†’ Controller ACL β†’ Permission Decision
  • Inter-Service: Broker ←mTLS/gRPCβ†’ Controller ←mTLS/Raftβ†’ Controller (Cluster)

Administrative & Operational Flows

  • Admin Operations: Admin CLI/API β†’ Controller β†’ Cluster Coordination β†’ Broker Updates
  • Health Monitoring: Background Tasks β†’ Broker Health β†’ Admin API β†’ Real-time Status
  • Scaling Operations: Controller β†’ Partition Rebalancing β†’ Broker Addition/Removal β†’ Traffic Migration

πŸ“‹ Table of Contents

πŸƒ Quick Start

Prerequisites

  • Rust 1.73+ and Cargo - Core development environment
  • Docker and Docker Compose - Container orchestration (see docker/README.md)
  • Google Cloud SDK - For BigQuery integration and GCP services
  • kubectl - For Kubernetes deployment (see docker/README.md)

Production Setup

Option 1: Docker Environment (Recommended)

# Clone the repository
git clone https://github.com/cloudymoma/rustmq.git
cd rustmq

# Start complete RustMQ cluster with all services
cd docker && docker-compose up -d

# Verify cluster health
curl http://localhost:9642/health

# Access services:
# - Broker QUIC: localhost:9092 (mTLS enabled)
# - Admin REST API: localhost:9642 (rate limiting enabled)
# - Controller: localhost:9094 (Raft consensus)

Option 2: Local Build & Run

# Build all production binaries
cargo build --release

# Build with io_uring for optimal I/O performance (Linux only)
cargo build --release --features io-uring

# Verify all tests pass (300+ tests)
cargo test --release

# Start controller cluster (Raft consensus + ACL storage)
./target/release/rustmq-controller --config config/controller.toml &

# Start broker with security enabled
./target/release/rustmq-broker --config config/broker.toml &

# Initialize security infrastructure
./target/release/rustmq-admin ca init --cn "RustMQ Root CA" --org "MyOrg"
./target/release/rustmq-admin certs issue --principal "broker-01" --role broker

# Start Admin REST API with rate limiting
./target/release/rustmq-admin serve-api 8080

Basic Operations

Topic Management

# Create topic with replication
./target/release/rustmq-admin create-topic user-events 12 3

# List all topics with details
./target/release/rustmq-admin list-topics

# Get comprehensive topic information
./target/release/rustmq-admin describe-topic user-events

# Check cluster health
./target/release/rustmq-admin cluster-health

Security Operations

# Certificate management
./target/release/rustmq-admin certs list --role broker
./target/release/rustmq-admin certs validate --cert-file /path/to/cert.pem

# ACL management
./target/release/rustmq-admin acl create \
  --principal "[email protected]" \
  --resource "topic.events.*" \
  --permissions read,write \
  --effect allow

# Security monitoring
./target/release/rustmq-admin security status
./target/release/rustmq-admin audit logs --since "2024-01-01T00:00:00Z"

Client SDK Usage

Rust SDK (Production-Ready)

cd sdk/rust

# Secure producer with mTLS
cargo run --example secure_producer

# Consumer with ACL authorization
cargo run --example secure_consumer

# JWT token authentication
cargo run --example token_authentication

Go SDK (Production-Ready)

cd sdk/go

# Basic producer with TLS
go run examples/tls_producer.go

# Consumer with health monitoring
go run examples/health_monitoring_consumer.go

# Advanced connection management
go run examples/connection_pooling.go

Performance Validation

# Run benchmark tests
cargo bench

# Validate security performance (sub-microsecond authorization)
cargo test --release security::performance

# Test scaling operations
./target/release/rustmq-admin scaling add-brokers 3

# Verify zero-downtime upgrades
./target/release/rustmq-admin operations rolling-upgrade --version latest

πŸ” Security Management CLI

RustMQ now includes a comprehensive security command suite that extends the admin CLI with complete certificate and ACL management capabilities. The security CLI provides enterprise-grade security operations through an intuitive command-line interface.

Key Security Features

  • Certificate Authority Management: Create and manage root CAs with simplified architecture
  • Certificate Lifecycle: Issue, renew, rotate, revoke, and validate certificates
  • Access Control Lists (ACL): Create, manage, and test authorization rules
  • Security Auditing: View audit logs, real-time events, and operation history
  • Security Operations: System status, metrics, health checks, and maintenance
  • Multiple Output Formats: Table, JSON, YAML, and CSV support

Security Command Structure

rustmq-admin [OPTIONS] <COMMAND>

Security Commands:
  ca        Certificate Authority management commands
  certs     Certificate lifecycle management commands  
  acl       ACL management commands
  audit     Security audit commands
  security  General security commands

Global Options:
  --api-url <URL>     Admin API base URL (default: http://127.0.0.1:8080)
  --format <FORMAT>   Output format (table, json, yaml, csv)
  --no-color          Disable colored output
  --verbose           Enable verbose output

Certificate Authority Operations

# Initialize root CA
rustmq-admin ca init \
  --cn "RustMQ Root CA" \
  --org "RustMQ Corp" \
  --country US \
  --validity-years 10

# List CAs with filtering
rustmq-admin ca list --status active --format table

# View CA information
rustmq-admin ca info root_ca_1

# Export CA certificate for client distribution
rustmq-admin ca export --ca-id root_ca_1 --output ca-cert.pem

Certificate Management

# Issue certificates for different roles
rustmq-admin certs issue \
  --principal "broker-01.rustmq.com" \
  --role broker \
  --san "broker-01" \
  --san "192.168.1.100" \
  --validity-days 365

# Certificate lifecycle operations
rustmq-admin certs list --filter active --role broker
rustmq-admin certs renew cert_12345
rustmq-admin certs rotate cert_12345  # Generate new key pair
rustmq-admin certs revoke cert_12345 --reason "key-compromise"

# Certificate validation and status
rustmq-admin certs expiring --days 30
rustmq-admin certs validate --cert-file /path/to/cert.pem
rustmq-admin certs status cert_12345

ACL Management

# Create ACL rules with conditions
rustmq-admin acl create \
  --principal "[email protected]" \
  --resource "topic.users.*" \
  --permissions "read,write" \
  --effect allow \
  --conditions "source_ip=192.168.1.0/24"

# ACL evaluation and testing
rustmq-admin acl test \
  --principal "[email protected]" \
  --resource "topic.users.data" \
  --operation read

rustmq-admin acl permissions "[email protected]"
rustmq-admin acl bulk-test --input-file test_cases.json

# ACL operations
rustmq-admin acl sync --force
rustmq-admin acl cache warm --principals "user1,user2"

Security Auditing

# View audit logs with filtering
rustmq-admin audit logs \
  --since "2024-01-01T00:00:00Z" \
  --type certificate_issued \
  --limit 50

# Real-time event monitoring
rustmq-admin audit events --follow --filter authentication

# Operation-specific audits
rustmq-admin audit certificates --operation revoke
rustmq-admin audit acl --principal "[email protected]"

Security Operations

# System status and health
rustmq-admin security status
rustmq-admin security metrics
rustmq-admin security health

# Maintenance operations
rustmq-admin security cleanup --expired-certs --dry-run
rustmq-admin security backup --output backup.json --include-certs
rustmq-admin security restore --input backup.json --force

Output Format Examples

Table Format (Human-readable):

CERTIFICATE_ID    COMMON_NAME              STATUS    EXPIRES_IN
cert_12345       broker-01.rustmq.com     active    335 days
cert_67890       client-01.rustmq.com     active    280 days

JSON Format (Machine-readable):

rustmq-admin certs list --format json | jq '.[] | {id: .certificate_id, cn: .common_name}'

CSV Format (Spreadsheet-compatible):

rustmq-admin acl list --format csv > acl_rules.csv

User Experience Features

  • Progress Indicators: Visual feedback for long-running operations
  • Confirmation Prompts: Safety checks for destructive operations (use --force to skip)
  • Color Output: Rich formatting with --no-color option for scripts
  • Comprehensive Error Handling: Clear error messages with troubleshooting hints

Documentation and Examples

Integration

The security CLI integrates seamlessly with:

  • Admin REST API: All commands use standardized REST endpoints
  • Rate Limiting: Handles API rate limits gracefully with retry logic
  • Authentication: Configurable API authentication and authorization
  • Existing Commands: Backward compatible with all existing topic management commands

πŸ” Enterprise Security

RustMQ provides enterprise-grade security with Zero Trust architecture, delivering sub-microsecond authorization performance while maintaining the highest security standards.

Key Security Features

  • mTLS Authentication: Mutual TLS for all client-broker communications with validated certificate chains (fixed August 2025)
  • Ultra-Fast Authorization: Multi-level ACL caching (L1/L2/L3) with sub-microsecond latency
  • Complete Certificate Management: Full CA operations, automated renewal, and revocation capabilities with proper X.509 certificate signing
  • Distributed ACL System: Raft consensus for consistent authorization policies across the cluster
  • Zero Trust Architecture: Every request authenticated and authorized with comprehensive audit trails
  • Performance-Oriented Design: String interning, batch fetching, and intelligent caching for production workloads
  • βœ… Production-Ready X.509: Proper certificate signing chains ensuring enterprise-grade security
  • βœ… Advanced Certificate Caching: WebPKI-based cache keys with intelligent invalidation and batch operations

⚑ Measured Performance Characteristics

Benchmark Results (Verified in Production):

  • L1 Cache: 547ns (54% better than 1200ns target) - 1.8M ops/sec capacity
  • L2 Cache: 1,310ns (74% better than 5ΞΌs target) - 763K ops/sec capacity
  • Bloom Filter: 754ns (25% better than 1ΞΌs target) - 1.3M ops/sec capacity
  • System Throughput: 2.08M operations/second (108% better than 1M target)
  • Memory Efficiency: 60-80% reduction through string interning and optimized data structures
  • Authentication: <1ms certificate validation with caching and principal extraction
  • Zero False Negatives: 0% false negative rate (mathematically guaranteed)

Production Readiness: βœ… All SLA targets exceeded by significant margins

Security Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Zero Trust Security                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  Client Certificate ──mTLS──> Authentication Manager        β”‚
β”‚       β”‚                              β”‚                     β”‚
β”‚       └──> Principal Extraction ──> Authorization Manager  β”‚
β”‚                                       β”‚                     β”‚
β”‚               β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚               β”‚        Multi-Level ACL Cache               β”‚
β”‚               β”‚ L1 (547ns) β†’ L2 (1310ns) β†’ L3 Bloom (754ns)β”‚
β”‚               β”‚              β”‚                             β”‚
β”‚               β”‚              β–Ό                             β”‚
β”‚               β”‚         Controller ACL                      β”‚
β”‚               β”‚      (Raft Consensus)                      β”‚
β”‚               β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Quick Security Setup

# Initialize root CA for your organization
rustmq-admin ca init --cn "RustMQ Root CA" --org "MyOrg" --validity-years 10

# Issue broker certificate with proper role
rustmq-admin certs issue \
  --principal "broker-01.internal.company.com" \
  --role broker \
  --san "broker-01" \
  --san "192.168.1.100" \
  --validity-days 365

# Issue client certificate for application
rustmq-admin certs issue \
  --principal "[email protected]" \
  --role client \
  --validity-days 90

# Create comprehensive ACL rule with conditions
rustmq-admin acl create \
  --principal "[email protected]" \
  --resource "topic.events.*" \
  --permissions read,write \
  --effect allow \
  --conditions "source_ip=10.0.0.0/8,time_range=09:00-17:00"

# Test authorization before deployment
rustmq-admin acl test \
  --principal "[email protected]" \
  --resource "topic.events.user-login" \
  --operation read

# Check comprehensive security status
rustmq-admin security status

Security Components

Certificate Authority Management

  • Root CA Operations: Initialize, manage, and rotate certificate authorities
  • Intermediate CAs: Create delegated CAs for different environments or teams
  • Certificate Lifecycle: Issue, renew, rotate, and revoke certificates with audit trails
  • Automated Validation: Real-time certificate status checking and validation

mTLS Authentication

  • Mutual Authentication: Both client and server certificate validation
  • Principal Extraction: Automatic extraction of identity from certificate subjects
  • Certificate Caching: High-performance certificate validation with intelligent caching
  • Revocation Checking: Support for CRL and OCSP certificate revocation

Multi-Level Authorization

  • L1 Cache (Connection-Local): ~10ns lookup for frequently accessed permissions
  • L2 Cache (Broker-Wide): ~50ns lookup with LRU eviction and sharding
  • L3 Bloom Filter: ~20ns negative lookup rejection to reduce controller load
  • Batch Fetching: Intelligent batching reduces controller RPC load by 10-100x

Access Control Lists (ACL)

  • Resource Patterns: Flexible pattern matching for topics, consumer groups, and operations
  • Conditional Rules: IP-based, time-based, and custom condition support
  • Raft Consensus: Distributed ACL storage with strong consistency guarantees
  • Policy Management: Comprehensive policy creation, testing, and management tools

Security Documentation

Comprehensive security documentation is available:

Performance & Architecture

Operations & Configuration

Security Examples

Ready-to-use security examples and configurations:

# Basic mTLS setup
examples/security/basic_mtls_setup/

# ACL policy examples
examples/security/acl_policies/

# Certificate rotation workflow
examples/security/certificate_rotation/

# Security monitoring setup
examples/security/monitoring/

# Kubernetes security manifests
examples/security/kubernetes/

Production Security Features

Enterprise-Grade Authentication

  • Zero Trust Model: Every request requires valid certificate and authorization
  • Multi-Factor Security: Certificate-based identity plus ACL-based authorization
  • Audit Trails: Comprehensive logging of all security events and decisions
  • Performance Monitoring: Real-time security metrics and performance tracking

Advanced Authorization

  • Sub-100ns Performance: Production-optimized authorization with multi-level caching
  • Memory Efficient: String interning reduces memory usage by 60-80%
  • Highly Scalable: Linear performance scaling up to 100K ACL rules
  • Intelligent Caching: Bloom filters and LRU caches minimize controller load

Certificate Management

  • Automated Lifecycle: Automated certificate renewal and rotation capabilities
  • Simplified CA Architecture: Root CA only for improved performance and reduced complexity
  • Revocation Management: Real-time certificate revocation and status checking
  • Role-Based Certificates: Different certificate types for brokers, clients, and admins

Integration with RustMQ Components

  • QUIC Server: Enhanced with mTLS support for secure client connections
  • Admin REST API: Complete security API with 30+ endpoints for certificate and ACL management
  • Controller Cluster: Raft consensus integration for distributed ACL storage
  • Broker Network: Secure broker-to-broker communication with certificate validation
  • Monitoring: Security metrics integration with performance and health monitoring

πŸ› οΈ Admin REST API

RustMQ provides a comprehensive REST API for cluster management, monitoring, and operations. The Admin API includes real-time health tracking, topic management, broker monitoring, and operational metrics.

πŸš€ Key Features

  • Real-time Health Monitoring: Live broker health tracking with automatic timeout detection
  • Cluster Status: Comprehensive cluster health assessment with leadership tracking
  • Topic Management: CRUD operations for topics with partition and replication management
  • Broker Operations: Broker listing with health status and rack awareness
  • Operational Metrics: Uptime tracking and performance monitoring
  • Advanced Rate Limiting: Token bucket algorithm with configurable global, per-IP, and endpoint-specific limits
  • Production Ready: Comprehensive error handling and JSON API responses

πŸƒ Quick Start

Start the Admin API server:

# Start with default settings (port 8080)
./target/release/rustmq-admin serve-api

# Start on custom port
./target/release/rustmq-admin serve-api 9642

# Docker environment (included in docker-compose)
docker-compose up -d
# Admin API available at http://localhost:9642

πŸ“Š Health Monitoring

The Admin API provides comprehensive health monitoring with real-time broker tracking:

Health Endpoint

# Check service health and uptime
curl http://localhost:8080/health

Response:

{
  "status": "ok",
  "version": "0.1.0",
  "uptime_seconds": 3600,
  "is_leader": true,
  "raft_term": 1
}

Cluster Status

# Get comprehensive cluster status
curl http://localhost:8080/api/v1/cluster

Response:

{
  "success": true,
  "data": {
    "brokers": [
      {
        "id": "broker-1",
        "host": "localhost",
        "port_quic": 9092,
        "port_rpc": 9093,
        "rack_id": "rack-1",
        "online": true
      },
      {
        "id": "broker-2",
        "host": "localhost",
        "port_quic": 9192,
        "port_rpc": 9193,
        "rack_id": "rack-2", 
        "online": false
      }
    ],
    "topics": [],
    "leader": "controller-1",
    "term": 1,
    "healthy": true
  },
  "error": null,
  "leader_hint": null
}

πŸ“‹ Topic Management

List Topics

curl http://localhost:8080/api/v1/topics

Create Topic

curl -X POST http://localhost:8080/api/v1/topics \
  -H "Content-Type: application/json" \
  -d '{
    "name": "user-events",
    "partitions": 12,
    "replication_factor": 3,
    "retention_ms": 604800000,
    "segment_bytes": 1073741824,
    "compression_type": "lz4"
  }'

Response:

{
  "success": true,
  "data": "Topic 'user-events' created",
  "error": null,
  "leader_hint": "controller-1"
}

Describe Topic

curl http://localhost:8080/api/v1/topics/user-events

Response:

{
  "success": true,
  "data": {
    "name": "user-events",
    "partitions": 12,
    "replication_factor": 3,
    "config": {
      "retention_ms": 604800000,
      "segment_bytes": 1073741824,
      "compression_type": "lz4"
    },
    "created_at": "2024-01-15T10:30:00Z",
    "partition_assignments": [
      {
        "partition": 0,
        "leader": "broker-1",
        "replicas": ["broker-1", "broker-2", "broker-3"],
        "in_sync_replicas": ["broker-1", "broker-2"],
        "leader_epoch": 1
      }
    ]
  },
  "error": null,
  "leader_hint": "controller-1"
}

Delete Topic

curl -X DELETE http://localhost:8080/api/v1/topics/user-events

πŸ–₯️ Broker Management

List Brokers

curl http://localhost:8080/api/v1/brokers

Response:

{
  "success": true,
  "data": [
    {
      "id": "broker-1",
      "host": "localhost",
      "port_quic": 9092,
      "port_rpc": 9093,
      "rack_id": "us-central1-a",
      "online": true
    },
    {
      "id": "broker-2", 
      "host": "localhost",
      "port_quic": 9192,
      "port_rpc": 9193,
      "rack_id": "us-central1-b",
      "online": true
    }
  ],
  "error": null,
  "leader_hint": "controller-1"
}

πŸ”§ Health Tracking System

The Admin API includes a sophisticated health tracking system with comprehensive broker health monitoring:

Features

  • Background Health Monitoring: Automatic health checks every 15 seconds
  • Timeout-based Health Assessment: Configurable 30-second health timeout
  • Intelligent Cluster Health: Smart health calculation for small clusters
  • Real-time Updates: Live health status in all broker-related endpoints
  • Stale Entry Cleanup: Automatic cleanup of old health data
  • πŸ†• Broker Health Check API: Comprehensive broker health assessment with component-level monitoring

Health Check Logic

  • Healthy: Last successful health check within 30 seconds
  • Unhealthy: No successful health check or timeout exceeded
  • Cluster Health: For ≀2 brokers: healthy if β‰₯1 broker healthy + leader exists
  • Large Clusters: Healthy if majority of brokers healthy + leader exists

Broker Health Check

The newly implemented broker health check provides detailed component-level monitoring:

  • WAL Health: Write-ahead log performance and status monitoring
  • Cache Health: Memory cache hit rates and efficiency metrics
  • Object Storage Health: Cloud storage connectivity and upload performance
  • Network Health: Connection status and throughput monitoring
  • Replication Health: Follower sync status and replication lag tracking
  • Resource Usage: CPU, memory, disk, and network utilization statistics

For detailed configuration and usage, see Broker Health Monitoring.

🚨 Error Handling

The Admin API provides comprehensive error handling with detailed responses:

Error Response Format

{
  "success": false,
  "data": null,
  "error": "Detailed error message",
  "leader_hint": "controller-2"
}

Common Error Scenarios

  • Topic Not Found (404): Topic doesn't exist
  • Insufficient Brokers: Not enough brokers for replication factor
  • Leader Not Available: Controller leader election in progress
  • Invalid Configuration: Malformed request parameters

πŸ›‘οΈ Rate Limiting

The Admin API includes sophisticated rate limiting to protect against abuse and ensure fair resource usage. Rate limiting is implemented using the Token Bucket algorithm with configurable limits for different scenarios.

πŸš€ Key Features

  • Token Bucket Algorithm: Industry-standard rate limiting with burst capacity
  • Multi-level Rate Limiting: Global, per-IP, and endpoint-specific limits
  • Automatic Cleanup: Background cleanup of expired rate limiters to prevent memory leaks
  • Comprehensive Monitoring: Real-time metrics and statistics tracking
  • Production Ready: Thread-safe implementation with minimal performance overhead

πŸ“Š Rate Limiting Categories

The Admin API applies different rate limits based on endpoint sensitivity and resource requirements:

High-Frequency Endpoints (100 requests/minute)
  • GET /health - Service health checks
  • GET /api/v1/cluster - Cluster status monitoring
Medium-Frequency Endpoints (30 requests/minute)
  • GET /api/v1/topics - Topic listing
  • GET /api/v1/brokers - Broker listing
  • GET /api/v1/topics/{name} - Topic details
Low-Frequency Endpoints (10 requests/minute)
  • POST /api/v1/topics - Topic creation
  • DELETE /api/v1/topics/{name} - Topic deletion

βš™οΈ Configuration

Rate limiting can be configured through TOML configuration or environment variables:

TOML Configuration
[admin.rate_limiting]
enabled = true                      # Enable/disable rate limiting (default: true)
global_burst_size = 1000           # Global burst capacity (default: 1000)
global_refill_rate = 60            # Global refill rate per minute (default: 60)
per_ip_burst_size = 100            # Per-IP burst capacity (default: 100)
per_ip_refill_rate = 30            # Per-IP refill rate per minute (default: 30)
cleanup_interval_seconds = 3600    # Cleanup interval in seconds (default: 3600)

# Endpoint-specific configuration
[admin.rate_limiting.endpoints]
"/health" = { burst_size = 50, refill_rate = 100 }
"/api/v1/cluster" = { burst_size = 50, refill_rate = 100 }
"/api/v1/topics" = { burst_size = 20, refill_rate = 30 }
"/api/v1/brokers" = { burst_size = 20, refill_rate = 30 }
"POST:/api/v1/topics" = { burst_size = 5, refill_rate = 10 }
"DELETE:/api/v1/topics" = { burst_size = 5, refill_rate = 10 }
Environment Variables
# Global rate limiting settings
RUSTMQ_ADMIN_RATE_LIMITING_ENABLED=true
RUSTMQ_ADMIN_GLOBAL_BURST_SIZE=1000
RUSTMQ_ADMIN_GLOBAL_REFILL_RATE=60
RUSTMQ_ADMIN_PER_IP_BURST_SIZE=100
RUSTMQ_ADMIN_PER_IP_REFILL_RATE=30
RUSTMQ_ADMIN_CLEANUP_INTERVAL_SECONDS=3600

πŸ” Rate Limit Headers

All API responses include rate limiting information in the headers:

# Example response headers
HTTP/1.1 200 OK
X-RateLimit-Limit: 30              # Requests per minute allowed
X-RateLimit-Remaining: 25          # Remaining requests in current window
X-RateLimit-Reset: 1640995260      # Unix timestamp when limit resets
X-RateLimit-Type: endpoint         # Type of rate limit applied (global/ip/endpoint)

🚫 Rate Limit Exceeded Response

When rate limits are exceeded, the API returns a 429 status code:

# Request
curl -H "X-Forwarded-For: 192.168.1.100" http://localhost:8080/api/v1/topics

# Response when rate limited
HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 30
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1640995320
X-RateLimit-Type: ip
Retry-After: 60

{
  "success": false,
  "data": null,
  "error": "Rate limit exceeded for IP 192.168.1.100. Limit: 30 requests per minute",
  "leader_hint": null
}

🎯 Rate Limiting Strategy

The Admin API employs a hierarchical rate limiting strategy:

  1. Global Rate Limit: Applied to all requests to prevent system overload
  2. Per-IP Rate Limit: Applied per client IP address to prevent individual abuse
  3. Endpoint-Specific Rate Limit: Applied per endpoint based on resource intensity

Rate limits are checked in order, and the most restrictive limit applies. For example:

  • Global limit: 60 requests/minute
  • Per-IP limit: 30 requests/minute
  • Endpoint limit: 10 requests/minute
  • Result: Client is limited to 10 requests/minute for that endpoint

πŸ”§ Operational Benefits

Security
  • DDoS Protection: Prevents overwhelming the API with excessive requests
  • Resource Protection: Ensures critical operations aren't starved by high-frequency requests
  • Fair Usage: Prevents individual clients from monopolizing resources
Performance
  • Memory Efficient: Automatic cleanup prevents unbounded memory growth
  • Low Latency: Token bucket algorithm adds minimal overhead (<1ΞΌs per request)
  • Thread Safe: Concurrent request handling without performance degradation

πŸ“ˆ Monitoring Rate Limiting

Rate limiting statistics are available through the health endpoint:

# Check rate limiting statistics
curl http://localhost:8080/health

# Response includes rate limiting metrics
{
  "status": "ok",
  "version": "0.1.0", 
  "uptime_seconds": 3600,
  "is_leader": true,
  "raft_term": 1,
  "rate_limiting": {
    "enabled": true,
    "active_limiters": 15,
    "total_requests": 1250,
    "blocked_requests": 25,
    "last_cleanup": "2024-01-15T10:30:00Z"
  }
}

πŸ› οΈ Development and Testing

For development environments, rate limiting can be disabled or configured with higher limits:

# Development configuration
[admin.rate_limiting]
enabled = false                     # Disable for local development

# Or use high limits for testing
enabled = true
global_refill_rate = 10000         # Very high global limit
per_ip_refill_rate = 1000          # High per-IP limit

πŸš€ Production Recommendations

For production deployments:

  1. Monitor Rate Limiting Metrics: Track blocked requests and adjust limits as needed
  2. Configure Endpoint-Specific Limits: Set appropriate limits based on operational patterns
  3. Use Load Balancers: Distribute traffic across multiple Admin API instances
  4. Alert on High Block Rates: Set up alerts if > 5% of requests are being blocked
  5. Regular Review: Periodically review and adjust rate limits based on usage patterns

πŸ“ˆ Production Deployment

Production Deployment

For production deployment with Kubernetes, see the comprehensive guide in docker/README.md which includes:

  • Complete Kubernetes manifests
  • Service configuration
  • Health check setup
  • Production resource limits
  • Security configurations

πŸ§ͺ Testing

The Admin API includes comprehensive test coverage:

  • 11 Unit Tests: All API endpoints and health tracking functionality
  • Integration Testing: End-to-end API workflows with mock backends
  • Error Scenario Testing: Comprehensive error condition validation
  • Performance Testing: Health tracking timeout and expiration behavior

Running Tests

# Run admin API tests
cargo test admin::api

# Run specific health tracking tests
cargo test test_broker_health_tracking test_cluster_health_calculation

# All admin tests pass
# test result: ok. 11 passed; 0 failed; 0 ignored; 0 measured

πŸ”§ Configuration

Environment Variables

# Admin API configuration
ADMIN_API_PORT=8080
HEALTH_CHECK_INTERVAL=15    # seconds
HEALTH_TIMEOUT=30          # seconds

TOML Configuration

[admin]
port = 8080
health_check_interval_ms = 15000
health_timeout_ms = 30000
enable_cors = true
log_requests = true

πŸ” Monitoring Integration

The Admin API provides monitoring endpoints for observability:

Metrics Endpoint (Future)

# Prometheus metrics (planned)
curl http://localhost:8080/metrics

Log Analysis

# View API request logs
docker-compose logs rustmq-admin

# Filter for health check logs
docker-compose logs rustmq-admin | grep "Health check"

πŸ“Š BigQuery Subscriber

RustMQ includes a configurable Google Cloud BigQuery subscriber that can stream messages from RustMQ topics directly to BigQuery tables with high throughput and reliability.

Key Features

  • Streaming Inserts: Direct streaming to BigQuery using the insertAll API
  • Storage Write API: Future support for BigQuery Storage Write API (higher throughput)
  • Configurable Batching: Optimize for latency vs throughput with flexible batching
  • Schema Mapping: Direct, custom, or nested JSON field mapping
  • Error Handling: Comprehensive retry logic with dead letter handling
  • Monitoring: Built-in health checks and metrics endpoints
  • Authentication: Support for service account, metadata server, and application default credentials

Quick Start with BigQuery

# Set required environment variables
export GCP_PROJECT_ID="your-gcp-project"
export BIGQUERY_DATASET="analytics"
export BIGQUERY_TABLE="events"
export RUSTMQ_TOPIC="user-events"

# Optional: Set authentication method
export AUTH_METHOD="application_default"  # or "service_account"
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"

# Start the cluster with BigQuery subscriber (from docker/ directory)
cd docker && docker-compose --profile bigquery up -d

# Check BigQuery subscriber health
curl http://localhost:8080/health

# View metrics
curl http://localhost:8080/metrics

Configuration Options

The BigQuery subscriber supports extensive configuration through environment variables:

Required Configuration

  • GCP_PROJECT_ID - Google Cloud Project ID
  • BIGQUERY_DATASET - BigQuery dataset name
  • BIGQUERY_TABLE - BigQuery table name
  • RUSTMQ_TOPIC - RustMQ topic to subscribe to

Authentication

  • AUTH_METHOD - Authentication method (application_default, service_account, metadata_server)
  • GOOGLE_APPLICATION_CREDENTIALS - Path to service account key file

Batching & Performance

  • MAX_ROWS_PER_BATCH - Maximum rows per batch (default: 1000)
  • MAX_BATCH_SIZE_BYTES - Maximum batch size in bytes (default: 10MB)
  • MAX_BATCH_LATENCY_MS - Maximum time to wait before sending partial batch (default: 1000ms)
  • MAX_CONCURRENT_BATCHES - Maximum concurrent batches (default: 10)

Schema Mapping

  • SCHEMA_MAPPING - Mapping strategy (direct, custom, nested)
  • AUTO_CREATE_TABLE - Whether to auto-create table if not exists (default: false)

Error Handling

  • MAX_RETRIES - Maximum retry attempts (default: 3)
  • DEAD_LETTER_ACTION - Action for failed messages (log, drop, dead_letter_queue, file)
  • RETRY_BASE_MS - Base retry delay in milliseconds (default: 1000)
  • RETRY_MAX_MS - Maximum retry delay in milliseconds (default: 30000)

Usage Examples

Basic Streaming Configuration

# Start with basic streaming inserts
docker run --rm \
  -e GCP_PROJECT_ID="my-project" \
  -e BIGQUERY_DATASET="analytics" \
  -e BIGQUERY_TABLE="events" \
  -e RUSTMQ_TOPIC="user-events" \
  -e RUSTMQ_BROKERS="rustmq-broker:9092" \
  rustmq/bigquery-subscriber

High-Throughput Configuration

# Optimized for high throughput
docker run --rm \
  -e GCP_PROJECT_ID="my-project" \
  -e BIGQUERY_DATASET="telemetry" \
  -e BIGQUERY_TABLE="metrics" \
  -e RUSTMQ_TOPIC="telemetry-data" \
  -e MAX_ROWS_PER_BATCH="5000" \
  -e MAX_BATCH_SIZE_BYTES="52428800" \
  -e MAX_BATCH_LATENCY_MS="500" \
  -e MAX_CONCURRENT_BATCHES="50" \
  rustmq/bigquery-subscriber

Custom Schema Mapping

Create a custom configuration file:

# bigquery-config.toml
project_id = "my-project"
dataset = "transformed_data"
table = "processed_events"

[write_method.streaming_inserts]
skip_invalid_rows = true
ignore_unknown_values = true

[subscription]
topic = "raw-events"
broker_endpoints = ["rustmq-broker:9092"]

[schema]
mapping = "custom"

[schema.column_mappings]
"event_id" = "id"
"event_timestamp" = "timestamp" 
"user_data.user_id" = "user_id"
"event_data.action" = "action"

[schema.default_values]
"processed_at" = "CURRENT_TIMESTAMP()"
"version" = "1.0"
# Use custom configuration
docker run --rm \
  -v $(pwd)/bigquery-config.toml:/etc/rustmq/custom-config.toml \
  -e CONFIG_FILE="/etc/rustmq/custom-config.toml" \
  rustmq/bigquery-subscriber

Monitoring and Observability

The BigQuery subscriber exposes health and metrics endpoints:

# Health check endpoint
curl http://localhost:8080/health
# Response: {"status":"healthy","last_successful_insert":"2023-...", ...}

# Metrics endpoint  
curl http://localhost:8080/metrics
# Response: {"messages_received":1500,"messages_processed":1487, ...}

Error Handling and Reliability

The subscriber includes comprehensive error handling:

  • Automatic Retries: Configurable exponential backoff for transient errors
  • Dead Letter Handling: Failed messages can be logged, dropped, or sent to dead letter queue
  • Health Monitoring: Continuous health checks with degraded/unhealthy states
  • Graceful Shutdown: Ensures all pending batches are processed during shutdown

Production Deployment

For production deployments:

  1. Use Service Account Authentication:

    export AUTH_METHOD="service_account"
    export GOOGLE_APPLICATION_CREDENTIALS="/etc/gcp/service-account.json"
  2. Optimize Batching for Your Workload:

    • High volume: Increase batch size and reduce latency
    • Low latency: Reduce batch size and latency threshold
    • Mixed workload: Use default settings
  3. Monitor Key Metrics:

    • Messages processed per second
    • Error rate and retry counts
    • BigQuery insertion latency
    • Backlog size
  4. Set Up Alerting:

    • Health endpoint failures
    • High error rates (>5%)
    • Growing backlog size
    • BigQuery quota issues

🧠 WebAssembly ETL Processing

RustMQ features a powerful WebAssembly (WASM) ETL system that transforms messages in real-time as they flow through the system. This allows you to process, filter, and enrich data without additional infrastructure.

What is WASM ETL?

WASM ETL lets you write custom code in Rust (or other languages) that runs safely inside RustMQ to process messages. It's like having a tiny, secure computer program that can:

  • Transform data: Convert temperature units, normalize emails, add timestamps
  • Filter messages: Remove spam, block oversized messages, require certain headers
  • Enrich content: Add geolocation, detect language, analyze content type
  • Split or combine: Turn one message into many, or combine multiple messages

Key Benefits

  • Safe and Secure: Code runs in a sandbox - can't access files, network, or harm the system
  • High Performance: Near-native speed with efficient binary processing
  • Hot Deployment: Update processing logic without restarting RustMQ
  • Priority-Based: Process messages in stages with different priorities
  • Smart Filtering: Only process messages that match specific topics or conditions

Simple Example

Here's what a basic message processor looks like:

// Transform JSON messages to add processing info
if message.headers.get("content-type") == Some("application/json") {
    let mut json: Value = serde_json::from_slice(&message.value)?;
    json["processed_at"] = chrono::Utc::now().timestamp().into();
    json["processor"] = "my-etl-v1".into();
    message.value = serde_json::to_vec(&json)?;
}

Multi-Stage Processing

Configure complex pipelines that process messages in stages:

  1. Priority 0 (First): Validate message format and required fields
  2. Priority 1 (Second): Transform data and add enrichments (can run in parallel)
  3. Priority 2 (Last): Final formatting and cleanup

Topic Filtering

Only process messages from specific topics using patterns:

  • Exact: "events.user.login" - matches exactly
  • Wildcard: "logs.*.error" - matches any middle part
  • Regex: "^sensor-\\d+\\." - complex pattern matching
  • Prefix/Suffix: "iot.devices." or ".critical"

Getting Started

The complete WASM ETL system includes priority-based pipelines, instance pooling, smart filtering, and comprehensive monitoring. For detailed setup instructions, examples, and advanced configuration:

πŸ“– Complete WASM ETL Deployment Guide

This guide covers:

  • Step-by-step setup and configuration
  • Writing production-ready ETL modules in Rust
  • Building and deploying WASM modules
  • Configuring multi-stage processing pipelines
  • Advanced filtering and conditional processing
  • Performance optimization and monitoring
  • Troubleshooting and best practices

☁️ Google Cloud Platform Setup

Step 1: GCP Project Setup

# Set your project ID
export PROJECT_ID="your-rustmq-project"
export REGION="us-central1"
export ZONE="us-central1-a"

# Create and configure project
gcloud projects create $PROJECT_ID
gcloud config set project $PROJECT_ID
gcloud auth login

# Enable required APIs
gcloud services enable container.googleapis.com
gcloud services enable storage-api.googleapis.com
gcloud services enable compute.googleapis.com
gcloud services enable cloudresourcemanager.googleapis.com

Step 2: GKE Cluster Setup

# Create GKE cluster with optimized node pools
gcloud container clusters create rustmq-cluster \
    --zone=$ZONE \
    --machine-type=n2-standard-4 \
    --num-nodes=3 \
    --enable-autorepair \
    --enable-autoupgrade \
    --enable-network-policy \
    --enable-ip-alias \
    --disk-type=pd-ssd \
    --disk-size=50GB \
    --max-nodes=10 \
    --min-nodes=3 \
    --enable-autoscaling

# Get credentials
gcloud container clusters get-credentials rustmq-cluster --zone=$ZONE

# Create storage class for fast SSD
kubectl apply -f - <<EOF
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: fast-ssd
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
  replication-type: regional-pd
  zones: us-central1-a,us-central1-b
allowVolumeExpansion: true
reclaimPolicy: Delete
volumeBindingMode: WaitForFirstConsumer
EOF

Step 3: Cloud Storage Setup

# Create bucket for object storage
gsutil mb -c STANDARD -l $REGION gs://$PROJECT_ID-rustmq-data

# Enable versioning and lifecycle management
gsutil versioning set on gs://$PROJECT_ID-rustmq-data

# Create lifecycle policy for cost optimization
cat > lifecycle.json <<EOF
{
  "lifecycle": {
    "rule": [
      {
        "action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
        "condition": {"age": 30}
      },
      {
        "action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
        "condition": {"age": 90}
      },
      {
        "action": {"type": "Delete"},
        "condition": {"age": 365}
      }
    ]
  }
}
EOF

gsutil lifecycle set lifecycle.json gs://$PROJECT_ID-rustmq-data

Step 4: Service Account Setup

# Create service account for RustMQ
gcloud iam service-accounts create rustmq-sa \
    --display-name="RustMQ Service Account" \
    --description="Service account for RustMQ cluster operations"

# Grant necessary permissions
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:rustmq-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"

gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:rustmq-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.writer"

# Create and download key
gcloud iam service-accounts keys create rustmq-key.json \
    --iam-account=rustmq-sa@$PROJECT_ID.iam.gserviceaccount.com

# Create Kubernetes secret
kubectl create secret generic rustmq-gcp-credentials \
    --from-file=key.json=rustmq-key.json

Step 5: Networking Setup

# Create firewall rules for RustMQ
gcloud compute firewall-rules create rustmq-quic \
    --allow tcp:9092,udp:9092 \
    --source-ranges 0.0.0.0/0 \
    --description "RustMQ QUIC traffic"

gcloud compute firewall-rules create rustmq-rpc \
    --allow tcp:9093 \
    --source-ranges 10.0.0.0/8 \
    --description "RustMQ internal RPC traffic"

gcloud compute firewall-rules create rustmq-admin \
    --allow tcp:9642 \
    --source-ranges 0.0.0.0/0 \
    --description "RustMQ admin API"

βš™οΈ Configuration

RustMQ provides a comprehensive configuration system with optimized settings for development, testing, and production environments. For detailed configuration guide, see Configuration Guide. For comprehensive testing infrastructure details, see Testing Infrastructure Guide.

Configuration Files Overview

RustMQ includes well-structured configuration files for different environments:

πŸ§ͺ Testing Configurations (Ready to Use)

  • config/test-broker.toml - Optimized for unit/integration tests with /tmp storage, disabled fsync, and fast timeouts
  • config/test-controller.toml - Test controller with local addresses and temporary directories

πŸ› οΈ Development Configurations (Ready to Use)

  • config/broker-dev.toml - Development broker with local paths and debug logging
  • config/controller-dev.toml - Development controller with self-signed certificates
  • config/example-development.toml - Comprehensive development template with detailed comments

🏭 Production Configurations (Ready to Use)

  • config/broker.toml - Production broker configuration
  • config/controller.toml - Production controller configuration
  • config/example-production.toml - Production template with enterprise settings

Quick Start

# Testing (use existing optimized configs)
cargo test --lib  # Uses test configs automatically
RUSTMQ_BROKER_CONFIG=config/test-broker.toml cargo test --test integration

# Development (ready-to-use configs)  
cargo run --bin rustmq-broker -- --config config/broker-dev.toml
cargo run --bin rustmq-controller -- --config config/controller-dev.toml

# Production (customize from templates)
cp config/example-production.toml config/my-production.toml
# Edit my-production.toml for your environment
cargo run --bin rustmq-broker -- --config config/my-production.toml

Key Configuration Features

  • πŸ“ Environment-Specific: Separate configs for test/dev/prod with optimal defaults
  • πŸ”§ No New Files Needed: Existing configurations cover all use cases
  • ⚑ Performance Optimized: Test configs use /tmp storage and disabled fsync for speed
  • πŸ”’ Security Ready: Development configs include mTLS with simplified certificate chains
  • ☁️ Cloud Native: Production configs optimized for GCP with proper storage backends

Broker Configuration (broker.toml)

[broker]
id = "broker-001"                    # Unique broker identifier
rack_id = "us-central1-a"            # Availability zone for rack awareness

[network]
quic_listen = "0.0.0.0:9092"        # QUIC/HTTP3 client endpoint
rpc_listen = "0.0.0.0:9093"         # Internal gRPC endpoint
max_connections = 10000              # Maximum concurrent connections
connection_timeout_ms = 30000        # Connection timeout

# QUIC-specific configuration
[network.quic_config]
max_concurrent_uni_streams = 1000
max_concurrent_bidi_streams = 1000
max_idle_timeout_ms = 30000
max_stream_data = 1024000
max_connection_data = 10240000

[wal]
path = "/var/lib/rustmq/wal"        # WAL storage path
capacity_bytes = 10737418240        # 10GB WAL capacity
fsync_on_write = true               # Force sync on write (durability)
segment_size_bytes = 1073741824     # 1GB segment size
buffer_size = 65536                 # 64KB buffer size
upload_interval_ms = 600000         # 10 minutes upload interval
flush_interval_ms = 1000            # 1 second flush interval

[cache]
write_cache_size_bytes = 1073741824  # 1GB hot data cache
read_cache_size_bytes = 2147483648   # 2GB cold data cache
eviction_policy = "Moka"             # Cache eviction policy (Moka/Lru/Lfu/Random)

[object_storage]
storage_type = "S3"                 # Storage backend (S3/Gcs/Azure/Local)
bucket = "rustmq-data"              # Storage bucket name
region = "us-central1"              # Storage region
endpoint = "http://minio:9000"      # Storage endpoint
access_key = "rustmq-access-key"    # Optional: Access key
secret_key = "rustmq-secret-key"    # Optional: Secret key
multipart_threshold = 104857600     # 100MB multipart upload threshold
max_concurrent_uploads = 10         # Concurrent upload limit

[controller]
endpoints = ["controller-1:9094", "controller-2:9094", "controller-3:9094"]
election_timeout_ms = 5000          # Leader election timeout
heartbeat_interval_ms = 1000        # Heartbeat frequency

[replication]
min_in_sync_replicas = 2            # Minimum replicas for acknowledgment
ack_timeout_ms = 5000               # Replication acknowledgment timeout
max_replication_lag = 1000          # Maximum acceptable lag
heartbeat_timeout_ms = 30000        # Follower heartbeat timeout (30 seconds)

[etl]
enabled = true                      # Enable WebAssembly ETL processing
memory_limit_bytes = 67108864       # 64MB memory limit per module
execution_timeout_ms = 5000         # Execution timeout
max_concurrent_executions = 100     # Concurrent execution limit

[scaling]
max_concurrent_additions = 3        # Max brokers added simultaneously
max_concurrent_decommissions = 1    # Max brokers decommissioned simultaneously
rebalance_timeout_ms = 300000       # Partition rebalancing timeout
traffic_migration_rate = 0.1        # Traffic migration rate per minute
health_check_timeout_ms = 30000     # Health check timeout

[operations]
allow_runtime_config_updates = true # Enable runtime config updates
upgrade_velocity = 3                # Brokers upgraded per minute
graceful_shutdown_timeout_ms = 30000 # Graceful shutdown timeout

[operations.kubernetes]
use_stateful_sets = true            # Use StatefulSets for deployment
pvc_storage_class = "fast-ssd"      # Storage class for persistent volumes
wal_volume_size = "50Gi"            # WAL volume size
enable_pod_affinity = true          # Enable pod affinity for volume attachment

Controller Configuration (controller.toml)

[controller]
node_id = "controller-001"               # Unique controller identifier
raft_listen = "0.0.0.0:9095"           # Raft consensus endpoint
rpc_listen = "0.0.0.0:9094"            # Internal gRPC endpoint
http_listen = "0.0.0.0:9642"           # Admin REST API endpoint

[raft]
peers = [
  "controller-1@controller-1:9095",
  "controller-2@controller-2:9095", 
  "controller-3@controller-3:9095"
]
election_timeout_ms = 5000              # Leader election timeout
heartbeat_interval_ms = 1000            # Heartbeat frequency

[admin]
port = 9642                             # Admin REST API port
health_check_interval_ms = 15000        # Health check interval
health_timeout_ms = 30000               # Health timeout
enable_cors = true                      # Enable CORS headers
log_requests = true                     # Log API requests

# Rate limiting configuration for Admin REST API
[admin.rate_limiting]
enabled = true                          # Enable rate limiting (default: true)
global_burst_size = 1000               # Global burst capacity
global_refill_rate = 60                # Global requests per minute
per_ip_burst_size = 100                # Per-IP burst capacity  
per_ip_refill_rate = 30                # Per-IP requests per minute
cleanup_interval_seconds = 3600        # Cleanup expired limiters (1 hour)

# Endpoint-specific rate limits
[admin.rate_limiting.endpoints]
"/health" = { burst_size = 50, refill_rate = 100 }
"/api/v1/cluster" = { burst_size = 50, refill_rate = 100 }
"/api/v1/topics" = { burst_size = 20, refill_rate = 30 }
"/api/v1/brokers" = { burst_size = 20, refill_rate = 30 }
"POST:/api/v1/topics" = { burst_size = 5, refill_rate = 10 }
"DELETE:/api/v1/topics" = { burst_size = 5, refill_rate = 10 }

[autobalancer]
enabled = true                          # Enable auto-balancing
cpu_threshold = 0.80                   # CPU threshold for rebalancing
memory_threshold = 0.75                # Memory threshold for rebalancing
cooldown_seconds = 300                 # Cooldown between rebalancing operations

Environment Variables

# Core settings
RUSTMQ_BROKER_ID=broker-001
RUSTMQ_RACK_ID=us-central1-a
RUSTMQ_LOG_LEVEL=info

# Storage settings
RUSTMQ_WAL_PATH=/var/lib/rustmq/wal
RUSTMQ_STORAGE_BUCKET=rustmq-data
RUSTMQ_STORAGE_REGION=us-central1

# GCP settings
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
GCP_PROJECT_ID=your-project-id

# Performance tuning
RUSTMQ_CACHE_SIZE=2147483648
RUSTMQ_MAX_CONNECTIONS=10000
RUSTMQ_BATCH_SIZE=1000

# Admin API rate limiting settings
RUSTMQ_ADMIN_RATE_LIMITING_ENABLED=true
RUSTMQ_ADMIN_GLOBAL_BURST_SIZE=1000
RUSTMQ_ADMIN_GLOBAL_REFILL_RATE=60
RUSTMQ_ADMIN_PER_IP_BURST_SIZE=100
RUSTMQ_ADMIN_PER_IP_REFILL_RATE=30
RUSTMQ_ADMIN_CLEANUP_INTERVAL_SECONDS=3600

πŸ”§ Message Broker Core API

RustMQ now includes a fully implemented high-level Message Broker Core that provides intuitive producer and consumer APIs with comprehensive error handling, automatic partition management, and flexible acknowledgment levels.

Architecture Overview

The Message Broker Core is built on a modular architecture that integrates seamlessly with RustMQ's distributed storage and replication systems:

use rustmq::broker::core::*;

// Create a broker core instance with your storage backends
let core = MessageBrokerCore::new(
    wal,               // Write-Ahead Log implementation
    object_storage,    // Object storage backend (S3/GCS/Azure)
    cache,             // Distributed cache layer
    replication_manager, // Replication coordinator
    network_handler,   // Network communication handler
    broker_id,         // Unique broker identifier
);

Producer API

The Producer trait provides a simple, high-performance interface for message production:

#[async_trait]
pub trait Producer {
    /// Send a single record to a topic-partition
    async fn send(&self, record: ProduceRecord) -> Result<ProduceResult>;
    
    /// Send a batch of records for optimized throughput
    async fn send_batch(&self, records: Vec<ProduceRecord>) -> Result<Vec<ProduceResult>>;
    
    /// Flush any pending records to ensure durability
    async fn flush(&self) -> Result<()>;
}

Single Message Production

let producer = core.create_producer();

let record = ProduceRecord {
    topic: "user-events".to_string(),
    partition: Some(0),                    // Optional: let RustMQ choose partition
    key: Some(b"user123".to_vec()),
    value: b"login_event".to_vec(),
    headers: vec![Header {
        key: "content-type".to_string(),
        value: b"application/json".to_vec(),
    }],
    acks: AcknowledgmentLevel::All,        // Wait for all replicas
    timeout_ms: 5000,
};

let result = producer.send(record).await?;
println!("Message produced at offset: {}", result.offset);

Batch Production for High Throughput

let mut batch = Vec::new();
for i in 0..1000 {
    batch.push(ProduceRecord {
        topic: "metrics".to_string(),
        partition: None,  // Auto-partition based on key hash
        key: Some(format!("sensor_{}", i % 10).into_bytes()),
        value: format!("{{\"value\": {}, \"timestamp\": {}}}", i, timestamp).into_bytes(),
        headers: vec![],
        acks: AcknowledgmentLevel::Leader,  // Faster acknowledgment
        timeout_ms: 1000,
    });
}

let results = producer.send_batch(batch).await?;
println!("Produced {} messages", results.len());

Consumer API

The Consumer trait provides flexible message consumption with automatic offset management:

#[async_trait]
pub trait Consumer {
    /// Subscribe to one or more topics
    async fn subscribe(&mut self, topics: Vec<TopicName>) -> Result<()>;
    
    /// Poll for new records with configurable timeout
    async fn poll(&mut self, timeout_ms: u32) -> Result<Vec<ConsumeRecord>>;
    
    /// Commit specific offsets for durability
    async fn commit_offsets(&mut self, offsets: HashMap<TopicPartition, Offset>) -> Result<()>;
    
    /// Seek to a specific offset for replay scenarios
    async fn seek(&mut self, topic_partition: TopicPartition, offset: Offset) -> Result<()>;
}

Basic Consumer Usage

let mut consumer = core.create_consumer("analytics-group".to_string());

// Subscribe to topics
consumer.subscribe(vec!["user-events".to_string(), "orders".to_string()]).await?;

// Consume messages
loop {
    let records = consumer.poll(1000).await?;
    
    for record in records {
        println!("Received: topic={}, partition={}, offset={}", 
                 record.topic_partition.topic,
                 record.topic_partition.partition,
                 record.offset);
        
        // Process your message
        process_message(&record.value).await?;
        
        // Optional: Manual offset commit for exactly-once processing
        let mut offsets = HashMap::new();
        offsets.insert(record.topic_partition.clone(), record.offset + 1);
        consumer.commit_offsets(offsets).await?;
    }
}

Consumer Seek for Message Replay

// Replay messages from a specific point in time
let topic_partition = TopicPartition {
    topic: "user-events".to_string(),
    partition: 0,
};

// Seek to offset 1000 to replay messages
consumer.seek(topic_partition, 1000).await?;

// Continue normal polling - will start from offset 1000
let records = consumer.poll(5000).await?;

Acknowledgment Levels

RustMQ supports flexible acknowledgment levels for different durability and performance requirements:

use rustmq::types::AcknowledgmentLevel;

// Maximum performance - fire and forget
acks: AcknowledgmentLevel::None,

// Fast acknowledgment - leader only
acks: AcknowledgmentLevel::Leader,

// High availability - majority of replicas
acks: AcknowledgmentLevel::Majority,

// Maximum durability - all replicas
acks: AcknowledgmentLevel::All,

// Custom requirement - specific number of replicas
acks: AcknowledgmentLevel::Custom(3),

Error Handling

The Broker Core provides comprehensive error handling with detailed error types:

use rustmq::error::RustMqError;

match producer.send(record).await {
    Ok(result) => println!("Success: offset {}", result.offset),
    Err(RustMqError::NotLeader(partition)) => {
        println!("Not leader for partition: {}", partition);
        // Retry with updated metadata
    },
    Err(RustMqError::OffsetOutOfRange(msg)) => {
        println!("Offset out of range: {}", msg);
        // Seek to valid offset
    },
    Err(RustMqError::Timeout) => {
        println!("Request timed out");
        // Implement retry logic
    },
    Err(e) => println!("Other error: {}", e),
}

Integration with Storage Layers

The Broker Core seamlessly integrates with RustMQ's tiered storage architecture:

  • Local WAL: Recent messages are served from high-speed local NVMe storage
  • Cache Layer: Frequently accessed messages are cached for optimal performance
  • Object Storage: Historical messages are automatically migrated to cost-effective cloud storage
  • Intelligent Routing: The core automatically routes read requests to the optimal storage tier

Testing and Validation

The Message Broker Core includes comprehensive test coverage:

  • Unit Tests: Core functionality with 88 passing tests
  • Integration Tests: End-to-end workflows with 9 comprehensive test scenarios
  • Mock Implementations: Complete test doubles for all dependencies
  • Error Scenarios: Comprehensive error condition testing

Performance Characteristics

I/O Performance Optimizations

RustMQ features advanced I/O optimizations with automatic backend selection for maximum performance:

  • πŸ”₯ io_uring Backend (Linux): True asynchronous I/O with 2-10x lower latency (0.5-2ΞΌs vs 5-20ΞΌs)

    • Throughput: 3-5x higher IOPS for small random I/O operations
    • CPU Efficiency: 50-80% reduction in CPU usage for I/O-heavy workloads
    • Memory Efficiency: No thread pool overhead, direct kernel communication
    • Feature Flag: Enable with --features io-uring (automatic detection on Linux 5.6+)
  • πŸ›‘οΈ Fallback Backend: High-performance tokio::fs implementation for cross-platform compatibility

    • Automatic Selection: Runtime detection with transparent fallback
    • Platform Support: Windows, macOS, Linux (when io_uring unavailable)
    • Consistent API: Same performance characteristics across all platforms

Overall Performance

  • Low Latency: Sub-millisecond produce latency for local WAL writes (optimized with io_uring)
  • High Throughput: Batch production for maximum throughput scenarios
  • Automatic Partitioning: Intelligent partition selection based on message keys
  • Zero-Copy Operations: Efficient memory usage throughout the message path with buffer reuse (detailed optimization guide)
  • Async Throughout: Non-blocking I/O for maximum concurrency
  • Platform Adaptive: Automatically selects optimal I/O backend based on system capabilities

πŸ“¦ Client SDKs

RustMQ provides official client SDKs for multiple programming languages with production-ready features and comprehensive documentation.

πŸ¦€ Rust SDK

  • Location: sdk/rust/
  • Status: βœ… Fully Implemented - Production-ready client library with comprehensive producer API
  • Features:
    • Advanced Producer API: Builder pattern with intelligent batching, flush mechanisms, and configurable acknowledgment levels
    • Async/Await: Built on Tokio with zero-copy operations and streaming support
    • QUIC Transport: Modern HTTP/3 protocol for low-latency communication
    • Comprehensive Error Handling: Detailed error types with retry logic and timeout management
    • Performance Monitoring: Built-in metrics for messages sent/failed, batch sizes, and timing
  • Build: cargo build --release
  • Install: Add to Cargo.toml: rustmq-client = { path = "sdk/rust" }

Producer API

The Rust SDK provides a comprehensive Producer API with intelligent batching, flush mechanisms, and production-ready features. Based on the actual implementation:

Basic Producer Usage
use rustmq_client::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client connection
    let config = ClientConfig {
        brokers: vec!["localhost:9092".to_string()],
        client_id: Some("my-app-producer".to_string()),
        connect_timeout: Duration::from_secs(10),
        request_timeout: Duration::from_secs(30),
        ..Default::default()
    };
    
    let client = RustMqClient::new(config).await?;
    
    // Create producer with custom configuration
    let producer_config = ProducerConfig {
        batch_size: 100,                           // Batch up to 100 messages
        batch_timeout: Duration::from_millis(10),  // Or send after 10ms
        ack_level: AckLevel::All,                   // Wait for all replicas
        producer_id: Some("my-app-producer".to_string()),
        ..Default::default()
    };
    
    let producer = ProducerBuilder::new()
        .topic("user-events")
        .config(producer_config)
        .client(client)
        .build()
        .await?;
    
    // Send a single message and wait for acknowledgment
    let message = Message::builder()
        .topic("user-events")
        .payload("user logged in")
        .header("user-id", "12345")
        .header("event-type", "login")
        .build()?;
    
    let result = producer.send(message).await?;
    println!("Message sent to partition {} at offset {}", 
             result.partition, result.offset);
    
    Ok(())
}
Fire-and-Forget Messages
// High-throughput fire-and-forget sending
for i in 0..1000 {
    let message = Message::builder()
        .topic("metrics")
        .payload(format!("{{\"value\": {}, \"timestamp\": {}}}", i, timestamp))
        .header("sensor-id", &format!("sensor-{}", i % 10))
        .build()?;
    
    // Returns immediately after queuing - no waiting for broker
    producer.send_async(message).await?;
}

// Flush to ensure all messages are sent
producer.flush().await?;
Batch Operations
// Prepare a batch of messages
let messages: Vec<_> = (0..50).map(|i| {
    Message::builder()
        .topic("batch-topic")
        .payload(format!("message-{}", i))
        .header("batch-id", "batch-123")
        .build().unwrap()
}).collect();

// Send batch and wait for all acknowledgments
let results = producer.send_batch(messages).await?;

for result in results {
    println!("Message {} sent to offset {}", 
             result.message_id, result.offset);
}
Producer Configuration Options
let producer_config = ProducerConfig {
    // Batching configuration
    batch_size: 100,                           // Messages per batch
    batch_timeout: Duration::from_millis(10),  // Maximum wait time
    
    // Reliability configuration  
    ack_level: AckLevel::All,                   // All, Leader, or None
    max_message_size: 1024 * 1024,             // 1MB max message size
    idempotent: true,                           // Enable idempotent producer
    
    // Producer identification
    producer_id: Some("my-producer".to_string()),
    
    // Advanced configuration
    compression: CompressionConfig {
        enabled: true,
        algorithm: CompressionAlgorithm::Lz4,
        level: 6,
        min_size: 1024,
    },
    
    default_properties: HashMap::from([
        ("app".to_string(), "my-application".to_string()),
        ("version".to_string(), "1.0.0".to_string()),
    ]),
};
Error Handling
use rustmq_sdk::error::ClientError;

match producer.send(message).await {
    Ok(result) => {
        println!("Success: {} at offset {}", result.message_id, result.offset);
    }
    Err(ClientError::Timeout { timeout_ms }) => {
        println!("Request timed out after {}ms", timeout_ms);
        // Implement retry logic
    }
    Err(ClientError::Broker(msg)) => {
        println!("Broker error: {}", msg);
        // Handle broker-side errors
    }
    Err(ClientError::MessageTooLarge { size, max_size }) => {
        println!("Message too large: {} bytes (max: {})", size, max_size);
        // Reduce message size
    }
    Err(e) => {
        println!("Other error: {}", e);
    }
}
Monitoring and Metrics
// Get producer performance metrics
let metrics = producer.metrics().await;

println!("Messages sent: {}", 
         metrics.messages_sent.load(std::sync::atomic::Ordering::Relaxed));
println!("Messages failed: {}", 
         metrics.messages_failed.load(std::sync::atomic::Ordering::Relaxed));
println!("Batches sent: {}", 
         metrics.batches_sent.load(std::sync::atomic::Ordering::Relaxed));
println!("Average batch size: {:.2}", 
         *metrics.average_batch_size.read().await);

if let Some(last_send) = *metrics.last_send_time.read().await {
    println!("Last send: {:?} ago", last_send.elapsed());
}
Graceful Shutdown
// Proper producer shutdown
async fn shutdown_producer(producer: Producer) -> Result<(), ClientError> {
    // Flush all pending messages
    producer.flush().await?;
    
    // Close producer and cleanup resources
    producer.close().await?;
    
    println!("Producer shut down gracefully");
    Ok(())
}

🐹 Go SDK

  • Location: sdk/go/
  • Status: βœ… Fully Implemented - Production-ready client library with sophisticated connection layer
  • Features:
    • Advanced Connection Management: QUIC transport with intelligent connection pooling, round-robin load balancing, and automatic failover
    • Comprehensive TLS/mTLS Support: Full client certificate authentication with CA validation and configurable trust stores
    • Health Check System: Real-time broker health monitoring with JSON message exchange and automatic cleanup of failed connections
    • Robust Reconnection Logic: Exponential backoff with jitter, per-broker state tracking, and intelligent failure recovery
    • Producer API with Batching: Intelligent message batching with configurable size/timeout thresholds and compression support
    • Extensive Statistics: Connection metrics, health check tracking, error monitoring, traffic analytics, and reconnection statistics
    • Production-Ready Features: Concurrent-safe operations, goroutine-based processing, configurable timeouts, and comprehensive error handling
  • Build: go build ./...
  • Install: import "github.com/rustmq/rustmq/sdk/go/rustmq"

Go SDK Connection Layer Highlights

The Go SDK features a sophisticated connection management system designed for production environments:

TLS/mTLS Configuration

config := &rustmq.ClientConfig{
    EnableTLS: true,
    TLSConfig: &rustmq.TLSConfig{
        CACert:     "/etc/ssl/certs/ca.pem",
        ClientCert: "/etc/ssl/certs/client.pem",
        ClientKey:  "/etc/ssl/private/client.key",
        ServerName: "rustmq.example.com",
    },
}

Health Check & Reconnection

// Automatic health monitoring with configurable intervals
config.KeepAliveInterval = 30 * time.Second

// Exponential backoff with jitter for reconnection
config.RetryConfig = &rustmq.RetryConfig{
    MaxRetries: 10,
    BaseDelay:  100 * time.Millisecond,
    MaxDelay:   30 * time.Second,
    Multiplier: 2.0,
    Jitter:     true,
}

Producer with Intelligent Batching

// Create producer with batching configuration
producerConfig := &rustmq.ProducerConfig{
    BatchSize:    100,
    BatchTimeout: 100 * time.Millisecond,
    AckLevel:     rustmq.AckAll,
    Idempotent:   true,
}

producer, err := client.CreateProducer("topic", producerConfig)
if err != nil {
    log.Fatal(err)
}

// Send message with automatic batching
result, err := producer.Send(ctx, message)

Connection Statistics

stats := client.Stats()
fmt.Printf("Active: %d/%d, Reconnects: %d, Health Checks: %d", 
    stats.ActiveConnections, stats.TotalConnections,
    stats.ReconnectAttempts, stats.HealthChecks)

// Additional statistics available
fmt.Printf("Bytes: Sent=%d, Received=%d, Errors=%d", 
    stats.BytesSent, stats.BytesReceived, stats.Errors)

Common SDK Features

  • QUIC/HTTP3 Transport: Low-latency, multiplexed connections
  • Producer APIs: Sync/async sending, batching, compression
  • Consumer APIs: Auto-commit, manual offset management, consumer groups
  • Stream Processing: Real-time message transformation pipelines
  • Configuration: Comprehensive client, producer, consumer settings
  • Monitoring: Built-in metrics, health checks, observability
  • Error Handling: Retry logic, circuit breakers, dead letter queues
  • Security: TLS/mTLS, authentication, authorization

Quick Start

Rust SDK

cd sdk/rust

# Basic producer example
cargo run --example simple_producer

# Advanced consumer with multi-partition support
cargo run --example advanced_consumer

# Stream processing example
cargo run --example stream_processor

Go SDK

cd sdk/go

# Basic producer example
go run examples/simple_producer.go

# Basic consumer example
go run examples/simple_consumer.go

# Advanced stream processing
go run examples/advanced_stream_processor.go

See individual SDK READMEs for detailed usage, configuration, performance tuning, and API documentation.

πŸ“š Usage Examples

Client Examples

Note: The following are examples of the intended client API. Current implementation is in early development stage and these clients are not yet available.

Rust Client Example

// Cargo.toml
[dependencies]
rustmq-client = { path = "sdk/rust" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

// main.rs
use rustmq_client::*;
use serde::{Serialize, Deserialize};
use std::time::Duration;

#[derive(Serialize, Deserialize)]
struct OrderEvent {
    order_id: String,
    customer_id: String,
    amount: f64,
    timestamp: u64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client connection
    let config = ClientConfig {
        brokers: vec!["localhost:9092".to_string()],
        client_id: Some("order-processor".to_string()),
        ..Default::default()
    };
    
    let client = RustMqClient::new(config).await?;
    
    // Create producer
    let producer = ProducerBuilder::new()
        .topic("orders")
        .client(client.clone())
        .build()
        .await?;

    // Produce messages
    let order = OrderEvent {
        order_id: "order-123".to_string(),
        customer_id: "customer-456".to_string(),
        amount: 99.99,
        timestamp: std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_secs(),
    };

    let message = Message::builder()
        .topic("orders")
        .key(&order.order_id)
        .payload(serde_json::to_vec(&order)?)
        .header("content-type", "application/json")
        .build()?;

    let result = producer.send(message).await?;
    println!("Message produced at offset: {}", result.offset);

    // Create consumer
    let consumer = ConsumerBuilder::new()
        .topic("orders")
        .consumer_group("order-processors")
        .client(client)
        .build()
        .await?;
    
    // Consume with automatic offset management
    while let Some(consumer_message) = consumer.receive().await? {
        let message = &consumer_message.message;
        let order: OrderEvent = serde_json::from_slice(&message.payload)?;
        
        // Process the order
        process_order(order).await?;
        
        // Acknowledge message
        consumer_message.ack().await?;
    }
    
    Ok(())
}

async fn process_order(order: OrderEvent) -> Result<(), Box<dyn std::error::Error>> {
    println!("Processing order {} for customer {} amount ${}", 
             order.order_id, order.customer_id, order.amount);
    
    // Your business logic here
    tokio::time::sleep(Duration::from_millis(100)).await;
    
    Ok(())
}

Go Client Example

// go.mod
module rustmq-example

go 1.21

require (
    github.com/rustmq/rustmq/sdk/go v0.1.0
    github.com/google/uuid v1.3.0
)

// main.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/google/uuid"
    "github.com/rustmq/rustmq/sdk/go/rustmq"
)

type OrderEvent struct {
    OrderID    string  `json:"order_id"`
    CustomerID string  `json:"customer_id"`
    Amount     float64 `json:"amount"`
    Timestamp  int64   `json:"timestamp"`
}

func main() {
    // Create client configuration
    config := &rustmq.ClientConfig{
        Brokers:  []string{"localhost:9092"},
        ClientID: "order-processor",
    }
    
    client, err := rustmq.NewClient(config)
    if err != nil {
        log.Fatal("Failed to create client:", err)
    }
    defer client.Close()

    // Producer example
    producer, err := client.CreateProducer("orders")
    if err != nil {
        log.Fatal("Failed to create producer:", err)
    }
    defer producer.Close()

    // Send some orders
    for i := 0; i < 10; i++ {
        order := OrderEvent{
            OrderID:    uuid.New().String(),
            CustomerID: fmt.Sprintf("customer-%d", i%5),
            Amount:     float64((i + 1) * 25),
            Timestamp:  time.Now().UnixMilli(),
        }

        orderBytes, _ := json.Marshal(order)
        
        message := rustmq.NewMessage().
            Topic("orders").
            KeyString(order.OrderID).
            Payload(orderBytes).
            Header("content-type", "application/json").
            Build()

        ctx := context.Background()
        result, err := producer.Send(ctx, message)
        if err != nil {
            log.Printf("Failed to send message: %v", err)
            continue
        }
        
        fmt.Printf("Message sent at offset: %d, partition: %d\n", 
            result.Offset, result.Partition)
    }

    // Consumer example
    consumer, err := client.CreateConsumer("orders", "order-processors")
    if err != nil {
        log.Fatal("Failed to create consumer:", err)
    }
    defer consumer.Close()

    // Consume messages
    for i := 0; i < 10; i++ {
        ctx := context.Background()
        message, err := consumer.Receive(ctx)
        if err != nil {
            log.Printf("Receive error: %v", err)
            continue
        }

        var order OrderEvent
        if err := json.Unmarshal(message.Message.Payload, &order); err != nil {
            log.Printf("Failed to unmarshal order: %v", err)
            message.Ack()
            continue
        }

        // Process the order
        if err := processOrder(order); err != nil {
            log.Printf("Failed to process order %s: %v", order.OrderID, err)
            message.Nack() // Retry
            continue
        }

        fmt.Printf("Processed order %s for customer %s amount $%.2f\n",
            order.OrderID, order.CustomerID, order.Amount)
            
        // Acknowledge successful processing
        message.Ack()
    }
}

func processOrder(order OrderEvent) error {
    // Your business logic here
    time.Sleep(100 * time.Millisecond)
    return nil
}

Admin Operations

βœ… Fully Implemented: The Admin REST API is production-ready with comprehensive cluster management capabilities.

# Create topic with custom configuration
curl -X POST http://localhost:8080/api/v1/topics \
  -H "Content-Type: application/json" \
  -d '{
    "name": "user-events",
    "partitions": 24,
    "replication_factor": 3,
    "retention_ms": 604800000,
    "segment_bytes": 1073741824,
    "compression_type": "lz4"
  }'

# List topics
curl http://localhost:8080/api/v1/topics

# Get topic details
curl http://localhost:8080/api/v1/topics/user-events

# Delete topic
curl -X DELETE http://localhost:8080/api/v1/topics/user-events

# Get cluster health and status
curl http://localhost:8080/api/v1/cluster

# List brokers with health status
curl http://localhost:8080/api/v1/brokers

# Check service health and uptime
curl http://localhost:8080/health

# Advanced features (future implementation)
# Partition rebalancing, ETL module management, and metrics endpoints
# will be available in future releases

πŸ“Š Future Performance Tuning (Not Yet Implemented)

Planned Broker Optimization

# High-throughput configuration
[wal]
capacity_bytes = 53687091200        # 50GB for high-volume topics
fsync_on_write = false              # Disable for maximum throughput
segment_size_bytes = 2147483648     # 2GB segments
buffer_size = 1048576               # 1MB buffer

[cache]
write_cache_size_bytes = 8589934592  # 8GB hot cache
read_cache_size_bytes = 17179869184  # 16GB cold cache

[network]
max_connections = 50000             # Increase connection limit
connection_timeout_ms = 60000       # Longer timeout for slow clients

[object_storage]
max_concurrent_uploads = 50         # More concurrent uploads
multipart_threshold = 52428800      # 50MB threshold

# Low-latency configuration
[wal]
fsync_on_write = true               # Enable for durability
buffer_size = 4096                  # Smaller buffers for low latency

[replication]
min_in_sync_replicas = 1            # Reduce for lower latency
ack_timeout_ms = 1000               # Faster timeouts
heartbeat_timeout_ms = 10000        # Shorter heartbeat timeout for faster failover

Planned Kubernetes Resource Tuning

# High-performance broker configuration
resources:
  requests:
    memory: "16Gi"
    cpu: "8"
    ephemeral-storage: "100Gi"
  limits:
    memory: "32Gi"
    cpu: "16"
    ephemeral-storage: "200Gi"

# Node affinity for performance
nodeSelector:
  cloud.google.com/gke-nodepool: high-performance
  
affinity:
  podAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
    - labelSelector:
        matchLabels:
          app: rustmq-broker
      topologyKey: kubernetes.io/hostname

# Volume configuration for maximum IOPS
volumeClaimTemplates:
- metadata:
    name: wal-storage
  spec:
    accessModes: ["ReadWriteOnce"]
    storageClassName: fast-ssd
    resources:
      requests:
        storage: 500Gi

πŸ“ˆ Future Monitoring (Not Yet Implemented)

Planned Prometheus Configuration

# prometheus-config.yaml - future monitoring setup
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      
    scrape_configs:
    - job_name: 'rustmq-brokers'
      kubernetes_sd_configs:
      - role: pod
        namespaces:
          names: [rustmq]
      relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        action: keep
        regex: rustmq-broker
      - source_labels: [__meta_kubernetes_pod_ip]
        target_label: __address__
        replacement: ${1}:9642
        
    - job_name: 'rustmq-controllers'
      kubernetes_sd_configs:
      - role: pod
        namespaces:
          names: [rustmq]
      relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        action: keep
        regex: rustmq-controller
      - source_labels: [__meta_kubernetes_pod_ip]
        target_label: __address__
        replacement: ${1}:9642

Future Monitoring (Not Yet Implemented)

Planned metrics to monitor:

  • Throughput: rate(messages_produced_total[5m]), rate(messages_consumed_total[5m])
  • Latency: produce_latency_seconds, consume_latency_seconds
  • Storage: wal_size_bytes, cache_hit_ratio, object_storage_upload_rate
  • Replication: replication_lag, in_sync_replicas_count
  • System: cpu_usage, memory_usage, disk_iops, network_throughput

Future Alerting (Not Yet Implemented)

# alerts.yaml - planned alerting rules
groups:
- name: rustmq.rules
  rules:
  - alert: HighProduceLatency
    expr: histogram_quantile(0.95, produce_latency_seconds) > 0.1
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "High produce latency detected"
      
  - alert: ReplicationLagHigh
    expr: replication_lag > 10000
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Replication lag is too high"
      
  - alert: BrokerDown
    expr: up{job="rustmq-brokers"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "RustMQ broker is down"

πŸ”§ Development & Troubleshooting

πŸš€ Environment Setup

RustMQ provides a comprehensive environment setup script that can configure both development and production environments:

Development Environment Setup

Set up a complete local development environment with certificates, configurations, and services:

# Set up complete development environment (recommended)
./generate-certs.sh develop

# Force regenerate existing setup
./generate-certs.sh develop --force

# View available options
./generate-certs.sh --help

What the development setup provides:

  • βœ… Self-signed development certificates with simplified CA chain
  • βœ… Development configuration files for broker, controller, and admin
  • βœ… Local data directories and startup scripts
  • βœ… Example applications and test clients
  • βœ… Ready-to-run local cluster

Quick start after setup:

# Start the complete cluster
./start-cluster-dev.sh

# Or start services individually
./start-controller-dev.sh  # Start controller first
./start-broker-dev.sh      # Start broker

# Test with examples
cargo run --example secure_producer
cargo run --example secure_consumer

# Admin operations
cargo run --bin rustmq-admin -- --config config/admin-dev.toml cluster status

Production Environment Setup

Get comprehensive guidance for production deployment:

# Show production setup guidance
./generate-certs.sh production

What the production guidance provides:

  • πŸ“‹ Production setup guidance and checklists
  • πŸ“‹ Security best practices and hardening
  • πŸ“‹ Deployment options (Kubernetes, Docker, systemd)
  • πŸ“‹ Monitoring and observability setup
  • πŸ“‹ Certificate management with external CA

Environment Setup Features

Development Environment (./generate-certs.sh develop):

  • Complete Setup: Creates certificates, configs, data directories, and startup scripts
  • Certificate Chain: Proper CA-signed certificates (fixes August 2025 certificate signing issues)
  • Ready to Use: Start developing immediately with ./start-cluster-dev.sh
  • Examples Included: Secure producer/consumer examples with mTLS
  • Validation: Automatic setup validation and certificate verification

Production Environment (./generate-certs.sh production):

  • Security Guidance: Enterprise-grade security setup instructions
  • Certificate Management: External CA integration and certificate lifecycle
  • Deployment Options: Kubernetes, Docker, and systemd deployment guides
  • Monitoring Setup: Comprehensive observability and alerting configuration
  • Best Practices: Production hardening and operational procedures

πŸ” Development Certificates

The development environment automatically generates certificates with simplified signing chains:

  • certs/ca.pem - Root CA certificate (self-signed for development)
  • certs/server.pem + certs/server.key - Server certificate and private key (CA-signed)
  • certs/client.pem + certs/client.key - Client certificate and private key (CA-signed)
  • certs/admin.pem + certs/admin.key - Admin certificate and private key (CA-signed)

⚠️ Security Notice: These are development-only certificates. For production, follow the production setup guide!

Current Development Issues

Note: Since RustMQ is in early development, most "issues" are actually missing implementations.

  1. Services Not Responding
# Both broker and controller services are now production-ready with full functionality
# Check if they started successfully
docker-compose logs rustmq-broker-1
docker-compose logs rustmq-controller-1

# Look for configuration loading messages
# Services should log "started successfully" then sleep
  1. Build Issues
# Ensure Rust toolchain is up to date
rustup update

# Clean build if needed
cargo clean
cargo build --release

# Run tests to verify implementation
cargo test
  1. Configuration Issues
# Validate configuration
cargo run --bin rustmq-broker -- --config config/broker.toml

# Check configuration structure in src/config.rs
# All fields must be present in TOML files

Log Analysis

# View service logs (from docker/ directory)
cd docker
docker-compose logs rustmq-broker-1
docker-compose logs rustmq-controller-1

# Check for configuration validation errors
docker-compose logs | grep ERROR

# Monitor BigQuery subscriber demo
docker-compose logs rustmq-bigquery-subscriber

For complete Docker and Kubernetes deployment guides, troubleshooting, and configuration details, see docker/README.md.

🀝 Contributing

We welcome contributions to help implement the remaining features!

Current Development Priorities

  1. Message Broker Core: Implement actual produce/consume functionality
  2. Network Layer: Complete QUIC/gRPC server implementations
  3. Distributed Coordination: Implement Raft consensus and metadata management
  4. Client Libraries: Build Rust and Go client libraries
  5. Admin API: Implement REST API for cluster management

Development Setup

# Clone and setup
git clone https://github.com/cloudymoma/rustmq.git
cd rustmq

# Install development dependencies
cargo install cargo-watch cargo-audit cargo-tarpaulin

# Run tests with coverage
cargo tarpaulin --out Html

# Watch for changes during development
cargo watch -x test -x clippy

Testing

# Unit tests (currently 88 tests passing)
cargo test --lib

# Integration tests (9 broker core tests + others)
cargo test --test integration_broker_core

# Run specific module tests
cargo test storage::
cargo test scaling::
cargo test broker::core

# Run with features
cargo test --features "io-uring,wasm"

# All tests
cargo test

πŸ“„ License

This project is licensed under The Bindiego License (BDL), Version 1.0 - see the LICENSE file for details.

License Summary

  • βœ… Academic Use: Freely available for teaching, research, and educational purposes
  • βœ… Contributions: Welcome contributions back to the original project
  • ❌ Commercial Use: Prohibited without separate commercial license
  • ❌ Managed Services: Cannot offer RustMQ as a hosted service

For commercial licensing inquiries, please contact the license holder through the official repository.

πŸ”— Links


RustMQ - Built with ❀️ in Rust for the cloud-native future. Optimized for Google Cloud Platform.

About

Cloud-native distributed message queue with storage-compute separation architecture. Licensed under BDL 1.0 (academic use permitted, commercial use requires license).

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 2

  •  
  •