
Commit 916a0b6

fix realtime (#14)
1 parent f22fc7d commit 916a0b6


49 files changed (+3125, -414 lines)

.claude/settings.local.json

Lines changed: 3 additions & 2 deletions
@@ -20,8 +20,9 @@
       "Bash(git diff:*)",
       "Bash(dprint fmt:*)",
       "Bash(DEBUG=true go test -run TestIndexMetrics -v)",
-      "Bash(mkdir:*)"
+      "Bash(mkdir:*)",
+      "Bash(COMET_DEBUG=true go test -timeout 30s -run \"TestRealtimeVisibilityDebug\" -v)"
     ],
     "deny": []
   }
-}
+}

ARCHITECTURE.md

Lines changed: 56 additions & 14 deletions
@@ -23,8 +23,8 @@ Comet is a high-performance embedded segmented log designed for edge observabili
 ├─────────────────────┤        ├──────────────────────┤
 │ - nextEntryNumber   │        │ - Index              │
 │ - pendingWrites     │        │ - Data Files         │
-│ - writeBuffers      │        │ - Consumer Offsets   │
-│ - fileSize          │        │ - State File         │
+│ - writeBuffers      │        │ - State File         │
+│ - fileSize          │        │                      │
 └─────────────────────┘        └──────────────────────┘
           │                              ▲
           │                              │
@@ -93,13 +93,20 @@ Each shard represents an independent data stream partition with strict state sep
 │ Durable State:                       │
 │ - Binary searchable index            │
 │ - Memory-mapped state file           │
-│ - Consumer offsets                   │
 │ - File metadata                      │
+│                                      │
+│ Consumer State (Separate):           │
+│ - Memory-mapped offset file          │
+│ - Lock-free consumer tracking        │
 └──────────────────────────────────────┘
       │            │           │
       ▼            ▼           ▼
  Data Files   Index File   State File
  (.comet)       (.bin)      (.state)
+
+
+  Consumer Offsets
+  (offsets.state)
 ```
 
 **Critical State Management:**
@@ -142,7 +149,8 @@ Key features:
 
 - **Exactly-once semantics**: Through ACK tracking
 - **Deterministic assignment**: Consistent hashing for multi-consumer
-- **In-memory offsets**: Fast read-after-ACK consistency
+- **Separate offset storage**: Consumer offsets stored independently from writer's index
+- **Lock-free offset updates**: Memory-mapped storage for multi-process safety
 - **Batch operations**: Amortizes overhead across messages
 
 ### 4. Reader
@@ -244,14 +252,14 @@ For each shard:
 ├─→ Binary search index (durable entries only)
 ├─→ Read entries from mapped files
 ├─→ Decompress if needed
-└─→ Update consumer offsets (on ACK)
+└─→ Update consumer offsets in mmap (on ACK)
 ```
 
 **Key Points:**
 
 - Consumers only see entries in index.CurrentEntryNumber
 - Reader cache automatically detects stale mappings
-- Consumer offsets are persisted with index
+- Consumer offsets are stored separately in memory-mapped files
 - No unflushed/pending data is ever visible
 
 ### Retention Path
@@ -271,6 +279,8 @@ Client.cleanupShard()
 
 ## Memory-Mapped State
 
+### Writer State (comet.state)
+
 Each shard maintains a 1KB state file with cache-line aligned sections:
 
 ```
@@ -362,6 +372,43 @@ Benefits:
 - **Atomic access**: Lock-free updates
 - **Fixed size**: Predictable memory usage
 
+### Consumer Offset Storage (offsets.state)
+
+Each shard maintains a separate 64KB memory-mapped file for consumer offsets:
+
+```
+┌─────────────────────────┐  Header (64 bytes)
+│ Version (4B)            │  Format version (1)
+│ Magic (4B)              │  0xC0FE0FF5
+│ Reserved (56B)          │  Future expansion
+├─────────────────────────┤  Consumer Entries (512 × 128 bytes)
+│ Entry 0 (128B)          │  First consumer group
+│ ├─ GroupName (48B)      │  Null-terminated string
+│ ├─ Offset (8B)          │  Current consumer offset
+│ ├─ LastUpdate (8B)      │  Unix nano timestamp
+│ ├─ AckCount (8B)        │  Total acknowledgments
+│ └─ Reserved (56B)       │  Future use
+│ Entry 1 (128B)          │  Second consumer group
+│ ...                     │
+│ Entry 511 (128B)        │  Last consumer group
+└─────────────────────────┘
+```
+
+**Key Features:**
+
+- **Lock-free access**: Atomic operations for multi-process safety
+- **512 consumer groups**: Per shard with linear probing hash table
+- **Memory-mapped**: Changes visible immediately across processes
+- **Cache-line aligned**: Each entry is exactly 2 cache lines (128 bytes)
+- **Automatic migration**: From old file-based format to mmap format
+
+**Consumer Group Management:**
+
+- Groups allocated using FNV-1a hash with linear probing
+- Empty slots detected by null GroupName[0]
+- Atomic slot claiming prevents race conditions
+- No explicit locking required for reads or writes
+
 ## Wire Format
 
 Each entry follows a simple, efficient format:
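As a reading aid for the layout above, here is a minimal Go sketch of the header and entry records. The type names match the structs this commit adds to `alignment_test.go`, but the field names and comments are read off the diagram, so treat this as a sketch rather than the commit's actual definitions:

```go
package main

import (
	"fmt"
	"unsafe"
)

// ConsumerOffsetHeader mirrors the 64-byte header of offsets.state.
type ConsumerOffsetHeader struct {
	Version  uint32   // format version (1)
	Magic    uint32   // 0xC0FE0FF5
	Reserved [56]byte // future expansion
}

// ConsumerEntry mirrors one 128-byte slot (two cache lines).
type ConsumerEntry struct {
	GroupName  [48]byte // null-terminated consumer group name
	Offset     int64    // current consumer offset
	LastUpdate int64    // unix nanosecond timestamp of last update
	AckCount   int64    // total acknowledgments
	Reserved   [56]byte // future use
}

func main() {
	fmt.Println(unsafe.Sizeof(ConsumerOffsetHeader{})) // 64
	fmt.Println(unsafe.Sizeof(ConsumerEntry{}))        // 128
	// Header + 512 entries ≈ the 64KB file described above.
	fmt.Println(64 + 512*unsafe.Sizeof(ConsumerEntry{})) // 65600
}
```

The 48+8+8+8+56 field sizes land each entry on exactly two cache lines, which is what makes the per-group atomic updates safe without false sharing.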
@@ -403,11 +450,6 @@ Binary format for fast lookups and persistence:
 ├────────────────────────────┤
 │ File count (4B)            │
 ├────────────────────────────┤
-│ Consumer offsets           │  For each consumer:
-│ - Group name length (1B)   │  - Length of group name
-│ - Group name (N bytes)     │  - UTF-8 group name
-│ - Offset (8B)              │  - Consumer offset
-├────────────────────────────┤
 │ Binary search nodes        │  For each node (20B):
 │ - Entry number (8B)        │  - Entry number
 │ - File index (4B)          │  - Index into files array
@@ -511,13 +553,13 @@ For each shard:
 
 - All data flushed to disk
 - Index reflects actual state
-- Consumer offsets saved
+- Consumer offsets preserved in separate memory-mapped files
 
 **After Crash:**
 
 - Only synced data is recoverable
 - Index rebuilt from actual files
-- Consumer offsets reset to last known good state
+- Consumer offsets preserved independently in offsets.state files
 - Unflushed writes are lost (by design)
 
 ### Recovery Scenarios
@@ -564,6 +606,6 @@ This design ensures that even after catastrophic failures, Comet can recover to
 
 ### Resource Usage
 
-- **Memory**: ~1KB state + configurable cache
+- **Memory**: ~65KB state + configurable memory-mapped file cache
 - **Disk**: Efficient compression, automatic cleanup
 - **CPU**: Minimal (compression optional)

CLAUDE.md

Lines changed: 18 additions & 0 deletions
@@ -83,6 +83,23 @@ This architecture ensures:
 
 For a more detailed explanation, please refer to the [Architecture](ARCHITECTURE.md) document.
 
+### Consumer Offset Storage
+
+**IMPORTANT**: Consumer offsets are stored separately from the writer's index to maintain architectural separation:
+
+- **Location**: `shard-XXXX/offsets.state` (memory-mapped file)
+- **Structure**: Fixed-size slots for up to 512 consumer groups per shard
+- **Access**: Lock-free using atomic operations for multi-process safety
+- **Migration**: Automatically migrates from old `offsets.bin` format if found
+
+Key points:
+
+- Consumers NEVER write to the writer's `index.bin` file
+- Offsets are immediately visible across processes (memory-mapped)
+- No explicit sync needed - OS handles memory-mapped file persistence
+- Consumer group names limited to 48 bytes
+- Maximum 512 consumer groups per shard
+
 ## Configuration
 
 The package uses a hierarchical configuration structure:
@@ -139,3 +156,4 @@ deadcode -test ./...
 - Tests should be comprehensive and cover failure modes, resource exhaustion, and the _critical_ edge cases.
 - To run the benchmarks we care _most_ about not regressing, use `mise run bench:core`
 - Run `go vet ./...`, `go test ./...`, `go test ./... -bench=. -benchmem` after significant changes.
+- Use `COMET_DEBUG=1` to enable debug logging.
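To make the fixed-slot scheme above concrete, here is an illustrative lookup: FNV-1a hash of the group name, then linear probing across the 512 slots, as described in ARCHITECTURE.md. This is a sketch of the scheme, not the commit's actual code:

```go
package main

import "fmt"

const numSlots = 512 // fixed consumer-group slots per shard

// slotFor returns the slot index for a group name: FNV-1a hash,
// then linear probing until a matching or empty slot is found.
// nameAt reports the group name stored in a slot ("" for an empty
// slot, i.e. GroupName[0] == 0).
func slotFor(group string, nameAt func(int) string) (int, bool) {
	h := uint32(2166136261) // FNV-1a 32-bit offset basis
	for i := 0; i < len(group); i++ {
		h ^= uint32(group[i])
		h *= 16777619 // FNV-1a 32-bit prime
	}
	for probe := 0; probe < numSlots; probe++ {
		slot := int((h + uint32(probe)) % uint32(numSlots))
		if name := nameAt(slot); name == group || name == "" {
			return slot, true
		}
	}
	return 0, false // all 512 slots taken by other groups
}

func main() {
	table := make([]string, numSlots)
	slot, _ := slotFor("consumer-0", func(i int) string { return table[i] })
	fmt.Println("consumer-0 → slot", slot)
}
```

In the real file the claim of an empty slot would be done with an atomic compare-and-swap on the entry, which is what makes slot allocation safe across processes without a lock.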

ack_concurrent_test.go

Lines changed: 39 additions & 16 deletions
@@ -73,8 +73,12 @@ func TestACKConcurrentPersistence(t *testing.T) {
 	// Check offsets before close
 	shard, _ := client.getOrCreateShard(0)
 	shard.mu.RLock()
-	offset0 := shard.index.ConsumerOffsets["consumer-0"]
-	offset1 := shard.index.ConsumerOffsets["consumer-1"]
+	offset0 := int64(0)
+	offset1 := int64(0)
+	if shard.offsetMmap != nil {
+		offset0, _ = shard.offsetMmap.Get("consumer-0")
+		offset1, _ = shard.offsetMmap.Get("consumer-1")
+	}
 	shard.mu.RUnlock()
 
 	t.Logf("Before close: consumer-0 offset=%d, consumer-1 offset=%d", offset0, offset1)
@@ -97,15 +101,23 @@ func TestACKConcurrentPersistence(t *testing.T) {
 
 	shard2.mu.RLock()
 	// Both should be persisted when using the same client
-	if shard2.index.ConsumerOffsets["consumer-0"] != 10 {
-		t.Errorf("Consumer 0: Expected offset 10, got %d", shard2.index.ConsumerOffsets["consumer-0"])
-	}
-	if shard2.index.ConsumerOffsets["consumer-1"] != 10 {
-		t.Errorf("Consumer 1: Expected offset 10, got %d", shard2.index.ConsumerOffsets["consumer-1"])
+	offset0Check := int64(0)
+	offset1Check := int64(0)
+	if shard2.offsetMmap != nil {
+		offset0Check, _ = shard2.offsetMmap.Get("consumer-0")
+		offset1Check, _ = shard2.offsetMmap.Get("consumer-1")
 	}
-	t.Logf("✓ Sequential test passed: both consumer offsets persisted correctly")
-
 	shard2.mu.RUnlock()
+
+	if offset0Check != 10 {
+		t.Errorf("Consumer 0: Expected offset 10, got %d", offset0Check)
+	}
+	if offset1Check != 10 {
+		t.Errorf("Consumer 1: Expected offset 10, got %d", offset1Check)
+	}
+	if offset0Check == 10 && offset1Check == 10 {
+		t.Logf("✓ Sequential test passed: both consumer offsets persisted correctly")
+	}
 	client2.Close()
 
 	// Now test the problematic scenario with separate clients
@@ -118,10 +130,11 @@ func TestACKConcurrentPersistence(t *testing.T) {
 	}
 	shard3, _ := client3.getOrCreateShard(0)
 	shard3.mu.Lock()
-	delete(shard3.index.ConsumerOffsets, "consumer-0")
-	delete(shard3.index.ConsumerOffsets, "consumer-1")
+	if shard3.offsetMmap != nil {
+		shard3.offsetMmap.Remove("consumer-0")
+		shard3.offsetMmap.Remove("consumer-1")
+	}
 	shard3.mu.Unlock()
-	shard3.persistIndex()
 	client3.Close()
 
 	// Run concurrent consumers with separate clients
@@ -177,8 +190,12 @@ func TestACKConcurrentPersistence(t *testing.T) {
 
 	shard4, _ := client4.getOrCreateShard(0)
 	shard4.mu.RLock()
-	offset0Final := shard4.index.ConsumerOffsets["consumer-0"]
-	offset1Final := shard4.index.ConsumerOffsets["consumer-1"]
+	offset0Final := int64(0)
+	offset1Final := int64(0)
+	if shard4.offsetMmap != nil {
+		offset0Final, _ = shard4.offsetMmap.Get("consumer-0")
+		offset1Final, _ = shard4.offsetMmap.Get("consumer-1")
+	}
 	shard4.mu.RUnlock()
 
 	t.Logf("After concurrent clients: consumer-0 offset=%d, consumer-1 offset=%d", offset0Final, offset1Final)
@@ -262,13 +279,19 @@ func TestACKConcurrentPersistence(t *testing.T) {
 	// Check shard 0
 	shard0, _ := client6.getOrCreateShard(0)
 	shard0.mu.RLock()
-	offset0MP := shard0.index.ConsumerOffsets["mp-consumer-0"]
+	offset0MP := int64(0)
+	if shard0.offsetMmap != nil {
+		offset0MP, _ = shard0.offsetMmap.Get("mp-consumer-0")
+	}
 	shard0.mu.RUnlock()
 
 	// Check shard 1
 	shard1, _ := client6.getOrCreateShard(1)
 	shard1.mu.RLock()
-	offset1MP := shard1.index.ConsumerOffsets["mp-consumer-1"]
+	offset1MP := int64(0)
+	if shard1.offsetMmap != nil {
+		offset1MP, _ = shard1.offsetMmap.Get("mp-consumer-1")
+	}
 	shard1.mu.RUnlock()
 
 	t.Logf("With proper multi-process config: mp-consumer-0 offset=%d, mp-consumer-1 offset=%d", offset0MP, offset1MP)

ack_persistence_test.go

Lines changed: 6 additions & 0 deletions
@@ -31,6 +31,12 @@ func TestACKPersistenceBug(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+
+	// Sync to ensure messages are durable
+	if err := client1.Sync(ctx); err != nil {
+		t.Fatal(err)
+	}
+
 	client1.Close()
 
 	// Step 2: Process and ACK some messages
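The added `Sync` closes a durability gap: per ARCHITECTURE.md, only synced data is recoverable and unflushed writes are lost on crash by design. A sketch of the write-side pattern, written in-package like the tests — note the `Append` write call is an assumption, since only `Sync` and `Close` appear in this diff:

```go
// writeDurably appends entries and syncs before closing, so a crash
// after this function returns cannot lose the written messages.
func writeDurably(ctx context.Context, dir string, entries [][]byte) error {
	client, err := NewClient(dir, DefaultCometConfig())
	if err != nil {
		return err
	}
	defer client.Close()

	// Assumed write API; the stream name format is illustrative.
	if _, err := client.Append(ctx, "events:v1:shard:0000", entries); err != nil {
		return err
	}
	// As this test's fix suggests, Close alone does not guarantee
	// durability; Sync does.
	return client.Sync(ctx)
}
```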

alignment_test.go

Lines changed: 4 additions & 0 deletions
@@ -47,6 +47,10 @@ func TestStructAlignment(t *testing.T) {
 		// Reader internal structs
 		{"recentFileCache", recentFileCache{}},
 		{"cacheItem", cacheItem{}},
+		// Consumer offset structs
+		{"ConsumerOffsetHeader", ConsumerOffsetHeader{}},
+		{"ConsumerEntry", ConsumerEntry{}},
+		{"ConsumerOffsetMmap", ConsumerOffsetMmap{}},
 	}
 
 	for _, tt := range tests {

benchmarks_test.go

Lines changed: 1 addition & 1 deletion
@@ -1367,7 +1367,7 @@ func BenchmarkAllocation_CompressionImpact(b *testing.B) {
 		cfg.Compression.MinCompressSize = config.threshold
 		cfg.Indexing.BoundaryInterval = 100
 		cfg.Storage.MaxFileSize = 1 << 30
-		cfg.Storage.CheckpointTime = 2000
+		cfg.Storage.CheckpointInterval = 2 * time.Second
 		client, err := NewClient(dir, cfg)
 		if err != nil {
 			b.Fatalf("failed to create client: %v", err)

browse_test.go

Lines changed: 1 addition & 1 deletion
@@ -295,7 +295,7 @@ func TestBrowseConcurrentAccess(t *testing.T) {
 	config := DefaultCometConfig()
 	config.Concurrency.ProcessCount = 0 // Ensure single-process mode
 	// Use frequent checkpoints to ensure data is persisted
-	config.Storage.CheckpointTime = 10
+	config.Storage.CheckpointInterval = 10 * time.Millisecond
 	client, err := NewClient(dir, config)
 	if err != nil {
 		t.Fatal(err)
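This change and the one in `benchmarks_test.go` reflect the same config migration in this commit: the integer-millisecond `Storage.CheckpointTime` field is replaced by `Storage.CheckpointInterval`, a `time.Duration`. Migrating existing code is mechanical; a sketch (the `CometConfig` type name is assumed from `DefaultCometConfig`):

```go
import "time"

func frequentCheckpointConfig() CometConfig {
	cfg := DefaultCometConfig()
	// Before: cfg.Storage.CheckpointTime = 10 (implicitly milliseconds)
	// After: an explicit, self-documenting duration
	cfg.Storage.CheckpointInterval = 10 * time.Millisecond
	return cfg
}
```

The duration type removes the ambiguity of a bare integer and lets callers express sub-millisecond or multi-second intervals without unit conversion.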
