Skip to content

Commit 4646cd5

Browse files
authored
fix cold shard discovery (#15)
1 parent 916a0b6 commit 4646cd5

24 files changed

+1387
-270
lines changed

.claude/settings.local.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
"Bash(dprint fmt:*)",
2222
"Bash(DEBUG=true go test -run TestIndexMetrics -v)",
2323
"Bash(mkdir:*)",
24-
"Bash(COMET_DEBUG=true go test -timeout 30s -run \"TestRealtimeVisibilityDebug\" -v)"
24+
"Bash(COMET_DEBUG=true go test -timeout 30s -run \"TestRealtimeVisibilityDebug\" -v)",
25+
"Bash(od:*)"
2526
],
2627
"deny": []
2728
}
28-
}
29+
}

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,9 +408,12 @@ config.Concurrency.ProcessCount = 4 // 64 shards per process
408408
```go
409409
// Helper functions for shard management
410410
stream := client.PickShardStream("events:v1", uniqueKey, 256) // One-liner
411-
shardID := client.PickShard(uniqueKey, 256) // Get shard ID (0-255)
411+
shardID := client.PickShard(uniqueKey, 256) // Get shard ID
412412
streamName := comet.ShardStreamName("events:v1", shardID) // "events:v1:00FF"
413413

414+
// In multi-process mode, PickShard returns only shards owned by this process
415+
// In single-process mode, returns any shard from 0 to shardCount-1
416+
414417
// Get all shards for parallel processing
415418
shardIDs := comet.AllShardsRange(256) // [0, 1, ..., 255]
416419
streams := comet.AllShardStreams("events", "v1", 256) // All stream names

alignment_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,15 @@ func TestStructAlignment(t *testing.T) {
5151
{"ConsumerOffsetHeader", ConsumerOffsetHeader{}},
5252
{"ConsumerEntry", ConsumerEntry{}},
5353
{"ConsumerOffsetMmap", ConsumerOffsetMmap{}},
54+
// Additional structs that were missing
55+
{"CometHealth", CometHealth{}},
56+
{"EntryIndexMetadata", EntryIndexMetadata{}},
57+
{"LogConfig", LogConfig{}},
58+
{"StdLogger", StdLogger{}},
59+
{"SlogAdapter", SlogAdapter{}},
60+
{"processConfig", processConfig{}},
61+
{"ShardDiagnostics", ShardDiagnostics{}},
62+
// NoOpLogger is empty so no alignment concerns
5463
}
5564

5665
for _, tt := range tests {

benchmarks_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ func BenchmarkConsumer_SequentialRead(b *testing.B) {
605605

606606
for i := 0; i < b.N; i++ {
607607
// Reset consumer offset for each iteration
608-
err := consumer.ResetOffset(ctx, 1, 0)
608+
err := consumer.ResetOffset(ctx, 0, 0)
609609
if err != nil {
610610
b.Fatalf("failed to reset offset: %v", err)
611611
}
@@ -677,7 +677,7 @@ func BenchmarkConsumer_RandomAccess(b *testing.B) {
677677
for i := 0; i < b.N; i++ {
678678
// Seek to a random position
679679
randomOffset := int64(i % 400) // Random offset within range
680-
err := consumer.ResetOffset(ctx, 1, randomOffset)
680+
err := consumer.ResetOffset(ctx, 0, randomOffset)
681681
if err != nil {
682682
b.Fatalf("failed to reset offset: %v", err)
683683
}
@@ -885,7 +885,7 @@ func BenchmarkSimpleConsumerRead(b *testing.B) {
885885
}
886886
} else {
887887
// Reset to beginning if we've consumed all
888-
err = consumer.ResetOffset(ctx, 1, 0)
888+
err = consumer.ResetOffset(ctx, 0, 0)
889889
if err != nil {
890890
b.Fatalf("failed to reset offset: %v", err)
891891
}
@@ -2280,10 +2280,10 @@ func runBenchmarkWorker(b *testing.B) {
22802280
// In multi-process mode, each process writes to shards it owns
22812281
// Use multiple stream names to distribute across shards
22822282
streamNames := []string{
2283-
"bench:v1:user:0",
2284-
"bench:v1:user:1",
2285-
"bench:v1:user:2",
2286-
"bench:v1:user:3",
2283+
"bench:v1:user:0000",
2284+
"bench:v1:user:0001",
2285+
"bench:v1:user:0002",
2286+
"bench:v1:user:0003",
22872287
}
22882288

22892289
batch := make([][]byte, 100)

0 commit comments

Comments
 (0)