Skip to content

Commit 9599327

Browse files
committed
cre-1601: test extension to validate workflows to shards eassignments
1 parent 457af17 commit 9599327

File tree

5 files changed

+43
-27
lines changed

5 files changed

+43
-27
lines changed

pkg/workflows/ring/plugin.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,15 +151,31 @@ func (p *Plugin) collectShardInfo(aos []types.AttributedObservation) (shardHealt
151151
return shardHealth, workflows, timestamps
152152
}
153153

154-
func (p *Plugin) countHealthyShards(shardHealth map[uint32]int) uint32 {
155-
var count uint32
154+
func (p *Plugin) getHealthyShards(shardHealth map[uint32]int) []uint32 {
155+
var healthyShards []uint32
156156
for shardID, votes := range shardHealth {
157157
if votes > p.config.F {
158-
count++
158+
healthyShards = append(healthyShards, shardID)
159159
p.store.SetShardHealth(shardID, true)
160160
}
161161
}
162-
return max(p.minShardCount, min(count, p.maxShardCount))
162+
slices.Sort(healthyShards)
163+
164+
// Apply min/max bounds on shard count
165+
if uint32(len(healthyShards)) < p.minShardCount {
166+
// Pad with sequential shard IDs if below minimum
167+
for i := uint32(0); uint32(len(healthyShards)) < p.minShardCount; i++ {
168+
if !slices.Contains(healthyShards, i) {
169+
healthyShards = append(healthyShards, i)
170+
}
171+
}
172+
slices.Sort(healthyShards)
173+
} else if uint32(len(healthyShards)) > p.maxShardCount {
174+
// Truncate to max (keep lowest shard IDs for determinism)
175+
healthyShards = healthyShards[:p.maxShardCount]
176+
}
177+
178+
return healthyShards
163179
}
164180

165181
func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
@@ -185,9 +201,9 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
185201

186202
allWorkflows = uniqueSorted(allWorkflows)
187203

188-
healthyShardCount := p.countHealthyShards(currentShardHealth)
204+
healthyShards := p.getHealthyShards(currentShardHealth)
189205

190-
nextState, err := p.calculateNextState(prior.State, healthyShardCount, now)
206+
nextState, err := p.calculateNextState(prior.State, uint32(len(healthyShards)), now)
191207
if err != nil {
192208
return nil, err
193209
}
@@ -196,7 +212,7 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
196212
// This must be a pure function of consensus-derived data to avoid protocol failures
197213
routes := make(map[string]*pb.WorkflowRoute)
198214
for _, wfID := range allWorkflows {
199-
assignedShard := getShardForWorkflow(wfID, healthyShardCount)
215+
assignedShard := getShardForWorkflow(wfID, healthyShards)
200216
routes[wfID] = &pb.WorkflowRoute{
201217
Shard: assignedShard,
202218
}
@@ -207,7 +223,7 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
207223
Routes: routes,
208224
}
209225

210-
p.lggr.Infow("Consensus Outcome", "healthyShards", healthyShardCount, "totalObservations", len(aos), "workflowCount", len(routes))
226+
p.lggr.Infow("Consensus Outcome", "healthyShards", len(healthyShards), "totalObservations", len(aos), "workflowCount", len(routes))
211227

212228
return proto.MarshalOptions{Deterministic: true}.Marshal(outcome)
213229
}

pkg/workflows/ring/plugin_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -352,13 +352,13 @@ func makeObservations(t *testing.T, shardHealths []map[uint32]bool, workflows []
352352
return aos
353353
}
354354

355-
func TestPlugin_countHealthyShards(t *testing.T) {
355+
func TestPlugin_getHealthyShards(t *testing.T) {
356356
tests := []struct {
357357
name string
358358
min, max uint32
359359
votes map[uint32]int // shardID -> vote count
360360
f int
361-
want uint32
361+
want int
362362
}{
363363
{"below min", 3, 10, map[uint32]int{0: 2, 1: 2}, 1, 3},
364364
{"above max", 1, 2, map[uint32]int{0: 2, 1: 2, 2: 2, 3: 2}, 1, 2},
@@ -374,8 +374,8 @@ func TestPlugin_countHealthyShards(t *testing.T) {
374374
minShardCount: tc.min,
375375
maxShardCount: tc.max,
376376
}
377-
got := plugin.countHealthyShards(tc.votes)
378-
require.Equal(t, tc.want, got)
377+
got := plugin.getHealthyShards(tc.votes)
378+
require.Equal(t, tc.want, len(got))
379379
})
380380
}
381381
}

pkg/workflows/ring/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ func (s *Store) GetShardForWorkflow(ctx context.Context, workflowID string) (uin
7373

7474
// In steady state, compute locally using consistent hashing
7575
if s.currentState == nil || s.isInSteadyState() {
76-
shardCount := uint32(len(s.healthyShards))
76+
healthyShards := slices.Clone(s.healthyShards)
7777
s.mu.Unlock()
78-
return getShardForWorkflow(workflowID, shardCount), nil
78+
return getShardForWorkflow(workflowID, healthyShards), nil
7979
}
8080

8181
// In transition state, enqueue request and wait for allocation

pkg/workflows/ring/store_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/stretchr/testify/require"
1010
)
1111

12-
// TestStore_DeterministicHashing verifies that workflow assignments are deterministic
1312
func TestStore_DeterministicHashing(t *testing.T) {
1413
store := NewStore()
1514

@@ -35,7 +34,6 @@ func TestStore_DeterministicHashing(t *testing.T) {
3534
require.True(t, shard1 >= 0 && shard1 <= 2, "Shard should be in healthy set")
3635
}
3736

38-
// TestStore_ConsistentRingConsistency verifies that all nodes with same healthy shards agree
3937
func TestStore_ConsistentRingConsistency(t *testing.T) {
4038
store1 := NewStore()
4139
store2 := NewStore()
@@ -64,7 +62,6 @@ func TestStore_ConsistentRingConsistency(t *testing.T) {
6462
}
6563
}
6664

67-
// TestStore_Rebalancing verifies rebalancing when shard health changes
6865
func TestStore_Rebalancing(t *testing.T) {
6966
store := NewStore()
7067
ctx := context.Background()
@@ -93,9 +90,16 @@ func TestStore_Rebalancing(t *testing.T) {
9390
healthyShards := store.GetHealthyShards()
9491
require.Equal(t, 2, len(healthyShards), "Should have 2 healthy shards")
9592
require.NotContains(t, healthyShards, uint32(1), "Shard 1 should not be healthy")
93+
94+
// Verify that workflows on healthy shards did not move
95+
for wfID, originalShard := range assignments1 {
96+
if originalShard == 0 || originalShard == 2 {
97+
require.Equal(t, originalShard, assignments2[wfID],
98+
"Workflow %s on healthy shard %d should not have moved", wfID, originalShard)
99+
}
100+
}
96101
}
97102

98-
// TestStore_GetHealthyShards verifies that healthy shards list is correctly maintained
99103
func TestStore_GetHealthyShards(t *testing.T) {
100104
store := NewStore()
101105

@@ -111,7 +115,6 @@ func TestStore_GetHealthyShards(t *testing.T) {
111115
require.Equal(t, []uint32{1, 2, 3}, healthyShards)
112116
}
113117

114-
// TestStore_NilHashRingFallback verifies fallback when hash ring is uninitialized
115118
func TestStore_NilHashRingFallback(t *testing.T) {
116119
store := NewStore()
117120
ctx := context.Background()
@@ -122,7 +125,6 @@ func TestStore_NilHashRingFallback(t *testing.T) {
122125
require.Equal(t, uint32(0), shard)
123126
}
124127

125-
// TestStore_DistributionAcrossShards verifies that workflows are distributed across shards
126128
func TestStore_DistributionAcrossShards(t *testing.T) {
127129
store := NewStore()
128130
ctx := context.Background()
@@ -156,8 +158,6 @@ func sum(distribution map[uint32]int) int {
156158
return total
157159
}
158160

159-
// TestStore_PendingAllocsDuringTransition verifies that allocation requests block during transition
160-
// and are fulfilled when SetShardForWorkflow is called
161161
func TestStore_PendingAllocsDuringTransition(t *testing.T) {
162162
store := NewStore()
163163
store.SetAllShardHealth(map[uint32]bool{0: true, 1: true})

pkg/workflows/ring/utils.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ func consistentHashConfig() consistent.Config {
3535
}
3636
}
3737

38-
func getShardForWorkflow(workflowID string, shardCount uint32) uint32 {
39-
if shardCount == 0 {
38+
func getShardForWorkflow(workflowID string, healthyShards []uint32) uint32 {
39+
if len(healthyShards) == 0 {
4040
return 0
4141
}
4242

43-
members := make([]consistent.Member, shardCount)
44-
for i := uint32(0); i < shardCount; i++ {
45-
members[i] = ShardMember(strconv.FormatUint(uint64(i), 10))
43+
members := make([]consistent.Member, len(healthyShards))
44+
for i, shardID := range healthyShards {
45+
members[i] = ShardMember(strconv.FormatUint(uint64(shardID), 10))
4646
}
4747

4848
ring := consistent.New(members, consistentHashConfig())

0 commit comments

Comments
 (0)