| 1 | +# RingMPSC |
| 2 | + |
| 3 | +A high-performance lock-free Multi-Producer Single-Consumer (MPSC) channel implementation in Zig, achieving **~180 billion messages per second** (u32) and **~93 billion messages per second** (u64) on commodity hardware in the batched, cache-resident 8-producer benchmark below. |
| 4 | + |
| 5 | +> **Platform**: Linux only. Uses futex, eventfd, epoll, memfd_create, NUMA sysfs, and SCM_RIGHTS — all Linux-specific APIs. macOS and Windows are not supported. |
| 6 | + |
| 7 | +## Performance |
| 8 | + |
| 9 | +Throughput on AMD Ryzen 7 5700 (8 cores, 16 threads, DDR4-3200): |
| 10 | + |
| 11 | +| Config | u32 (4B msg) | u64 (8B msg) | |
| 12 | +|--------|-------------|-------------| |
| 13 | +| 1P1C | 29 B/s | 15 B/s | |
| 14 | +| 2P1C | 58 B/s | 29 B/s | |
| 15 | +| 4P1C | 112 B/s | 58 B/s | |
| 16 | +| 6P1C | 160 B/s | 85 B/s | |
| 17 | +| **8P1C** | **180 B/s** | **93 B/s** | |
| 18 | + |
| 19 | +*B/s = billion messages per second. 500M msgs/producer, 32K batch, 32K ring, @memset fill. Each per-producer ring (128–256 KB) fits within the 512 KB dedicated L2 per Zen 3 core — throughput reflects cache-resident end-to-end message movement. See [docs/methodology.md](docs/methodology.md) for full methodology.* |
| 20 | + |
| 21 | +**How these numbers are achieved:** Each producer calls `reserve(32768)` → `@memset` (fills 128 KB for u32) → `commit(32768)`. The consumer advances head pointers. At 1P1C, a single core writes 128 KB into L2 cache per batch at near-peak L2 bandwidth (~118 GB/s on Zen 3), yielding ~29 B/s. At 8P, each producer has its own L2 cache (512 KB per Zen 3 core) with zero cross-producer contention, scaling to ~180 B/s (~80% efficiency; the ~20% loss is L3 coherency traffic and consumer polling overhead, not lock contention). Ring coordination (`reserve`/`commit`/`consume`) takes <0.1% of total time; the remaining >99.9% is the `@memset` write. See [docs/ringmpsc.md](docs/ringmpsc.md) §5.3 for the complete first-principles derivation. |
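As a rough sanity check, the bandwidth and scaling figures can be reproduced from the rounded table values alone. The inputs are rounded, so the results are approximate.

Single-producer write bandwidth implied by 29 B/s of 4-byte messages:

$$
29 \times 10^{9}\ \tfrac{\text{msg}}{\text{s}} \times 4\ \tfrac{\text{B}}{\text{msg}} = 116\ \text{GB/s}
$$

This sits just under the quoted ~118 GB/s L2 write bandwidth. Scaling efficiency at 8 producers, relative to perfect linear scaling of the 1P1C result:

$$
\frac{180}{8 \times 29} = \frac{180}{232} \approx 0.78
$$

That is in line with the roughly 80% efficiency quoted. The u64 column gives nearly the same ratio: $93 / (8 \times 15) \approx 0.78$.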
| 22 | + |
| 23 | +### vs Baselines (single-item, 5M u32) |
| 24 | + |
| 25 | +| Config | ringmpsc | CAS MPSC | Mutex | vs Mutex | |
| 26 | +|--------|---------|----------|-------|----------| |
| 27 | +| 1P1C | 54 M/s | 115 M/s | 24 M/s | 2x | |
| 28 | +| 2P1C | 117 M/s | 23 M/s | 29 M/s | 4x | |
| 29 | +| 4P1C | 237 M/s | 20 M/s | 10 M/s | 24x | |
| 30 | +| **8P1C** | **514 M/s** | **14 M/s** | **4 M/s** | **129x** | |
| 31 | + |
| 32 | +At 8 producers, both the mutex and the CAS queue collapse under contention, while ringmpsc scales to 514 M/s (about 9.5x over 1P1C). The CAS MPSC (Vyukov-style bounded array) wins at 1P1C, where there is no contention, but degrades from 115 M/s to 14 M/s at 8P, roughly an 8x drop. See [docs/benchmarks.md](docs/benchmarks.md) for full competitive analysis including batch throughput, latency, and comparison against rigtorp-style queues. |
| 33 | + |
| 34 | +## Algorithm |
| 35 | + |
| 36 | +RingMPSC uses a **ring-decomposed** architecture where each producer has a dedicated SPSC ring buffer, eliminating producer-producer contention entirely. |
| 37 | + |
| 38 | +``` |
| 39 | +Producer 0 ──► [Ring 0] ──┐ |
| 40 | +Producer 1 ──► [Ring 1] ──┼──► Consumer (polls all rings) |
| 41 | +Producer 2 ──► [Ring 2] ──┤ |
| 42 | +Producer N ──► [Ring N] ──┘ |
| 43 | +``` |
| 44 | + |
| 45 | +### Key Optimizations |
| 46 | + |
| 47 | +1. **128-byte Cache Line Alignment** — Head and tail pointers separated by 128 bytes to prevent prefetcher-induced false sharing |
| 48 | +2. **Cached Sequence Numbers** — Producers cache the consumer's head position, refreshing only when the ring appears full |
| 49 | +3. **Batch Operations** — `consumeBatch` processes all available items with a single atomic head update |
| 50 | +4. **Adaptive Backoff** — Crossbeam-style exponential backoff (spin -> yield -> park) |
| 51 | +5. **Zero-Copy API** — `reserve`/`commit` pattern for direct ring buffer writes |
| 52 | +6. **NUMA-Aware Allocation** — Per-ring heap allocation with producer-local NUMA binding |
| 53 | +7. **Futex-Based Wait Strategies** — 6 pluggable strategies from busy-spin to futex-blocking with zero CPU when idle |
| 54 | +8. **EventNotifier** — Optional eventfd-based consumer wake for epoll/io_uring event loop integration (zero CPU when idle, instant wake on data) |
| 55 | +9. **Adaptive Consumer Skip** — Exponential backoff on empty rings, reducing consumer polling from O(P) to O(active_producers) |
| 56 | +10. **Zero-Copy BufferPool** — Lock-free slab allocator for large message passing via pointer-ring pattern. Ring carries 4-byte handles; payloads stay in pre-allocated pool with zero copies. |
| 57 | +11. **Cross-Process SharedRing** — SPSC ring buffer shared across process boundaries via memfd_create + MAP_SHARED. Automatic cleanup on fd close, heartbeat-based crash detection, cross-process futex wake. |
| 58 | +12. **SCM_RIGHTS fd Passing** — Send/receive file descriptors between processes over Unix sockets for SharedRing establishment. |
| 59 | +13. **Graceful Shutdown** — Coordinated close with drain and fully-drained detection |
| 60 | +14. **Latency Tracking** — Per-batch min/max/avg nanosecond latency (comptime-gated, zero overhead when disabled) |
| 61 | +15. **Channel Stats Snapshot** — Single `snapshot()` call returning all health metrics |
| 62 | +16. **Thin Event Loop** — Managed consumer loop with automatic drain on stop |
| 63 | + |
| 64 | +See [docs/ringmpsc.md](docs/ringmpsc.md) for the full algorithm writeup with memory ordering analysis. |
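To make optimizations 1, 2, 3, and 5 concrete, here is a minimal sketch of a single per-producer ring. It is not the library's `Ring` type: `SketchRing`, its field names, and the `ctx.handle` callback convention are all hypothetical. It assumes Zig 0.12 or later (`std.atomic.Value`, lowercase ordering names) and a 64-bit target.

```zig
const std = @import("std");

/// Illustrative SPSC ring (hypothetical names, not the library's Ring type).
pub fn SketchRing(comptime T: type, comptime bits: u6) type {
    return struct {
        const Self = @This();
        const capacity: usize = @as(usize, 1) << bits;
        const mask: usize = capacity - 1;

        // Both indices are 128-byte aligned, so they sit at least 128 bytes apart.
        tail: std.atomic.Value(usize) align(128) = std.atomic.Value(usize).init(0),
        head: std.atomic.Value(usize) align(128) = std.atomic.Value(usize).init(0),
        // Producer's private copy of `head`; refreshed only when it shows too little space.
        cached_head: usize = 0,
        buf: [capacity]T = undefined,

        /// Producer: returns up to `want` contiguous writable slots, or null if the ring is full.
        pub fn reserve(self: *Self, want: usize) ?[]T {
            const tail = self.tail.load(.monotonic);
            var free = capacity - (tail -% self.cached_head);
            if (free < want) {
                // Slow path: one acquire-load of the consumer's head.
                self.cached_head = self.head.load(.acquire);
                free = capacity - (tail -% self.cached_head);
                if (free == 0) return null;
            }
            const idx = tail & mask;
            // Stop at the end of the buffer so the slice stays contiguous.
            const n = @min(want, @min(free, capacity - idx));
            return self.buf[idx .. idx + n];
        }

        /// Producer: publishes `n` slots previously written via `reserve`.
        pub fn commit(self: *Self, n: usize) void {
            const tail = self.tail.load(.monotonic);
            self.tail.store(tail +% n, .release);
        }

        /// Consumer: handles every available item, then updates `head` once.
        pub fn consumeBatch(self: *Self, ctx: anytype) usize {
            const head = self.head.load(.monotonic);
            const tail = self.tail.load(.acquire);
            var pos = head;
            while (pos != tail) : (pos +%= 1) ctx.handle(self.buf[pos & mask]);
            self.head.store(tail, .release);
            return tail -% head;
        }
    };
}

test "reserve, commit, consumeBatch" {
    const Summer = struct {
        total: u64 = 0,
        fn handle(self: *@This(), v: u32) void {
            self.total += v;
        }
    };
    var ring = SketchRing(u32, 4){};
    var summer = Summer{};

    const slots = ring.reserve(3).?;
    for (slots, 0..) |*s, i| s.* = @intCast((i + 1) * 10);
    ring.commit(slots.len);

    try std.testing.expectEqual(@as(usize, 3), ring.consumeBatch(&summer));
    try std.testing.expectEqual(@as(u64, 60), summer.total);
}
```

In this sketch the producer touches the consumer's `head` only on the slow path, and the consumer writes `head` once per batch regardless of batch size. An MPSC channel in the ring-decomposed style would hold one such ring per producer and have the consumer call `consumeBatch` on each in turn.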
| 65 | + |
| 66 | +## Quick Start |
| 67 | + |
| 68 | +```zig |
| 69 | +const ringmpsc = @import("ringmpsc"); |
| 70 | + |
| 71 | +// SPSC ring buffer (zero-overhead, type alias for Ring) |
| 72 | +var ring = ringmpsc.spsc.Channel(u64, .{}){}; |
| 73 | +_ = ring.send(&[_]u64{1, 2, 3}); |
| 74 | + |
| 75 | +// MPSC channel (multi-producer, heap-allocated rings); `allocator` is any std.mem.Allocator already in scope |
| 76 | +var channel = try ringmpsc.mpsc.Channel(u64, .{}).init(allocator); |
| 77 | +defer channel.deinit(); |
| 78 | +const producer = try channel.register(); |
| 79 | +_ = producer.sendOne(42); |
| 80 | + |
| 81 | +// MPMC channel (work-stealing consumers) |
| 82 | +var mpmc = ringmpsc.mpmc.MpmcChannel(u64, .{}){}; |
| 83 | +const prod = try mpmc.register(); |
| 84 | +var cons = try mpmc.registerConsumer(); |
| 85 | +_ = prod.sendOne(42); |
| 86 | +var buf: [16]u64 = undefined; |
| 87 | +_ = cons.steal(&buf); |
| 88 | +``` |
| 89 | + |
| 90 | +## Build |
| 91 | + |
| 92 | +```bash |
| 93 | +# Build |
| 94 | +zig build -Doptimize=ReleaseFast |
| 95 | + |
| 96 | +# Unit tests |
| 97 | +zig build test |
| 98 | + |
| 99 | +# Integration tests |
| 100 | +zig build test-spsc |
| 101 | +zig build test-mpsc |
| 102 | +zig build test-fifo |
| 103 | +zig build test-chaos |
| 104 | +zig build test-determinism |
| 105 | +zig build test-stress |
| 106 | +zig build test-fuzz |
| 107 | +zig build test-mpmc |
| 108 | +zig build test-spmc |
| 109 | +zig build test-simd |
| 110 | +zig build test-blocking |
| 111 | +zig build test-wraparound |
| 112 | + |
| 113 | +# Benchmarks |
| 114 | +zig build bench-high-perf # Primary throughput benchmark (u32 + u64) |
| 115 | +zig build bench-mpsc # MPSC baseline |
| 116 | +zig build bench-spsc # SPSC latency |
| 117 | +zig build bench-simd # SIMD operations |
| 118 | +zig build bench-suite # Full benchmark suite with HTML report |
| 119 | +zig build bench-analysis # Bottleneck analysis |
| 120 | +zig build bench-cross-process # SharedRing vs in-process (4.6% overhead) |
| 121 | +zig build bench-zero-copy # BufferPool crossover analysis by message size |
| 122 | +zig build bench-wake-latency # Spin vs eventfd wake latency (110ns vs 6us p50) |
| 123 | +zig build bench # Full suite: multi-run stats, baselines, HTML report |
| 124 | + |
| 125 | +# Examples |
| 126 | +zig build example-basic # SPSC send/recv + reserve/commit |
| 127 | +zig build example-mpsc # Multi-producer pipeline with handler |
| 128 | +zig build example-backpressure # Blocking send with timeout |
| 129 | +zig build example-shutdown # Graceful shutdown with drain |
| 130 | +zig build example-event-loop # Managed consumer loop |
| 131 | +zig build example-event-notifier # eventfd + epoll consumer |
| 132 | +zig build example-zero-copy # Zero-copy large messages via BufferPool |
| 133 | +zig build example-shared-memory # Cross-process ring via memfd + mmap |
| 134 | +zig build example-cross-process # Full IPC with fd passing over Unix socket |
| 135 | +zig build example-logging # Structured logging pipeline (real-world pattern) |
| 136 | + |
| 137 | +# Clean benchmark run (recommended) |
| 138 | +sudo taskset -c 0-15 nice -n -20 zig-out/bin/bench-high-perf |
| 139 | +``` |
| 140 | + |
| 141 | +## Documentation |
| 142 | + |
| 143 | +- [docs/architecture.md](docs/architecture.md) — System architecture: type hierarchy, memory layout, data flow, memory ordering model |
| 144 | +- [docs/api.md](docs/api.md) — Complete API reference for all channel types, primitives, and utilities |
| 145 | +- [docs/ringmpsc.md](docs/ringmpsc.md) — Algorithm design: ring decomposition, cache layout, batch consumption |
| 146 | +- [docs/methodology.md](docs/methodology.md) — Benchmark methodology: measurement procedure, threats to validity, reproducibility |
| 147 | +- [docs/benchmarks.md](docs/benchmarks.md) — Results and analysis: scaling, bottleneck breakdown, configuration sensitivity |
| 148 | +- [docs/correctness.md](docs/correctness.md) — Correctness properties, memory ordering model, test coverage |
| 149 | + |
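| 150 | +For clean benchmark runs, set the CPU frequency governor to `performance`, then pin the benchmark to its cores at high priority: |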
| 151 | +```bash |
| 152 | +echo performance | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor |
| 153 | +sudo taskset -c 0-15 nice -n -20 zig-out/bin/bench-high-perf |
| 154 | +``` |
| 155 | + |
| 156 | +## Channel Types |
| 157 | + |
| 158 | +| | Single-Consumer | Multi-Consumer | |
| 159 | +|---|---|---| |
| 160 | +| **Single-Producer** | `spsc.Channel` | `spmc.Channel` | |
| 161 | +| **Multi-Producer** | `mpsc.Channel` | `mpmc.MpmcChannel` | |
| 162 | + |
| 163 | +- **SPSC** — Zero-overhead type alias for the core Ring. Fastest possible path. |
| 164 | +- **MPSC** — Heap-allocated per-producer rings with NUMA support. Dynamic producer registration (grow at runtime). The primary use case. |
| 165 | +- **SPMC** — Single ring, CAS-based multi-consumer stealing. |
| 166 | +- **MPMC** — Per-producer rings with progressive work-stealing (local shard -> hottest ring -> random fallback). |
| 167 | + |
| 168 | +## Configuration |
| 169 | + |
| 170 | +```zig |
| 171 | +pub const Config = struct { |
| 172 | + ring_bits: u6 = 16, // 2^16 = 64K slots per ring |
| 173 | + max_producers: usize = 16, |
| 174 | + enable_metrics: bool = false, |
| 175 | + track_contention: bool = false, |
| 176 | + prefetch_threshold: usize = 16, |
| 177 | + numa_aware: bool = true, |
| 178 | + numa_node: i8 = -1, // -1 = auto-detect |
| 179 | +}; |
| 180 | +``` |
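The slot storage implied by `ring_bits` is easy to estimate. The helper below is a hypothetical sketch, not part of the library, and it counts only the slot array (no head/tail padding or metadata). It shows that the default `ring_bits = 16` with `u64` gives 512 KB per ring, the same size as the Zen 3 L2 cache cited in the Performance section, while the 32K-slot rings used in the benchmark correspond to `ring_bits = 15`.

```zig
const std = @import("std");

/// Hypothetical helper: bytes of slot storage for one ring of element type T.
fn ringBytes(comptime T: type, ring_bits: u6) u64 {
    return (@as(u64, 1) << ring_bits) * @sizeOf(T);
}

test "ring footprint for default and benchmark settings" {
    // Default config: 2^16 = 64K slots.
    try std.testing.expectEqual(@as(u64, 256 * 1024), ringBytes(u32, 16));
    try std.testing.expectEqual(@as(u64, 512 * 1024), ringBytes(u64, 16));
    // Benchmark setting: 2^15 = 32K slots, i.e. the 128-256 KB rings quoted above.
    try std.testing.expectEqual(@as(u64, 128 * 1024), ringBytes(u32, 15));
    try std.testing.expectEqual(@as(u64, 256 * 1024), ringBytes(u64, 15));
}
```

If cache residency matters for a workload, one option is to choose `ring_bits` so that `ringBytes` stays comfortably below the per-core L2 size of the target machine.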
| 181 | + |
| 182 | +## Correctness |
| 183 | + |
| 184 | +Four properties verified by the test suite. See [docs/correctness.md](docs/correctness.md) for details. |
| 185 | + |
| 186 | +1. **Per-Producer FIFO** — Messages from a single producer are received in order |
| 187 | +2. **No Data Loss** — All sent messages are eventually received |
| 188 | +3. **Thread Safety** — No data races under concurrent access |
| 189 | +4. **Determinism** — Identical inputs produce identical outputs |
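Properties 1 and 2 can be checked on the consumer side with very little state. The sketch below is one way to do this and is not taken from the project's test suite. The `Msg` layout, the `FifoChecker` type, and the limit of 16 producers (mirroring the default `max_producers`) are assumptions made for illustration. Each producer is assumed to stamp its messages with its own id and a sequence number starting at 0.

```zig
const std = @import("std");

/// Hypothetical message layout: each producer stamps its id and a running sequence number.
const Msg = struct { producer: u8, seq: u64 };

/// Consumer-side checker: per-producer FIFO holds if every producer's
/// sequence numbers arrive as 0, 1, 2, ... with no gaps or reordering.
const FifoChecker = struct {
    next_seq: [16]u64 = [_]u64{0} ** 16,
    received: u64 = 0,

    fn observe(self: *FifoChecker, m: Msg) error{ OutOfOrder, UnknownProducer }!void {
        if (m.producer >= self.next_seq.len) return error.UnknownProducer;
        if (m.seq != self.next_seq[m.producer]) return error.OutOfOrder;
        self.next_seq[m.producer] += 1;
        self.received += 1;
    }
};

test "interleaving across producers is fine, reordering within one is not" {
    var checker = FifoChecker{};
    // Messages from producers 0 and 1 may interleave arbitrarily.
    try checker.observe(.{ .producer = 0, .seq = 0 });
    try checker.observe(.{ .producer = 1, .seq = 0 });
    try checker.observe(.{ .producer = 1, .seq = 1 });
    try checker.observe(.{ .producer = 0, .seq = 1 });
    try std.testing.expectEqual(@as(u64, 4), checker.received);

    // A skipped or reordered sequence number from one producer is rejected.
    try std.testing.expectError(error.OutOfOrder, checker.observe(.{ .producer = 0, .seq = 5 }));
}
```

Comparing `received` with the total number of messages sent after shutdown gives a simple no-data-loss check, since a gap in any producer's sequence would already have been reported as `OutOfOrder`.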
| 190 | + |
| 191 | +## Project Structure |
| 192 | + |
| 193 | +``` |
| 194 | +ringmpsc/ |
| 195 | +├── src/ |
| 196 | +│ ├── ringmpsc.zig # Public API root module |
| 197 | +│ ├── primitives/ |
| 198 | +│ │ ├── ring_buffer.zig # Core SPSC ring buffer |
| 199 | +│ │ ├── wait_strategy.zig # Futex-based wait strategies |
| 200 | +│ │ ├── event_notifier.zig # eventfd for epoll/io_uring integration |
| 201 | +│ │ ├── buffer_pool.zig # Lock-free slab allocator for zero-copy |
| 202 | +│ │ ├── shared_ring.zig # Cross-process ring via memfd + MAP_SHARED |
| 203 | +│ │ ├── backoff.zig # Exponential backoff |
| 204 | +│ │ ├── simd.zig # SIMD batch operations |
| 205 | +│ │ └── blocking.zig # Blocking wrappers with timeout |
| 206 | +│ ├── platform/ |
| 207 | +│ │ ├── numa.zig # NUMA topology + allocation |
| 208 | +│ │ └── fd_passing.zig # SCM_RIGHTS fd send/recv over Unix socket |
| 209 | +│ ├── mpsc/channel.zig # MPSC channel (ring-decomposed) |
| 210 | +│ ├── spsc/channel.zig # SPSC channel (Ring type alias) |
| 211 | +│ ├── mpmc/channel.zig # MPMC channel (work-stealing) |
| 212 | +│ ├── spmc/channel.zig # SPMC channel (CAS consumers) |
| 213 | +│ ├── event_loop.zig # Thin managed consumer loop |
| 214 | +│ └── metrics/collector.zig # Throughput + latency metrics |
| 215 | +├── tests/ # Integration, stress, fuzz, blocking tests |
| 216 | +├── benchmarks/src/ # Benchmark suite + analysis |
| 217 | +├── examples/ # Usage examples (basic, pipeline, logging, backpressure, shutdown, event loop, eventfd+epoll) |
| 218 | +├── bindings/ |
| 219 | +│ ├── c/ # C header-only implementation |
| 220 | +│ └── rust/ # Rust port |
| 221 | +├── docs/ |
| 222 | +│ ├── architecture.md # System architecture |
| 223 | +│ ├── api.md # API reference |
| 224 | +│ ├── ringmpsc.md # Algorithm writeup |
| 225 | +│ ├── methodology.md # Benchmark methodology |
| 226 | +│ ├── benchmarks.md # Results and analysis |
| 227 | +│ └── correctness.md # Correctness verification |
| 228 | +└── scripts/ |
| 229 | + ├── verify.sh # One-click verification (19 checks) |
| 230 | + └── bench-multirun.sh # Multi-run statistics (median, stddev) |
| 231 | +``` |
| 232 | + |
| 233 | +## Bindings |
| 234 | + |
| 235 | +- **C** — Header-only library in `bindings/c/ringmpsc.h` |
| 236 | +- **Rust** — Full port in `bindings/rust/` |
| 237 | + |
| 238 | +## Glossary |
| 239 | + |
| 240 | +| Term | Definition | |
| 241 | +|------|-----------| |
| 242 | +| **MPSC** | Multi-Producer Single-Consumer — many threads send, one thread receives | |
| 243 | +| **SPSC** | Single-Producer Single-Consumer — one sender, one receiver (fastest possible) | |
| 244 | +| **B/s** | Billion messages per second (10⁹ messages/sec) | |
| 245 | +| **M/s** | Million messages per second (10⁶ messages/sec) | |
| 246 | +| **Ring decomposition** | Giving each producer its own SPSC ring instead of sharing one queue | |
| 247 | +| **Batch consumption** | Processing many messages with a single atomic head update (amortizes sync cost) | |
| 248 | +| **Cache-resident** | Data fits in CPU cache (L1/L2/L3), not main memory — much faster access | |
| 249 | +| **False sharing** | Two threads' data on the same cache line causing spurious invalidations | |
| 250 | +| **Acquire/Release** | Memory ordering in which writes made before a release-store are visible to any thread whose acquire-load observes that store | |
| 251 | +| **CAS** | Compare-And-Swap: an atomic read-modify-write that fails, and must be retried, when another thread changed the value first | |
| 252 | +| **Futex** | Fast userspace mutex: a Linux sleep/wake primitive whose uncontended path stays in userspace and makes no system call | |
| 253 | + |
| 254 | +## Related Work |
| 255 | + |
| 256 | +- [LMAX Disruptor](https://github.com/LMAX-Exchange/disruptor) — Original batch-consumption pattern |
| 257 | +- [rigtorp/SPSCQueue](https://github.com/rigtorp/SPSCQueue) — High-performance C++ SPSC queue |
| 258 | +- [crossbeam-channel](https://github.com/crossbeam-rs/crossbeam) — Rust concurrent channels |
| 259 | +- [moodycamel::ConcurrentQueue](https://github.com/cameron314/concurrentqueue) — C++ lock-free queue |
| 260 | + |
| 261 | +## License |
| 262 | + |
| 263 | +[MIT](LICENSE) |