Skip to content

Commit f455bc0

Browse files
committed
cre-1601: store in two states, steady and transition; enqueue for allocation trigger post round;
1 parent 18161c1 commit f455bc0

File tree

4 files changed

+194
-23
lines changed

4 files changed

+194
-23
lines changed

pkg/workflows/ring/plugin.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,17 @@ func (p *Plugin) Query(_ context.Context, _ ocr3types.OutcomeContext) (types.Que
101101
func (p *Plugin) Observation(_ context.Context, _ ocr3types.OutcomeContext, _ types.Query) (types.Observation, error) {
102102
shardHealth := p.store.GetShardHealth()
103103

104+
// Collect workflow IDs from cache and pending allocation requests
104105
allWorkflowIDs := make([]string, 0)
105106
for wfID := range p.store.GetAllRoutingState() {
106107
allWorkflowIDs = append(allWorkflowIDs, wfID)
107108
}
108-
slices.Sort(allWorkflowIDs)
109+
110+
// Include any pending allocation requests (workflows waiting for assignment during transition)
111+
pendingAllocs := p.store.GetPendingAllocations()
112+
allWorkflowIDs = append(allWorkflowIDs, pendingAllocs...)
113+
114+
allWorkflowIDs = uniqueSorted(allWorkflowIDs)
109115

110116
observation := &pb.Observation{
111117
ShardHealthStatus: shardHealth,

pkg/workflows/ring/store.go

Lines changed: 107 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,39 @@
11
package ring
22

33
import (
4+
"context"
45
"slices"
56
"sync"
7+
8+
"github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
69
)
710

11+
// AllocationRequest represents a pending workflow allocation request during transition
12+
type AllocationRequest struct {
13+
WorkflowID string
14+
Result chan uint32
15+
}
16+
817
// Store manages shard routing state and workflow mappings
918
type Store struct {
10-
routingState map[string]uint32 // workflow_id -> shard_id
19+
routingState map[string]uint32 // workflow_id -> shard_id (cache of allocated workflows)
1120
shardHealth map[uint32]bool // shard_id -> is_healthy
1221
healthyShards []uint32 // Sorted list of healthy shards
13-
mu sync.Mutex
22+
currentState *pb.RoutingState // Current routing state (steady or transition)
23+
24+
pendingAllocs map[string][]chan uint32 // workflow_id -> waiting channels
25+
allocRequests chan AllocationRequest // Channel for new allocation requests
26+
27+
mu sync.Mutex
1428
}
1529

1630
func NewStore() *Store {
1731
return &Store{
1832
routingState: make(map[string]uint32),
1933
shardHealth: make(map[uint32]bool),
2034
healthyShards: make([]uint32, 0),
35+
pendingAllocs: make(map[string][]chan uint32),
36+
allocRequests: make(chan AllocationRequest, 1000),
2137
mu: sync.Mutex{},
2238
}
2339
}
@@ -44,19 +60,104 @@ func (s *Store) updateHealthyShards() {
4460
}
4561
}
4662

47-
// GetShardForWorkflow deterministically assigns a workflow to a shard using consistent hashing.
48-
func (s *Store) GetShardForWorkflow(workflowID string) uint32 {
63+
// GetShardForWorkflow returns the shard for a workflow.
64+
// A cached assignment is returned if one exists; otherwise, in steady state, the shard is computed locally by consistent hashing.
65+
// In transition state, it enqueues an allocation request and waits for OCR to process it.
66+
func (s *Store) GetShardForWorkflow(ctx context.Context, workflowID string) (uint32, error) {
4967
s.mu.Lock()
50-
shardCount := uint32(len(s.healthyShards))
68+
69+
// Check if already allocated in cache
70+
if shard, ok := s.routingState[workflowID]; ok {
71+
s.mu.Unlock()
72+
return shard, nil
73+
}
74+
75+
// In steady state, compute locally using consistent hashing
76+
if s.currentState == nil || s.isInSteadyState() {
77+
shardCount := uint32(len(s.healthyShards))
78+
s.mu.Unlock()
79+
return getShardForWorkflow(workflowID, shardCount), nil
80+
}
81+
82+
// In transition state, enqueue request and wait for allocation
83+
resultCh := make(chan uint32, 1)
84+
s.pendingAllocs[workflowID] = append(s.pendingAllocs[workflowID], resultCh)
5185
s.mu.Unlock()
5286

53-
return getShardForWorkflow(workflowID, shardCount)
87+
// Submit allocation request
88+
select {
89+
case s.allocRequests <- AllocationRequest{WorkflowID: workflowID, Result: resultCh}:
90+
case <-ctx.Done():
91+
return 0, ctx.Err()
92+
}
93+
94+
// Wait for result
95+
select {
96+
case shard := <-resultCh:
97+
return shard, nil
98+
case <-ctx.Done():
99+
return 0, ctx.Err()
100+
}
101+
}
102+
103+
func (s *Store) isInSteadyState() bool {
104+
if s.currentState == nil {
105+
return true
106+
}
107+
_, ok := s.currentState.State.(*pb.RoutingState_RoutableShards)
108+
return ok
54109
}
55110

56111
func (s *Store) SetShardForWorkflow(workflowID string, shardID uint32) {
57112
s.mu.Lock()
58113
defer s.mu.Unlock()
114+
59115
s.routingState[workflowID] = shardID
116+
117+
// Signal any waiting allocation requests
118+
if waiters, ok := s.pendingAllocs[workflowID]; ok {
119+
for _, ch := range waiters {
120+
select {
121+
case ch <- shardID:
122+
default:
123+
}
124+
}
125+
delete(s.pendingAllocs, workflowID)
126+
}
127+
}
128+
129+
// SetRoutingState updates the current routing state (steady or transition)
130+
func (s *Store) SetRoutingState(state *pb.RoutingState) {
131+
s.mu.Lock()
132+
defer s.mu.Unlock()
133+
s.currentState = state
134+
}
135+
136+
// GetRoutingState returns the current routing state
137+
func (s *Store) GetRoutingState() *pb.RoutingState {
138+
s.mu.Lock()
139+
defer s.mu.Unlock()
140+
return s.currentState
141+
}
142+
143+
// GetPendingAllocations drains the queued allocation requests without blocking and returns their workflow IDs (each queued request is returned only once)
144+
func (s *Store) GetPendingAllocations() []string {
145+
var pending []string
146+
for {
147+
select {
148+
case req := <-s.allocRequests:
149+
pending = append(pending, req.WorkflowID)
150+
default:
151+
return pending
152+
}
153+
}
154+
}
155+
156+
// IsInTransition returns true if the store is in transition state
157+
func (s *Store) IsInTransition() bool {
158+
s.mu.Lock()
159+
defer s.mu.Unlock()
160+
return !s.isInSteadyState()
60161
}
61162

62163
func (s *Store) GetShardHealth() map[uint32]bool {

pkg/workflows/ring/store_test.go

Lines changed: 76 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package ring
22

33
import (
4+
"context"
45
"testing"
6+
"time"
57

8+
"github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
69
"github.com/stretchr/testify/require"
710
)
811

@@ -17,10 +20,15 @@ func TestStore_DeterministicHashing(t *testing.T) {
1720
2: true,
1821
})
1922

23+
ctx := context.Background()
24+
2025
// Test determinism: same workflow always gets same shard
21-
shard1 := store.GetShardForWorkflow("workflow-123")
22-
shard2 := store.GetShardForWorkflow("workflow-123")
23-
shard3 := store.GetShardForWorkflow("workflow-123")
26+
shard1, err := store.GetShardForWorkflow(ctx, "workflow-123")
27+
require.NoError(t, err)
28+
shard2, err := store.GetShardForWorkflow(ctx, "workflow-123")
29+
require.NoError(t, err)
30+
shard3, err := store.GetShardForWorkflow(ctx, "workflow-123")
31+
require.NoError(t, err)
2432

2533
require.Equal(t, shard1, shard2, "Same workflow should get same shard (call 2)")
2634
require.Equal(t, shard2, shard3, "Same workflow should get same shard (call 3)")
@@ -39,12 +47,17 @@ func TestStore_ConsistentRingConsistency(t *testing.T) {
3947
store2.SetAllShardHealth(healthyShards)
4048
store3.SetAllShardHealth(healthyShards)
4149

50+
ctx := context.Background()
51+
4252
// All compute same assignments
4353
workflows := []string{"workflow-A", "workflow-B", "workflow-C", "workflow-D"}
4454
for _, wf := range workflows {
45-
s1 := store1.GetShardForWorkflow(wf)
46-
s2 := store2.GetShardForWorkflow(wf)
47-
s3 := store3.GetShardForWorkflow(wf)
55+
s1, err := store1.GetShardForWorkflow(ctx, wf)
56+
require.NoError(t, err)
57+
s2, err := store2.GetShardForWorkflow(ctx, wf)
58+
require.NoError(t, err)
59+
s3, err := store3.GetShardForWorkflow(ctx, wf)
60+
require.NoError(t, err)
4861

4962
require.Equal(t, s1, s2, "All nodes should agree on %s assignment", wf)
5063
require.Equal(t, s2, s3, "All nodes should agree on %s assignment", wf)
@@ -54,21 +67,26 @@ func TestStore_ConsistentRingConsistency(t *testing.T) {
5467
// TestStore_Rebalancing verifies rebalancing when shard health changes
5568
func TestStore_Rebalancing(t *testing.T) {
5669
store := NewStore()
70+
ctx := context.Background()
5771

5872
// Start with 3 healthy shards
5973
store.SetAllShardHealth(map[uint32]bool{0: true, 1: true, 2: true})
6074
assignments1 := make(map[string]uint32)
6175
for i := 1; i <= 10; i++ {
6276
wfID := "workflow-" + string(rune(i))
63-
assignments1[wfID] = store.GetShardForWorkflow(wfID)
77+
shard, err := store.GetShardForWorkflow(ctx, wfID)
78+
require.NoError(t, err)
79+
assignments1[wfID] = shard
6480
}
6581

6682
// Shard 1 fails
6783
store.SetShardHealth(1, false)
6884
assignments2 := make(map[string]uint32)
6985
for i := 1; i <= 10; i++ {
7086
wfID := "workflow-" + string(rune(i))
71-
assignments2[wfID] = store.GetShardForWorkflow(wfID)
87+
shard, err := store.GetShardForWorkflow(ctx, wfID)
88+
require.NoError(t, err)
89+
assignments2[wfID] = shard
7290
}
7391

7492
// Check that rebalancing occurred (some workflows moved)
@@ -95,20 +113,19 @@ func TestStore_GetHealthyShards(t *testing.T) {
95113

96114
// TestStore_NilHashRingFallback verifies fallback when hash ring is uninitialized
97115
func TestStore_NilHashRingFallback(t *testing.T) {
98-
store := &Store{
99-
routingState: make(map[string]uint32),
100-
shardHealth: make(map[uint32]bool),
101-
healthyShards: make([]uint32, 0),
102-
}
116+
store := NewStore()
117+
ctx := context.Background()
103118

104-
// Should not panic, should return 0 as fallback
105-
shard := store.GetShardForWorkflow("workflow-123")
119+
// Should not panic, should return 0 as fallback (no healthy shards set)
120+
shard, err := store.GetShardForWorkflow(ctx, "workflow-123")
121+
require.NoError(t, err)
106122
require.Equal(t, uint32(0), shard)
107123
}
108124

109125
// TestStore_DistributionAcrossShards verifies that workflows are distributed across shards
110126
func TestStore_DistributionAcrossShards(t *testing.T) {
111127
store := NewStore()
128+
ctx := context.Background()
112129

113130
store.SetAllShardHealth(map[uint32]bool{
114131
0: true,
@@ -120,7 +137,8 @@ func TestStore_DistributionAcrossShards(t *testing.T) {
120137
distribution := make(map[uint32]int)
121138
for i := 0; i < 100; i++ {
122139
wfID := "workflow-" + string(rune(i))
123-
shard := store.GetShardForWorkflow(wfID)
140+
shard, err := store.GetShardForWorkflow(ctx, wfID)
141+
require.NoError(t, err)
124142
distribution[shard]++
125143
}
126144

@@ -137,3 +155,45 @@ func sum(distribution map[uint32]int) int {
137155
}
138156
return total
139157
}
158+
159+
// TestStore_PendingAllocsDuringTransition verifies that allocation requests block during transition
160+
// and are fulfilled when SetShardForWorkflow is called
161+
func TestStore_PendingAllocsDuringTransition(t *testing.T) {
162+
store := NewStore()
163+
store.SetAllShardHealth(map[uint32]bool{0: true, 1: true})
164+
165+
// Put store in transition state
166+
store.SetRoutingState(&pb.RoutingState{
167+
State: &pb.RoutingState_Transition{
168+
Transition: &pb.Transition{WantShards: 3},
169+
},
170+
})
171+
172+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
173+
defer cancel()
174+
175+
// Start a goroutine that requests allocation (will block)
176+
resultCh := make(chan uint32)
177+
go func() {
178+
shard, _ := store.GetShardForWorkflow(ctx, "workflow-X")
179+
resultCh <- shard
180+
}()
181+
182+
// Give goroutine time to enqueue request
183+
time.Sleep(10 * time.Millisecond)
184+
185+
// Verify request is pending
186+
pending := store.GetPendingAllocations()
187+
require.Contains(t, pending, "workflow-X")
188+
189+
// Fulfill the allocation (simulates transmitter receiving OCR outcome)
190+
store.SetShardForWorkflow("workflow-X", 2)
191+
192+
// Blocked goroutine should now receive result
193+
select {
194+
case shard := <-resultCh:
195+
require.Equal(t, uint32(2), shard)
196+
case <-time.After(50 * time.Millisecond):
197+
t.Fatal("allocation was not fulfilled")
198+
}
199+
}

pkg/workflows/ring/transmitter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ func (t *Transmitter) Transmit(ctx context.Context, _ types.ConfigDigest, _ uint
3737
return err
3838
}
3939

40+
// Update routing state (steady vs transition)
41+
t.store.SetRoutingState(outcome.State)
42+
43+
// Update workflow shard mappings (this also signals any waiting allocation requests)
4044
for workflowID, route := range outcome.Routes {
4145
t.store.SetShardForWorkflow(workflowID, route.Shard)
4246
t.lggr.Debugw("Updated workflow shard mapping", "workflowID", workflowID, "shard", route.Shard)

0 commit comments

Comments
 (0)