Skip to content

Commit de0d9f0

Browse files
committed
cre-1601: consistent hashing and plugin test
1 parent 9c4741b commit de0d9f0

File tree

4 files changed

+403
-32
lines changed

4 files changed

+403
-32
lines changed

pkg/workflows/shardorchestrator/plugin.go

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, lggr logger
4444
if cfg == nil {
4545
cfg = &ConsensusConfig{
4646
MinShardCount: 1,
47-
MaxShardCount: 10,
48-
BatchSize: 100,
47+
MaxShardCount: 100,
48+
BatchSize: 1000,
4949
}
5050
}
5151

@@ -101,8 +101,8 @@ func (p *Plugin) ObservationQuorum(_ context.Context, _ ocr3types.OutcomeContext
101101

102102
func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
103103
prevOutcome := &pb.Outcome{}
104-
if err := proto.Unmarshal(outctx.PreviousOutcome, prevOutcome); err != nil {
105-
p.lggr.Warnf("failed to unmarshal previous outcome: %v", err)
104+
if err := proto.Unmarshal(outctx.PreviousOutcome, prevOutcome); err != nil || prevOutcome.State == nil {
105+
p.lggr.Warnf("failed to unmarshal previous outcome or state is nil")
106106
prevOutcome = &pb.Outcome{
107107
State: &pb.RoutingState{
108108
Id: 0,
@@ -116,8 +116,9 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
116116

117117
currentShardHealth := make(map[uint32]int)
118118
totalObservations := len(aos)
119+
allWorkflows := []string{}
119120

120-
// Collect shard health observations
121+
// Collect shard health observations and workflows
121122
for _, ao := range aos {
122123
observation := &pb.Observation{}
123124
if err := proto.Unmarshal(ao.Observation, observation); err != nil {
@@ -130,13 +131,29 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
130131
currentShardHealth[shardID]++
131132
}
132133
}
134+
135+
// Collect workflow IDs
136+
allWorkflows = append(allWorkflows, observation.Hashes...)
133137
}
134138

139+
// Deduplicate workflows
140+
workflowMap := make(map[string]bool)
141+
for _, wf := range allWorkflows {
142+
workflowMap[wf] = true
143+
}
144+
allWorkflows = make([]string, 0, len(workflowMap))
145+
for wf := range workflowMap {
146+
allWorkflows = append(allWorkflows, wf)
147+
}
148+
slices.Sort(allWorkflows) // Ensure deterministic order
149+
135150
// Determine desired shard count based on observations
136151
healthyShardCount := uint32(0)
137-
for _, count := range currentShardHealth {
152+
for shardID, count := range currentShardHealth {
138153
if count > int(p.config.F) {
139154
healthyShardCount++
155+
// Update store with healthy shard
156+
p.store.SetShardHealth(shardID, true)
140157
}
141158
}
142159

@@ -148,6 +165,15 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
148165
healthyShardCount = p.maxShardCount
149166
}
150167

168+
// Use deterministic hashing to assign workflows to shards
169+
routes := make(map[string]*pb.WorkflowRoute)
170+
for _, wfID := range allWorkflows {
171+
assignedShard := p.store.GetShardForWorkflow(wfID)
172+
routes[wfID] = &pb.WorkflowRoute{
173+
Shard: assignedShard,
174+
}
175+
}
176+
151177
// Update routing state
152178
outcome := &pb.Outcome{
153179
State: &pb.RoutingState{
@@ -156,17 +182,10 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
156182
RoutableShards: healthyShardCount,
157183
},
158184
},
159-
Routes: make(map[string]*pb.WorkflowRoute),
185+
Routes: routes,
160186
}
161187

162-
p.lggr.Infow("Consensus Outcome", "healthyShards", healthyShardCount, "totalObservations", totalObservations)
163-
164-
// Copy previous routes
165-
if prevOutcome.Routes != nil {
166-
for wfID, route := range prevOutcome.Routes {
167-
outcome.Routes[wfID] = route
168-
}
169-
}
188+
p.lggr.Infow("Consensus Outcome", "healthyShards", healthyShardCount, "totalObservations", totalObservations, "workflowCount", len(routes))
170189

171190
return proto.MarshalOptions{Deterministic: true}.Marshal(outcome)
172191
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package shardorchestrator
2+
3+
import (
4+
"testing"
5+
6+
"github.com/smartcontractkit/libocr/commontypes"
7+
"github.com/stretchr/testify/require"
8+
"google.golang.org/protobuf/proto"
9+
10+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
11+
"github.com/smartcontractkit/chainlink-common/pkg/workflows/shardorchestrator/pb"
12+
"github.com/smartcontractkit/libocr/offchainreporting2/types"
13+
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
14+
)
15+
16+
func TestPlugin_OutcomeWithMultiNodeObservations(t *testing.T) {
17+
lggr := logger.Test(t)
18+
store := NewStore()
19+
store.SetAllShardHealth(map[uint32]bool{0: true, 1: true, 2: true})
20+
21+
config := ocr3types.ReportingPluginConfig{
22+
N: 4, F: 1,
23+
OffchainConfig: []byte{},
24+
MaxDurationObservation: 0,
25+
MaxDurationShouldAcceptAttestedReport: 0,
26+
MaxDurationShouldTransmitAcceptedReport: 0,
27+
}
28+
29+
plugin, err := NewPlugin(store, config, lggr, nil)
30+
require.NoError(t, err)
31+
32+
ctx := t.Context()
33+
outcomeCtx := ocr3types.OutcomeContext{PreviousOutcome: []byte("")}
34+
35+
// Observations from 4 NOPs reporting health and workflows
36+
observations := []struct {
37+
name string
38+
shardHealth map[uint32]bool
39+
workflows []string
40+
}{
41+
{
42+
name: "NOP 0",
43+
shardHealth: map[uint32]bool{0: true, 1: true, 2: true},
44+
workflows: []string{"wf-A", "wf-B", "wf-C"},
45+
},
46+
{
47+
name: "NOP 1",
48+
shardHealth: map[uint32]bool{0: true, 1: true, 2: true},
49+
workflows: []string{"wf-B", "wf-C", "wf-D"},
50+
},
51+
{
52+
name: "NOP 2",
53+
shardHealth: map[uint32]bool{0: true, 1: true, 2: false}, // shard 2 unhealthy
54+
workflows: []string{"wf-A", "wf-C"},
55+
},
56+
{
57+
name: "NOP 3",
58+
shardHealth: map[uint32]bool{0: true, 1: true, 2: true},
59+
workflows: []string{"wf-A", "wf-B", "wf-D"},
60+
},
61+
}
62+
63+
// Build attributed observations
64+
aos := make([]types.AttributedObservation, 0)
65+
for _, obs := range observations {
66+
pbObs := &pb.Observation{
67+
Status: obs.shardHealth,
68+
Hashes: obs.workflows,
69+
}
70+
rawObs, err := proto.Marshal(pbObs)
71+
require.NoError(t, err)
72+
73+
aos = append(aos, types.AttributedObservation{
74+
Observation: rawObs,
75+
Observer: commontypes.OracleID(len(aos)),
76+
})
77+
}
78+
79+
// Execute Outcome phase
80+
outcome, err := plugin.Outcome(ctx, outcomeCtx, nil, aos)
81+
require.NoError(t, err)
82+
require.NotNil(t, outcome)
83+
84+
// Verify outcome
85+
outcomeProto := &pb.Outcome{}
86+
err = proto.Unmarshal(outcome, outcomeProto)
87+
require.NoError(t, err)
88+
89+
// Check consensus results
90+
t.Logf("Outcome - ID: %d, HealthyShards: %d", outcomeProto.State.Id, outcomeProto.State.GetRoutableShards())
91+
t.Logf("Workflows assigned: %d", len(outcomeProto.Routes))
92+
93+
// Verify all workflows are assigned
94+
expectedWorkflows := map[string]bool{"wf-A": true, "wf-B": true, "wf-C": true, "wf-D": true}
95+
require.Equal(t, len(expectedWorkflows), len(outcomeProto.Routes))
96+
for wf := range expectedWorkflows {
97+
route, exists := outcomeProto.Routes[wf]
98+
require.True(t, exists, "workflow %s should be assigned", wf)
99+
require.True(t, route.Shard <= 2, "shard should be healthy (0-2)")
100+
t.Logf(" %s → shard %d", wf, route.Shard)
101+
}
102+
103+
// Verify determinism: run again, should get same assignments
104+
outcome2, err := plugin.Outcome(ctx, outcomeCtx, nil, aos)
105+
require.NoError(t, err)
106+
107+
outcomeProto2 := &pb.Outcome{}
108+
err = proto.Unmarshal(outcome2, outcomeProto2)
109+
require.NoError(t, err)
110+
111+
// Same workflows → same shards
112+
for wf, route1 := range outcomeProto.Routes {
113+
route2, exists := outcomeProto2.Routes[wf]
114+
require.True(t, exists)
115+
require.Equal(t, route1.Shard, route2.Shard, "workflow %s should assign to same shard", wf)
116+
}
117+
}

pkg/workflows/shardorchestrator/store.go

Lines changed: 113 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,110 @@
11
package shardorchestrator
22

33
import (
4-
"maps"
4+
"slices"
5+
"strconv"
56
"sync"
6-
"time"
7+
8+
"github.com/buraksezer/consistent"
9+
"github.com/cespare/xxhash/v2"
710
)
811

9-
var DefaultRequestTimeout = 20 * time.Minute
12+
// xxhashHasher implements the consistent.Hasher interface using xxhash.
// It is an empty struct with no state, so the zero value is ready to use.
13+
type xxhashHasher struct{}
14+
15+
// Sum64 returns the 64-bit hash of data by delegating to xxhash.Sum64.
func (h xxhashHasher) Sum64(data []byte) uint64 {
16+
return xxhash.Sum64(data)
17+
}
18+
19+
// ShardMember implements consistent.Member for shard IDs.
// The underlying string is the member's name on the hash ring. In this file it
// is produced from the decimal shard ID with strconv.FormatUint and parsed
// back with strconv.ParseUint in GetShardForWorkflow.
20+
type ShardMember string
21+
22+
// String returns the member name unchanged.
func (m ShardMember) String() string {
23+
return string(m)
24+
}
1025

1126
// Store manages shard routing state and workflow mappings.
//
// The methods visible in this file take mu before reading or writing any of
// the fields below. consistentHash and healthyShards are derived from
// shardHealth and are replaced together by updateConsistentHash.
1227
type Store struct {
13-
routingState map[string]uint32 // workflow_id -> shard_id
14-
shardHealth map[uint32]bool // shard_id -> is_healthy
15-
mu sync.Mutex
28+
routingState map[string]uint32 // workflow_id -> shard_id
29+
shardHealth map[uint32]bool // shard_id -> is_healthy
30+
// consistentHash is not set by NewStore; it stays nil until
// updateConsistentHash first runs.
consistentHash *consistent.Consistent // Consistent hash ring for routing
31+
healthyShards []uint32 // Sorted list of healthy shards
32+
mu sync.Mutex
1633
}
1734

1835
func NewStore() *Store {
1936
return &Store{
20-
routingState: make(map[string]uint32),
21-
shardHealth: make(map[uint32]bool),
22-
mu: sync.Mutex{},
37+
routingState: make(map[string]uint32),
38+
shardHealth: make(map[uint32]bool),
39+
healthyShards: make([]uint32, 0),
40+
mu: sync.Mutex{},
41+
}
42+
}
43+
44+
// consistentHashConfig returns the configuration for consistent hashing.
// A new consistent.Config value is built on every call.
45+
// Matches prototype: PartitionCount=997 (prime), ReplicationFactor=50, Load=1.1
46+
func consistentHashConfig() consistent.Config {
47+
return consistent.Config{
48+
PartitionCount: 997, // Prime number for better distribution
49+
ReplicationFactor: 50, // Number of replicas per node
50+
Load: 1.1, // Load factor for bounded loads
51+
// xxhashHasher (declared in this file) supplies the ring's hash function.
Hasher: xxhashHasher{},
52+
}
53+
}
54+
55+
// updateConsistentHash rebuilds the consistent hash ring based on healthy shards
56+
func (s *Store) updateConsistentHash() {
57+
s.mu.Lock()
58+
defer s.mu.Unlock()
59+
60+
// Get list of healthy shards and create members
61+
healthyMembers := make([]consistent.Member, 0)
62+
s.healthyShards = make([]uint32, 0)
63+
64+
for shardID, healthy := range s.shardHealth {
65+
if healthy {
66+
healthyMembers = append(healthyMembers, ShardMember(strconv.FormatUint(uint64(shardID), 10)))
67+
s.healthyShards = append(s.healthyShards, shardID)
68+
}
69+
}
70+
71+
// Sort for determinism
72+
slices.Sort(s.healthyShards)
73+
74+
// If no healthy shards, add shard 0 as fallback
75+
if len(healthyMembers) == 0 {
76+
healthyMembers = append(healthyMembers, ShardMember("0"))
77+
s.healthyShards = []uint32{0}
2378
}
79+
80+
// Create consistent hash ring
81+
s.consistentHash = consistent.New(healthyMembers, consistentHashConfig())
2482
}
2583

26-
func (s *Store) GetShardForWorkflow(workflowID string) (uint32, bool) {
84+
// GetShardForWorkflow deterministically assigns a workflow to a shard using consistent hashing.
85+
// The assignment uses the same algorithm as the prototype: xxhash + consistent hashing ring.
86+
func (s *Store) GetShardForWorkflow(workflowID string) uint32 {
2787
s.mu.Lock()
2888
defer s.mu.Unlock()
29-
shardID, ok := s.routingState[workflowID]
30-
return shardID, ok
89+
90+
if s.consistentHash == nil {
91+
// Fallback if hash ring not initialized
92+
return 0
93+
}
94+
95+
// Use consistent hashing to find the member for this workflow
96+
member := s.consistentHash.LocateKey([]byte(workflowID))
97+
if member == nil {
98+
return 0
99+
}
100+
101+
// Parse shard ID from member name
102+
shardID, err := strconv.ParseUint(member.String(), 10, 32)
103+
if err != nil {
104+
return 0
105+
}
106+
107+
return uint32(shardID)
31108
}
32109

33110
func (s *Store) SetShardForWorkflow(workflowID string, shardID uint32) {
@@ -40,28 +117,40 @@ func (s *Store) GetShardHealth() map[uint32]bool {
40117
s.mu.Lock()
41118
defer s.mu.Unlock()
42119
copied := make(map[uint32]bool)
43-
maps.Copy(copied, s.shardHealth)
120+
for k, v := range s.shardHealth {
121+
copied[k] = v
122+
}
44123
return copied
45124
}
46125

47126
func (s *Store) SetShardHealth(shardID uint32, healthy bool) {
48127
s.mu.Lock()
49-
defer s.mu.Unlock()
50128
s.shardHealth[shardID] = healthy
129+
s.mu.Unlock()
130+
131+
// Rebuild consistent hash ring when shard health changes
132+
s.updateConsistentHash()
51133
}
52134

53135
func (s *Store) SetAllShardHealth(health map[uint32]bool) {
54136
s.mu.Lock()
55-
defer s.mu.Unlock()
56137
s.shardHealth = make(map[uint32]bool)
57-
maps.Copy(s.shardHealth, health)
138+
for k, v := range health {
139+
s.shardHealth[k] = v
140+
}
141+
s.mu.Unlock()
142+
143+
// Rebuild consistent hash ring
144+
s.updateConsistentHash()
58145
}
59146

60147
func (s *Store) GetAllRoutingState() map[string]uint32 {
61148
s.mu.Lock()
62149
defer s.mu.Unlock()
63150
copied := make(map[string]uint32)
64-
maps.Copy(copied, s.routingState)
151+
for k, v := range s.routingState {
152+
copied[k] = v
153+
}
65154
return copied
66155
}
67156

@@ -82,3 +171,10 @@ func (s *Store) GetHealthyShardCount() int {
82171
}
83172
return count
84173
}
174+
175+
// GetHealthyShards returns a sorted list of healthy shards for inspection.
//
// The result is a copy made with slices.Clone, so callers can modify it
// without affecting the store. It reflects whatever updateConsistentHash last
// computed, which includes the fallback entry 0 when no shard was healthy.
176+
func (s *Store) GetHealthyShards() []uint32 {
177+
s.mu.Lock()
178+
defer s.mu.Unlock()
179+
return slices.Clone(s.healthyShards)
180+
}

0 commit comments

Comments
 (0)