- Executive Summary
- System Architecture
- Core Components
- Data Flow
- Storage Design
- Network Protocol
- Performance Optimizations
- Reliability & Fault Tolerance
- API Reference
- Operational Characteristics
- Implementation Details
- Trade-offs & Design Decisions
- Future Enhancements
Obelisk is a high-performance, fault-tolerant message broker designed as a simplified alternative to Apache Kafka. It serves as the single source of truth for microservices, real-time systems, and event-driven architectures. Built in Go, it combines the reliability of Kafka with the simplicity of Redis, delivering enterprise-grade messaging with minimal operational overhead.
- Performance: ~38-byte binary protocol, batched I/O, ring buffers for hot data
- Reliability: At-least-once delivery, crash recovery, corruption handling
- Simplicity: Zero configuration, file-based storage, no external dependencies
- Scalability: Concurrent connections, per-topic isolation, efficient resource pooling
- Event Streaming: Real-time data pipelines between services
- Microservices Communication: Asynchronous service decoupling
- Audit Logging: Immutable event log for compliance
- IoT Data Collection: High-throughput sensor data ingestion
- CQRS/Event Sourcing: Event store for domain events
┌─────────────────────────────────────────────────────────────────────┐
│ OBELISK BROKER │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │
│ │ TCP Server │ │ HTTP Server │ │ Metrics │ │
│ │ Port 8080 │ │ Port 8081 │ │ /metrics │ │
│ └──────┬──────┘ └──────┬───────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────────────────┼──────────────────────┘ │
│ │ │
│ ┌─────────▼──────────┐ │
│ │ Broker Service │ │
│ │ (Message Router) │ │
│ └─────────┬──────────┘ │
│ │ │
│ ┌────────────────────┼────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌───────────────┐ ┌──────────────┐ │
│ │ Ring Buffer │ │Topic Batcher │ │ File Pool │ │
│ │ (Fast Read) │ │ (Write Buffer)│ │(Handle Cache)│ │
│ └──────────────┘ └───────┬───────┘ └──────────────┘ │
│ │ │
│ ┌───────▼────────┐ │
│ │ Storage Layer │ │
│ │ (.log + .idx) │ │
│ └────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Producer ──TCP──▶ Message Reception
│
▼
Deserialization
│
├─────▶ Ring Buffer (immediate read availability)
│
└─────▶ Topic Batcher
│
▼
Batch Accumulation
│
┌───────────┴───────────┐
│ │
Size Trigger Time Trigger
(100 msgs) (5 seconds)
│ │
└───────────┬───────────┘
│
▼
Disk Flush
│
┌───────────┴───────────┐
▼ ▼
.log file .idx file
(messages) (offsets)
Purpose: High-throughput message ingestion endpoint
Key Features:
- Concurrent connection handling (goroutine per connection)
- Binary protocol for efficiency
- Graceful shutdown with connection draining
- Automatic client acknowledgments
Implementation Details:
type TCPServer struct {
address string // Bind address (e.g., ":8080")
listener net.Listener // TCP socket listener
quit chan struct{} // Shutdown coordination
wg sync.WaitGroup // Connection tracking
service *BrokerService // Message processing
}

Connection Lifecycle:
- Accept connection → Spawn goroutine
- Read message length (4 bytes)
- Read message body (N bytes)
- Deserialize and validate
- Process through BrokerService
- Send acknowledgment ("OK\n")
- Repeat until disconnect or shutdown
Purpose: Administrative and monitoring interface
Endpoints:
- GET /health - Liveness check
- GET /stats?topic=X - Topic statistics
- POST /topics/{topic}/messages - REST message publishing
- GET /metrics - Prometheus metrics
Middleware Stack:
- Panic recovery (prevents server crashes)
- Request logging (method, path, duration)
- CORS handling (if needed)
Purpose: Accumulate messages in memory for efficient bulk disk writes
Batching Strategy:
- Size trigger: Flush when batch reaches 100 messages
- Time trigger: Flush every 5 seconds regardless of size
- Per-topic isolation: Each topic has independent batch
Key Algorithms:
// Batch decision logic
if len(batch.buffer) >= maxSize {
flush() // Size trigger
}
// Background timer
ticker := time.NewTicker(maxWait)
for range ticker.C {
flushAll() // Time trigger
}

Memory Management:
- Pre-allocated buffers to reduce GC pressure
- Atomic buffer swapping during flush
- Copy-on-write semantics for thread safety
Purpose: In-memory cache for recent messages enabling fast reads
Design:
- Fixed-size circular buffer (100 messages default)
- O(1) insertion and retrieval
- Automatic eviction of oldest messages
- Per-topic buffer isolation
Implementation:
type Buffer struct {
data []message.Message // Pre-allocated array
head int // Oldest message index
tail int // Next write position
size int // Current element count
capacity int // Maximum elements
}
// Circular indexing
newTail = (tail + 1) % capacity

Purpose: Persistent message storage with fast random access
File Structure:
data/topics/
├── user-events.log # Binary message data
├── user-events.idx # Offset→Position index
├── payments.log
└── payments.idx
Storage Format:
.log file structure:
[Length:4][Timestamp:8][TopicLen:4][Topic:N][KeyLen:4][Key:N][ValueLen:4][Value:N]
[Length:4][Timestamp:8][TopicLen:4][Topic:N][KeyLen:4][Key:N][ValueLen:4][Value:N]
...
.idx file structure:
[Position:8] # Byte position of message 0
[Position:8] # Byte position of message 1
[Position:8] # Byte position of message 2
...
Purpose: Manage file handles efficiently to avoid syscall overhead
Features:
- LRU-based handle caching
- Automatic cleanup of idle files
- RAII pattern for safe access
- Thread-safe operations
RAII Pattern:
pool.WithFile(path, flags, func(f *File) error {
// File is guaranteed available here
// Automatically protected from cleanup
return f.AppendWith(writeFunc)
})
// File automatically released

Purpose: Track reading progress per consumer with persistence
State Management:
// data/consumers/consumer-1.json
{
"topic-0": 142, // Last committed offset
"topic-1": 89,
"topic-2": 201
}

Operations:
- Poll() - Fetch messages from current offset
- Commit() - Persist progress after processing
- Reset() - Restart from beginning
- Subscribe() - Add topic to subscription set
1. Producer connects via TCP
↓
2. Message serialized to binary
↓
3. Length-prefixed frame sent
↓
4. Server deserializes message
↓
5. Parallel operations:
a. Add to ring buffer (immediate availability)
b. Add to topic batch (durability queue)
↓
6. Send ACK to producer
↓
7. Batch eventually flushed to disk
↓
8. Index updated with message position
1. Consumer polls with topic + offset
↓
2. Check index for byte position
↓
3. Seek to position in .log file
↓
4. Read messages sequentially
↓
5. Return message array to consumer
↓
6. Consumer processes messages
↓
7. Consumer commits new offset
↓
8. Offset persisted to JSON file
Trigger (size OR time)
↓
Lock batch buffer
↓
Copy messages locally
↓
Clear batch buffer
↓
Unlock (new messages can accumulate)
↓
Acquire file from pool
↓
Write messages to .log
↓
Update .idx with positions
↓
Sync to disk
↓
Release file back to pool
- Append-only logs: No in-place updates, natural time ordering
- Separate index files: Enable fast random access without scanning
- Per-topic isolation: Independent storage prevents interference
- Binary encoding: Compact size, fast parsing
- Length-prefixing: Reliable message boundaries
Batching Strategy:
- Accumulate writes in memory
- Single syscall for multiple messages
- Amortize fsync cost across batch
Write Amplification Reduction:
// Instead of:
for msg := range messages {
file.Write(msg) // N syscalls
file.Sync() // N fsyncs
}
// We do:
batch := collectMessages(100)
file.Write(batch) // 1 syscall
file.Sync()        // 1 fsync

Index-based Seeking:
// O(1) position lookup
position := index.Positions[offset]
file.Seek(position, 0)
// Read from exact location

Sequential Read Optimization:
- OS page cache benefits
- Prefetching by kernel
- Reduced random I/O
Detection:
- Invalid message length (0 or > max)
- Truncated message body
- Deserialization failure
Recovery Strategy:
func recoverFromCorruption(f *os.File, fileSize int64) {
    position := int64(0)
    for position < fileSize {
        msg, err := readMessage(f, position)
        if err != nil {
            // Scan forward to find the next valid message
            position = findNextValidMessage(f, position)
            continue
        }
        recordValidMessage(position)
        position += messageSize(msg)
    }
}

Index Rebuild:
# Automatic rebuild on corruption detection
1. Backup corrupt index: topic.idx → topic.idx.corrupt.20250815-143022
2. Scan log file for valid messages
3. Build new index with discovered positions
4. Atomic rename to replace corrupt index

Wire Format:
┌─────────────┬──────────────────────┐
│Length (4B) │ Message Body (N) │
│Little-endian│ Serialized Message │
└─────────────┴──────────────────────┘
Message Serialization:
┌───────────┬───────────┬─────────┬───────┬─────────┬────────┬─────────┐
│Timestamp  │TopicLen   │Topic    │KeyLen │Key      │ValueLen│Value    │
│(8 bytes)  │(4 bytes)  │(N bytes)│(4B)   │(N bytes)│(4B)    │(N bytes)│
│UnixNano   │uint32     │UTF-8    │uint32 │UTF-8    │uint32  │UTF-8    │
└───────────┴───────────┴─────────┴───────┴─────────┴────────┴─────────┘
Advantages:
- Compact: ~38 bytes vs ~72 for the equivalent JSON
- Fast parsing: No string parsing needed
- Type-safe: Fixed field positions
- Streaming-friendly: Length prefix enables reliable framing
Example Size Comparison:
// JSON (72 bytes)
{"timestamp":1691234567890,"topic":"test","key":"user1","value":"hello"}
// Binary (38 bytes)
[4-byte length][8-byte timestamp][4]["test"][5]["user1"][5]["hello"]

Success Response:
OK\n
Error Responses:
NACK:CORRUPTED\n
NACK:PROTOCOL_ERROR\n
NACK:INVALID_FORMAT\n
NACK:PUBLISH_FAILED\n
Pre-allocation:
// Avoid repeated allocations
buffer := make([]message.Message, 0, batchSize)

Object Pooling:
var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 4096)
},
}

Buffered I/O:
reader := bufio.NewReaderSize(conn, 64*1024) // 64KB buffer
writer := bufio.NewWriterSize(conn, 64*1024)

Vectored I/O (future enhancement):
// Write multiple buffers in single syscall
writev(fd, [][]byte{header, body}, 2)

Lock-free Reading:
// Atomic operations for hot paths
atomic.LoadInt64(&lastAccessTime)
atomic.StoreInt32(&dirty, 1)

Read-Write Lock Separation:
type TopicBuffers struct {
mtx sync.RWMutex // Multiple readers, single writer
}
// Readers don't block each other
func (tb *TopicBuffers) GetRecentByTopic(topic string) []Message {
tb.mtx.RLock()
defer tb.mtx.RUnlock()
// ...
}

Batch Operations:
- Write multiple messages per syscall
- Update index in bulk
- Amortize fsync across batch
File Handle Caching:
- Keep files open across operations
- LRU eviction for inactive files
- Avoid repeated open/close overhead
TCP_NODELAY (Nagle's algorithm):
tcpConn.SetNoDelay(true) // Disable Nagle's algorithm for lower latency

Keep-Alive:
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(30 * time.Second)

At-least-once Delivery Guarantee:
- Message received from producer
- Added to memory buffer
- Queued for disk persistence
- ACK sent once the message is in the memory buffer (before disk flush)
- Eventually persisted to disk
Failure Scenarios:
Crash before disk flush:
- Messages in memory buffer lost
- Producer can retry on ACK timeout
Crash after disk flush:
- Messages recovered from disk
- Consumer continues from last offset
Retry Strategy:
type RetryConfig struct {
    MaxAttempts   int           // 3
    InitialDelay  time.Duration // 100ms
    MaxDelay      time.Duration // 5s
    BackoffFactor float64       // 2.0 (exponential backoff)
}

Error Classification:
- Transient: Network issues, temporary disk problems → Retry
- Permanent: Invalid data, logic errors → Don't retry
- Resource: Disk full, out of memory → Alert operator
- Configuration: Missing files, bad config → Fail fast
Shutdown Sequence:
1. Signal received (SIGINT/SIGTERM)
↓
2. Stop accepting new connections
↓
3. Drain existing connections
↓
4. Flush all pending batches
↓
5. Close all file handles
↓
6. Save consumer offsets
↓
7. Exit cleanly
Index Corruption:
- Auto-detection via read failures
- Automatic rebuild from log file
- Backup corrupt index for analysis
Log Corruption:
- Skip corrupted sections
- Continue recovery from next valid message
- Log corruption statistics
Consumer Offset Corruption:
- Checksum validation
- Fallback to offset 0 (replay all)
- Manual reset capability
Message Publishing:
Connect to localhost:8080
Send: [Length:4][SerializedMessage:N]
Receive: "OK\n" or "NACK:reason\n"
Health Check:
GET /health
Response: {"status":"ok"}

Topic Statistics:
GET /stats?topic=user-events
Response: {
"topic": "user-events",
"buffered": 42,
"persisted": 1337
}

Publish Message (REST):
POST /topics/user-events/messages
Content-Type: application/json
{
"key": "user123",
"value": "User logged in",
"timestamp": "2024-01-15T10:30:00Z"
}
Response: {
"status": "accepted",
"topic": "user-events",
"timestamp": "2024-01-15T10:30:00Z",
"key": "user123"
}

Metrics (Prometheus):
GET /metrics
# HELP obelisk_messages_received_total Total messages received
# TYPE obelisk_messages_received_total counter
obelisk_messages_received_total{topic="user-events"} 42

// Create consumer
consumer := consumer.NewConsumer(
"data/topics", // Base directory
"my-consumer", // Consumer ID
"topic-1", // Topics to subscribe
"topic-2",
)
// Poll for messages
messages, err := consumer.Poll("topic-1")
// Process messages
for _, msg := range messages {
processMessage(msg)
}
// Commit progress
newOffset := currentOffset + uint64(len(messages))
consumer.Commit("topic-1", newOffset)
// Reset to beginning
consumer.Reset("topic-1")

Storage Requirements:
Storage = (AvgMessageSize + 8) × MessagesPerSec × Retention
Example: (100 + 8) × 1000 × 86400 = 9.3GB/day
Memory Requirements:
Memory = BaseOverhead + (BufferSize × Topics × AvgMsgSize)
Example: 50MB + (100 × 10 × 100 bytes) ≈ 50.1MB
File Handle Limits:
# Check current limit
ulimit -n
# Recommended: 2 × NumTopics + Connections + 100
# For 100 topics with 1000 connections: 2×100 + 1000 + 100 = 1300

Key Metrics to Monitor:
- Message Throughput: obelisk_messages_received_total (rate), obelisk_messages_stored_total (rate)
- Batch Performance: obelisk_batch_size (histogram), obelisk_flush_duration_seconds (histogram)
- Consumer Lag: obelisk_consumer_lag_messages (gauge)
- System Health: obelisk_active_connections (gauge), obelisk_disk_write_bytes_total (rate)
Alert Conditions:
alerts:
- name: HighConsumerLag
expr: obelisk_consumer_lag_messages > 10000
for: 5m
- name: DiskWriteFailures
expr: rate(obelisk_messages_failed_total[5m]) > 10
- name: ConnectionSaturation
    expr: obelisk_active_connections > 900

Batching Configuration:
const (
    MaxBatchSize = 100             // Messages per batch
    MaxBatchWait = 5 * time.Second // Maximum wait time
)

Buffer Configuration:
const (
RingBufferSize = 100 // Messages per topic buffer
TopicLimit = 1000 // Maximum topics
)

Network Configuration:
const (
    ReadTimeout    = 30 * time.Second // Connection read timeout
    WriteTimeout   = 5 * time.Second  // ACK write timeout
    MaxMessageSize = 10 << 20         // Maximum message size (10 MB)
)

Synchronization Primitives:
- Mutexes: Protect shared state
  type TopicBatch struct { mtx sync.Mutex; buffer []Message }
- RWMutexes: Optimize read-heavy workloads
  type OffsetIndex struct { mtx sync.RWMutex; Positions []int64 }
- Atomic Operations: Lock-free counters
  atomic.AddInt32(&messageCount, 1)
- Channels: Coordinate goroutines
  quit := make(chan struct{})
Defensive Copying:
// Prevent data races during flush
local := make([]Message, len(batch.buffer))
copy(local, batch.buffer)
batch.buffer = batch.buffer[:0] // Clear original

Boundary Checking:
if offset >= len(index.Positions) {
return ErrOffsetOutOfBounds
}

RAII Pattern:
// Resource automatically released
pool.WithFile(path, flags, func(f *File) error {
// File protected from cleanup here
return useFile(f)
})
// File released here

Cleanup Guarantees:
defer func() {
file.Close()
mutex.Unlock()
wg.Done()
}()

Fail-Fast vs Fail-Safe:
- Configuration errors: Fail fast
- Transient errors: Retry with backoff
- Data errors: Log and skip
- Resource errors: Degrade gracefully
Error Propagation:
// Wrap errors with context
if err != nil {
return fmt.Errorf("flush topic %s: %w", topic, err)
}

Chosen: Simple file-based storage
Alternative: Embedded database (BoltDB, BadgerDB)
Rationale:
- ✅ Simplicity and debuggability
- ✅ No external dependencies
- ✅ Direct control over I/O patterns
- ❌ Manual index management required
- ❌ No built-in transactions
Chosen: Custom binary protocol
Alternative: Protocol Buffers, MessagePack
Rationale:
- ✅ Minimal overhead (38 bytes)
- ✅ Fast serialization
- ✅ No schema dependencies
- ❌ Less flexible than protobuf
- ❌ Manual versioning needed
Chosen: At-least-once semantics
Alternative: Exactly-once delivery
Rationale:
- ✅ Simpler implementation
- ✅ Better performance
- ✅ Suitable for most use cases
- ❌ Requires idempotent consumers
- ❌ Possible duplicate processing
Chosen: Single-node design (current)
Alternative: Distributed cluster
Rationale:
- ✅ Operational simplicity
- ✅ No consensus overhead
- ✅ Predictable performance
- ❌ Single point of failure
- ❌ Limited horizontal scaling
Chosen: Consumers poll for messages
Alternative: Push-based delivery
Rationale:
- ✅ Consumer controls rate
- ✅ Natural backpressure
- ✅ Simple offset management
- ❌ Polling overhead
- ❌ Potential latency
Chosen: Separate files per topic
Alternative: Single shared log
Rationale:
- ✅ Independent scaling
- ✅ Isolated failures
- ✅ Parallel operations
- ❌ More file handles
- ❌ Cross-topic ordering lost
Chosen: Dual trigger (size OR time)
Alternative: Size-only or time-only
Rationale:
- ✅ Balances latency and throughput
- ✅ Prevents indefinite buffering
- ✅ Predictable behavior
- ❌ More complex logic
- ❌ Two parameters to tune
- TLS encryption for transport security
- Authentication and authorization
- Comprehensive integration tests
- Performance benchmarks
- Docker containerization
- Consumer groups for work distribution
- Message compression (Snappy/LZ4)
- Schema registry for type safety
- Dead letter queues
- Message TTL and compaction
- Multi-node clustering
- Raft consensus for coordination
- Partition rebalancing
- Cross-datacenter replication
- Geo-distributed deployments
- Client libraries (Python, Java, Node.js)
- Kafka protocol compatibility
- Stream processing framework
- Change data capture (CDC)
- Kubernetes operator
Obelisk represents a pragmatic approach to message brokering, prioritizing simplicity and reliability over feature completeness. Its architecture demonstrates that sophisticated distributed systems concepts can be implemented in a maintainable, understandable codebase.
The design choices reflect a clear philosophy:
- Simple is better than complex: File-based storage over embedded databases
- Explicit is better than implicit: Clear ownership and lifecycle management
- Performance through design: Batching, pooling, and buffering at the architectural level
- Operational simplicity: Zero configuration, clear observability
While Obelisk currently operates as a single-node system, its architecture provides a solid foundation for evolution into a distributed system. The clean separation of concerns, comprehensive error handling, and thoughtful abstractions make it an excellent learning platform and a viable solution for many real-world use cases.
The journey from a simple message broker to a distributed streaming platform is well-mapped, with each phase building naturally on the previous foundation. This design document serves as both a current state description and a roadmap for future development.