Skip to content

Commit effb57a

Browse files
committed
cre-1601: hash ring pure function refactor for both storage and observation
1 parent d7db767 commit effb57a

File tree

3 files changed

+72
-70
lines changed

3 files changed

+72
-70
lines changed

pkg/workflows/ring/plugin.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,10 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
191191
}
192192

193193
// Use deterministic hashing to assign workflows to shards
194+
// This must be a pure function of consensus-derived data to avoid protocol failures
194195
routes := make(map[string]*pb.WorkflowRoute)
195196
for _, wfID := range allWorkflows {
196-
assignedShard := p.store.GetShardForWorkflow(wfID)
197+
assignedShard := getShardForWorkflow(wfID, healthyShardCount)
197198
routes[wfID] = &pb.WorkflowRoute{
198199
Shard: assignedShard,
199200
}

pkg/workflows/ring/store.go

Lines changed: 14 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,15 @@ package ring
22

33
import (
44
"slices"
5-
"strconv"
65
"sync"
7-
8-
"github.com/buraksezer/consistent"
9-
"github.com/cespare/xxhash/v2"
106
)
117

12-
// xxhashHasher implements the consistent.Hasher interface using xxhash
13-
type xxhashHasher struct{}
14-
15-
func (h xxhashHasher) Sum64(data []byte) uint64 {
16-
return xxhash.Sum64(data)
17-
}
18-
19-
// ShardMember implements consistent.Member for shard IDs
20-
type ShardMember string
21-
22-
func (m ShardMember) String() string {
23-
return string(m)
24-
}
25-
268
// Store manages shard routing state and workflow mappings
279
type Store struct {
28-
routingState map[string]uint32 // workflow_id -> shard_id
29-
shardHealth map[uint32]bool // shard_id -> is_healthy
30-
consistentHash *consistent.Consistent // Consistent hash ring for routing
31-
healthyShards []uint32 // Sorted list of healthy shards
32-
mu sync.Mutex
10+
routingState map[string]uint32 // workflow_id -> shard_id
11+
shardHealth map[uint32]bool // shard_id -> is_healthy
12+
healthyShards []uint32 // Sorted list of healthy shards
13+
mu sync.Mutex
3314
}
3415

3516
func NewStore() *Store {
@@ -41,29 +22,15 @@ func NewStore() *Store {
4122
}
4223
}
4324

44-
// consistentHashConfig returns the configuration for consistent hashing
45-
// Matches prototype: PartitionCount=997 (prime), ReplicationFactor=50, Load=1.1
46-
func consistentHashConfig() consistent.Config {
47-
return consistent.Config{
48-
PartitionCount: 997, // Prime number for better distribution
49-
ReplicationFactor: 50, // Number of replicas per node
50-
Load: 1.1, // Load factor for bounded loads
51-
Hasher: xxhashHasher{},
52-
}
53-
}
54-
55-
// updateConsistentHash rebuilds the consistent hash ring based on healthy shards
56-
func (s *Store) updateConsistentHash() {
25+
// updateHealthyShards rebuilds the sorted list of healthy shards
26+
func (s *Store) updateHealthyShards() {
5727
s.mu.Lock()
5828
defer s.mu.Unlock()
5929

60-
// Get list of healthy shards and create members
61-
healthyMembers := make([]consistent.Member, 0)
6230
s.healthyShards = make([]uint32, 0)
6331

6432
for shardID, healthy := range s.shardHealth {
6533
if healthy {
66-
healthyMembers = append(healthyMembers, ShardMember(strconv.FormatUint(uint64(shardID), 10)))
6734
s.healthyShards = append(s.healthyShards, shardID)
6835
}
6936
}
@@ -72,39 +39,18 @@ func (s *Store) updateConsistentHash() {
7239
slices.Sort(s.healthyShards)
7340

7441
// If no healthy shards, add shard 0 as fallback
75-
if len(healthyMembers) == 0 {
76-
healthyMembers = append(healthyMembers, ShardMember("0"))
42+
if len(s.healthyShards) == 0 {
7743
s.healthyShards = []uint32{0}
7844
}
79-
80-
// Create consistent hash ring
81-
s.consistentHash = consistent.New(healthyMembers, consistentHashConfig())
8245
}
8346

8447
// GetShardForWorkflow deterministically assigns a workflow to a shard using consistent hashing.
85-
// The assignment uses the same algorithm as the prototype: xxhash + consistent hashing ring.
8648
func (s *Store) GetShardForWorkflow(workflowID string) uint32 {
8749
s.mu.Lock()
88-
defer s.mu.Unlock()
89-
90-
if s.consistentHash == nil {
91-
// Fallback if hash ring not initialized
92-
return 0
93-
}
94-
95-
// Use consistent hashing to find the member for this workflow
96-
member := s.consistentHash.LocateKey([]byte(workflowID))
97-
if member == nil {
98-
return 0
99-
}
100-
101-
// Parse shard ID from member name
102-
shardID, err := strconv.ParseUint(member.String(), 10, 32)
103-
if err != nil {
104-
return 0
105-
}
50+
shardCount := uint32(len(s.healthyShards))
51+
s.mu.Unlock()
10652

107-
return uint32(shardID)
53+
return getShardForWorkflow(workflowID, shardCount)
10854
}
10955

11056
func (s *Store) SetShardForWorkflow(workflowID string, shardID uint32) {
@@ -128,8 +74,8 @@ func (s *Store) SetShardHealth(shardID uint32, healthy bool) {
12874
s.shardHealth[shardID] = healthy
12975
s.mu.Unlock()
13076

131-
// Rebuild consistent hash ring when shard health changes
132-
s.updateConsistentHash()
77+
// Rebuild healthy shards list when shard health changes
78+
s.updateHealthyShards()
13379
}
13480

13581
func (s *Store) SetAllShardHealth(health map[uint32]bool) {
@@ -140,8 +86,8 @@ func (s *Store) SetAllShardHealth(health map[uint32]bool) {
14086
}
14187
s.mu.Unlock()
14288

143-
// Rebuild consistent hash ring
144-
s.updateConsistentHash()
89+
// Rebuild healthy shards list
90+
s.updateHealthyShards()
14591
}
14692

14793
func (s *Store) GetAllRoutingState() map[string]uint32 {

pkg/workflows/ring/utils.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,65 @@
11
package ring
22

3-
import "slices"
3+
import (
4+
"slices"
5+
"strconv"
6+
7+
"github.com/buraksezer/consistent"
8+
"github.com/cespare/xxhash/v2"
9+
)
410

511
func uniqueSorted(s []string) []string {
612
result := slices.Clone(s)
713
slices.Sort(result)
814
return slices.Compact(result)
915
}
1016

17+
type xxhashHasher struct{}
18+
19+
func (h xxhashHasher) Sum64(data []byte) uint64 {
20+
return xxhash.Sum64(data)
21+
}
22+
23+
type ShardMember string
24+
25+
func (m ShardMember) String() string {
26+
return string(m)
27+
}
28+
29+
func consistentHashConfig() consistent.Config {
30+
return consistent.Config{
31+
PartitionCount: 997, // Prime number for better distribution
32+
ReplicationFactor: 50, // Number of replicas per node
33+
Load: 1.1, // Load factor for bounded loads
34+
Hasher: xxhashHasher{},
35+
}
36+
}
37+
38+
func getShardForWorkflow(workflowID string, shardCount uint32) uint32 {
39+
if shardCount == 0 {
40+
return 0
41+
}
42+
43+
// Create members for shards 0 to shardCount-1
44+
members := make([]consistent.Member, shardCount)
45+
for i := uint32(0); i < shardCount; i++ {
46+
members[i] = ShardMember(strconv.FormatUint(uint64(i), 10))
47+
}
48+
49+
// Create consistent hash ring
50+
ring := consistent.New(members, consistentHashConfig())
51+
52+
// Use consistent hashing to find the member for this workflow
53+
member := ring.LocateKey([]byte(workflowID))
54+
if member == nil {
55+
return 0
56+
}
57+
58+
// Parse shard ID from member name
59+
shardID, err := strconv.ParseUint(member.String(), 10, 32)
60+
if err != nil {
61+
return 0
62+
}
63+
64+
return uint32(shardID)
65+
}

0 commit comments

Comments
 (0)