Skip to content

Commit 26cbc3b

Browse files
committed
cre-1601: refactor of state; state verification tests; related changes;
1 parent e042bb2 commit 26cbc3b

File tree

4 files changed

+344
-51
lines changed

4 files changed

+344
-51
lines changed

pkg/workflows/ring/plugin.go

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
172172

173173
healthyShards := p.getHealthyShards(currentShardHealth)
174174

175-
nextState, err := p.calculateNextState(prior.State, uint32(len(healthyShards)), now)
175+
nextState, err := NextState(prior.State, uint32(len(healthyShards)), now, p.timeToSync)
176176
if err != nil {
177177
return nil, err
178178
}
@@ -197,46 +197,6 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
197197
return proto.MarshalOptions{Deterministic: true}.Marshal(outcome)
198198
}
199199

200-
func (p *Plugin) calculateNextState(priorState *pb.RoutingState, wantShards uint32, now time.Time) (*pb.RoutingState, error) {
201-
switch ps := priorState.State.(type) {
202-
case *pb.RoutingState_RoutableShards:
203-
// No transition needed; avoid unnecessary workflow redistribution
204-
if ps.RoutableShards == wantShards {
205-
return priorState, nil
206-
}
207-
208-
// Shard count changed; start transition with safety period for workflow redistribution
209-
return &pb.RoutingState{
210-
Id: priorState.Id + 1,
211-
State: &pb.RoutingState_Transition{
212-
Transition: &pb.Transition{
213-
WantShards: wantShards,
214-
LastStableCount: ps.RoutableShards,
215-
ChangesSafeAfter: timestamppb.New(now.Add(p.timeToSync)),
216-
},
217-
},
218-
}, nil
219-
220-
case *pb.RoutingState_Transition:
221-
// Cannot commit to new routing until safety period elapses: some nodes may still
222-
// be on the prior stable state and changing routes now would cause misalignment
223-
if now.Before(ps.Transition.ChangesSafeAfter.AsTime()) {
224-
return priorState, nil
225-
}
226-
227-
// All nodes have synced; commit to new routing configuration
228-
return &pb.RoutingState{
229-
Id: priorState.Id + 1,
230-
State: &pb.RoutingState_RoutableShards{
231-
RoutableShards: ps.Transition.WantShards,
232-
},
233-
}, nil
234-
235-
default:
236-
return nil, errors.New("unknown prior state type")
237-
}
238-
}
239-
240200
func (p *Plugin) Reports(_ context.Context, _ uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportPlus[[]byte], error) {
241201
allOraclesTransmitNow := &ocr3types.TransmissionSchedule{
242202
Transmitters: make([]commontypes.OracleID, p.config.N),

pkg/workflows/ring/state.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package ring
2+
3+
import (
4+
"errors"
5+
"time"
6+
7+
"google.golang.org/protobuf/types/known/timestamppb"
8+
9+
"github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
10+
)
11+
12+
func IsInSteadyState(state *pb.RoutingState) bool {
13+
if state == nil {
14+
return false
15+
}
16+
_, ok := state.State.(*pb.RoutingState_RoutableShards)
17+
return ok
18+
}
19+
20+
func NextStateFromSteady(currentID uint64, currentShards, wantShards uint32, now time.Time, timeToSync time.Duration) *pb.RoutingState {
21+
if currentShards == wantShards {
22+
return &pb.RoutingState{
23+
Id: currentID,
24+
State: &pb.RoutingState_RoutableShards{RoutableShards: currentShards},
25+
}
26+
}
27+
28+
return &pb.RoutingState{
29+
Id: currentID + 1,
30+
State: &pb.RoutingState_Transition{
31+
Transition: &pb.Transition{
32+
WantShards: wantShards,
33+
LastStableCount: currentShards,
34+
ChangesSafeAfter: timestamppb.New(now.Add(timeToSync)),
35+
},
36+
},
37+
}
38+
}
39+
40+
func NextStateFromTransition(currentID uint64, transition *pb.Transition, now time.Time) *pb.RoutingState {
41+
safeAfter := transition.ChangesSafeAfter.AsTime()
42+
43+
if now.Before(safeAfter) {
44+
return &pb.RoutingState{
45+
Id: currentID,
46+
State: &pb.RoutingState_Transition{
47+
Transition: transition,
48+
},
49+
}
50+
}
51+
52+
return &pb.RoutingState{
53+
Id: currentID + 1,
54+
State: &pb.RoutingState_RoutableShards{
55+
RoutableShards: transition.WantShards,
56+
},
57+
}
58+
}
59+
60+
func NextState(current *pb.RoutingState, wantShards uint32, now time.Time, timeToSync time.Duration) (*pb.RoutingState, error) {
61+
if current == nil {
62+
return nil, errors.New("current state is nil")
63+
}
64+
65+
switch s := current.State.(type) {
66+
case *pb.RoutingState_RoutableShards:
67+
return NextStateFromSteady(current.Id, s.RoutableShards, wantShards, now, timeToSync), nil
68+
69+
case *pb.RoutingState_Transition:
70+
return NextStateFromTransition(current.Id, s.Transition, now), nil
71+
72+
// coverage:ignore
73+
default:
74+
return nil, errors.New("unknown state type")
75+
}
76+
}

pkg/workflows/ring/state_test.go

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
package ring
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
"google.golang.org/protobuf/types/known/timestamppb"
9+
10+
"github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
11+
)
12+
13+
func TestStateTransitionDeterminism(t *testing.T) {
14+
now := time.Unix(0, 0)
15+
timeToSync := 5 * time.Minute
16+
17+
current := &pb.RoutingState{
18+
Id: 1,
19+
State: &pb.RoutingState_RoutableShards{RoutableShards: 2},
20+
}
21+
22+
// Same inputs should produce identical outputs
23+
result1, err := NextState(current, 4, now, timeToSync)
24+
require.NoError(t, err)
25+
26+
result2, err := NextState(current, 4, now, timeToSync)
27+
require.NoError(t, err)
28+
29+
require.Equal(t, result1.Id, result2.Id)
30+
require.Equal(t, result1.GetTransition().WantShards, result2.GetTransition().WantShards)
31+
require.Equal(t, result1.GetTransition().LastStableCount, result2.GetTransition().LastStableCount)
32+
require.Equal(t, result1.GetTransition().ChangesSafeAfter.AsTime(), result2.GetTransition().ChangesSafeAfter.AsTime())
33+
}
34+
35+
// ∀ state, inputs: NextState(state, inputs).Id >= state.Id
36+
func TestFV_StateIDMonotonicity(t *testing.T) {
37+
timeToSync := 5 * time.Minute
38+
baseTime := time.Unix(0, 0)
39+
40+
testCases := []struct {
41+
name string
42+
state *pb.RoutingState
43+
now time.Time
44+
}{
45+
// Steady state cases
46+
{"steady_same_shards", steadyState(10, 3), baseTime},
47+
{"steady_more_shards", steadyState(10, 3), baseTime},
48+
{"steady_fewer_shards", steadyState(10, 3), baseTime},
49+
// Transition state cases
50+
{"transition_before_safe", transitionState(10, 3, 5, baseTime.Add(1*time.Hour)), baseTime},
51+
{"transition_at_safe", transitionState(10, 3, 5, baseTime), baseTime},
52+
{"transition_after_safe", transitionState(10, 3, 5, baseTime.Add(-1*time.Second)), baseTime},
53+
}
54+
55+
shardCounts := []uint32{1, 2, 3, 5, 10}
56+
57+
for _, tc := range testCases {
58+
for _, wantShards := range shardCounts {
59+
t.Run(tc.name, func(t *testing.T) {
60+
result, err := NextState(tc.state, wantShards, tc.now, timeToSync)
61+
require.NoError(t, err)
62+
63+
// INVARIANT: ID never decreases
64+
require.GreaterOrEqual(t, result.Id, tc.state.Id,
65+
"state ID must be monotonically non-decreasing")
66+
})
67+
}
68+
}
69+
}
70+
71+
// The state machine only produces valid transitions:
72+
// - Steady → Steady (when shards unchanged)
73+
// - Steady → Transition (when shards change)
74+
// - Transition → Transition (before safety period)
75+
// - Transition → Steady (after safety period)
76+
func TestFV_ValidStateTransitions(t *testing.T) {
77+
timeToSync := 5 * time.Minute
78+
baseTime := time.Unix(0, 0)
79+
80+
t.Run("steady_to_steady_when_unchanged", func(t *testing.T) {
81+
for _, shards := range []uint32{1, 2, 3, 5, 10} {
82+
state := steadyState(1, shards)
83+
result, err := NextState(state, shards, baseTime, timeToSync)
84+
require.NoError(t, err)
85+
86+
// Must remain steady with same shard count
87+
require.True(t, IsInSteadyState(result))
88+
require.Equal(t, shards, result.GetRoutableShards())
89+
require.Equal(t, state.Id, result.Id, "ID unchanged when no transition")
90+
}
91+
})
92+
93+
t.Run("steady_to_transition_when_changed", func(t *testing.T) {
94+
transitions := [][2]uint32{{1, 2}, {2, 1}, {3, 5}, {5, 3}, {1, 10}}
95+
for _, tr := range transitions {
96+
current, want := tr[0], tr[1]
97+
state := steadyState(1, current)
98+
result, err := NextState(state, want, baseTime, timeToSync)
99+
require.NoError(t, err)
100+
101+
// Must enter transition
102+
require.False(t, IsInSteadyState(result))
103+
require.NotNil(t, result.GetTransition())
104+
require.Equal(t, want, result.GetTransition().WantShards)
105+
require.Equal(t, current, result.GetTransition().LastStableCount)
106+
require.Equal(t, state.Id+1, result.Id)
107+
}
108+
})
109+
110+
t.Run("transition_stays_before_safe_time", func(t *testing.T) {
111+
safeAfter := baseTime.Add(1 * time.Hour)
112+
for _, wantShards := range []uint32{1, 2, 5} {
113+
state := transitionState(5, 2, wantShards, safeAfter)
114+
result, err := NextState(state, wantShards, baseTime, timeToSync)
115+
require.NoError(t, err)
116+
117+
// Must remain in transition
118+
require.False(t, IsInSteadyState(result))
119+
require.Equal(t, state.Id, result.Id, "ID unchanged while waiting")
120+
}
121+
})
122+
123+
t.Run("transition_completes_after_safe_time", func(t *testing.T) {
124+
safeAfter := baseTime.Add(-1 * time.Second)
125+
for _, wantShards := range []uint32{1, 2, 5} {
126+
state := transitionState(5, 2, wantShards, safeAfter)
127+
result, err := NextState(state, wantShards, baseTime, timeToSync)
128+
require.NoError(t, err)
129+
130+
// Must complete to steady
131+
require.True(t, IsInSteadyState(result))
132+
require.Equal(t, wantShards, result.GetRoutableShards())
133+
require.Equal(t, state.Id+1, result.Id)
134+
}
135+
})
136+
}
137+
138+
// ∀ transition: completion occurs iff now >= safeAfter
139+
func TestFV_SafetyPeriodEnforcement(t *testing.T) {
140+
timeToSync := 5 * time.Minute
141+
baseTime := time.Unix(0, 0)
142+
143+
// Test various time offsets relative to safeAfter
144+
offsets := []time.Duration{
145+
-1 * time.Hour,
146+
-1 * time.Minute,
147+
-1 * time.Second,
148+
-1 * time.Nanosecond,
149+
0,
150+
1 * time.Nanosecond,
151+
1 * time.Second,
152+
1 * time.Minute,
153+
1 * time.Hour,
154+
}
155+
156+
for _, offset := range offsets {
157+
safeAfter := baseTime
158+
now := baseTime.Add(offset)
159+
state := transitionState(1, 2, 5, safeAfter)
160+
161+
result, err := NextState(state, 5, now, timeToSync)
162+
require.NoError(t, err)
163+
164+
shouldComplete := !now.Before(safeAfter)
165+
didComplete := IsInSteadyState(result)
166+
167+
require.Equal(t, shouldComplete, didComplete,
168+
"offset=%v: safety period enforcement failed", offset)
169+
}
170+
}
171+
172+
// When entering transition, WantShards equals the requested shard count
173+
// When completing transition, final shard count equals WantShards
174+
func TestFV_TransitionPreservesTarget(t *testing.T) {
175+
timeToSync := 5 * time.Minute
176+
baseTime := time.Unix(0, 0)
177+
178+
for _, currentShards := range []uint32{1, 2, 3, 5} {
179+
for _, wantShards := range []uint32{1, 2, 3, 5} {
180+
if currentShards == wantShards {
181+
continue // No transition occurs
182+
}
183+
184+
// Step 1: Enter transition
185+
state := steadyState(0, currentShards)
186+
afterEnter, err := NextState(state, wantShards, baseTime, timeToSync)
187+
require.NoError(t, err)
188+
require.Equal(t, wantShards, afterEnter.GetTransition().WantShards,
189+
"transition must preserve target shard count")
190+
191+
// Step 2: Complete transition (after safety period)
192+
afterComplete, err := NextState(afterEnter, wantShards, baseTime.Add(timeToSync+time.Second), timeToSync)
193+
require.NoError(t, err)
194+
require.Equal(t, wantShards, afterComplete.GetRoutableShards(),
195+
"completed state must have target shard count")
196+
}
197+
}
198+
}
199+
200+
// ∀ transition: ∃ time t where transition completes (no infinite loops)
201+
func TestFV_EventualCompletion(t *testing.T) {
202+
timeToSync := 5 * time.Minute
203+
baseTime := time.Unix(0, 0)
204+
205+
state := steadyState(0, 2)
206+
207+
// Enter transition
208+
state, err := NextState(state, 5, baseTime, timeToSync)
209+
require.NoError(t, err)
210+
require.False(t, IsInSteadyState(state))
211+
212+
// Simulate time progression - must complete within safety period
213+
completionTime := baseTime.Add(timeToSync)
214+
state, err = NextState(state, 5, completionTime, timeToSync)
215+
require.NoError(t, err)
216+
217+
require.True(t, IsInSteadyState(state), "transition must eventually complete")
218+
}
219+
220+
// ∀ state: exactly one of (IsInSteadyState, IsInTransition) is true
221+
func TestFV_StateTypeExclusivity(t *testing.T) {
222+
states := []*pb.RoutingState{
223+
steadyState(0, 1),
224+
steadyState(5, 3),
225+
transitionState(0, 1, 2, time.Now()),
226+
transitionState(5, 3, 5, time.Now().Add(time.Hour)),
227+
}
228+
229+
for i, state := range states {
230+
isSteady := IsInSteadyState(state)
231+
_, isTransition := state.State.(*pb.RoutingState_Transition)
232+
233+
require.NotEqual(t, isSteady, isTransition,
234+
"state %d: exactly one state type must be true", i)
235+
}
236+
}
237+
238+
// IsInSteadyState(nil) = false (safe handling of nil)
239+
// NextState(nil, ...) returns error (explicit failure)
240+
func TestFV_NilStateSafety(t *testing.T) {
241+
require.False(t, IsInSteadyState(nil), "nil state must not be steady")
242+
243+
_, err := NextState(nil, 1, time.Now(), time.Minute)
244+
require.Error(t, err, "NextState must reject nil input")
245+
}
246+
247+
func steadyState(id uint64, shards uint32) *pb.RoutingState {
248+
return &pb.RoutingState{
249+
Id: id,
250+
State: &pb.RoutingState_RoutableShards{RoutableShards: shards},
251+
}
252+
}
253+
254+
func transitionState(id uint64, lastStable, wantShards uint32, safeAfter time.Time) *pb.RoutingState {
255+
return &pb.RoutingState{
256+
Id: id,
257+
State: &pb.RoutingState_Transition{
258+
Transition: &pb.Transition{
259+
WantShards: wantShards,
260+
LastStableCount: lastStable,
261+
ChangesSafeAfter: timestamppb.New(safeAfter),
262+
},
263+
},
264+
}
265+
}

0 commit comments

Comments
 (0)