Skip to content

Commit 4b0684f

Browse files
committed
cre-1601: transition state machine
1 parent e3e2e02 commit 4b0684f

File tree

2 files changed

+325
-10
lines changed

2 files changed

+325
-10
lines changed

pkg/workflows/shardorchestrator/plugin.go

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"google.golang.org/protobuf/proto"
1111
"google.golang.org/protobuf/types/known/structpb"
12+
"google.golang.org/protobuf/types/known/timestamppb"
1213

1314
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1415
"github.com/smartcontractkit/chainlink-common/pkg/workflows/shardorchestrator/pb"
@@ -28,6 +29,7 @@ type Plugin struct {
2829
batchSize int
2930
minShardCount uint32
3031
maxShardCount uint32
32+
timeToSync time.Duration
3133
}
3234

3335
var _ ocr3types.ReportingPlugin[[]byte] = (*Plugin)(nil)
@@ -37,6 +39,7 @@ type ConsensusConfig struct {
3739
MinShardCount uint32
3840
MaxShardCount uint32
3941
BatchSize int
42+
TimeToSync time.Duration
4043
}
4144

4245
// NewPlugin creates a consensus reporting plugin for shard orchestration
@@ -46,6 +49,7 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, lggr logger
4649
MinShardCount: 1,
4750
MaxShardCount: 100,
4851
BatchSize: 1000,
52+
TimeToSync: 5 * time.Minute,
4953
}
5054
}
5155

@@ -58,6 +62,9 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, lggr logger
5862
if cfg.BatchSize <= 0 {
5963
cfg.BatchSize = 100
6064
}
65+
if cfg.TimeToSync <= 0 {
66+
cfg.TimeToSync = 5 * time.Minute
67+
}
6168

6269
return &Plugin{
6370
store: store,
@@ -66,6 +73,7 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, lggr logger
6673
batchSize: cfg.BatchSize,
6774
minShardCount: cfg.MinShardCount,
6875
maxShardCount: cfg.MaxShardCount,
76+
timeToSync: cfg.TimeToSync,
6977
}, nil
7078
}
7179

@@ -85,7 +93,7 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,
8593
observation := &pb.Observation{
8694
Status: shardHealth,
8795
Hashes: allWorkflowIDs,
88-
Now: nil,
96+
Now: timestamppb.Now(),
8997
}
9098

9199
return proto.MarshalOptions{Deterministic: true}.Marshal(observation)
@@ -100,9 +108,19 @@ func (p *Plugin) ObservationQuorum(_ context.Context, _ ocr3types.OutcomeContext
100108
}
101109

102110
func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
111+
// Load prior state
112+
prior := &pb.Outcome{}
113+
if outctx.PreviousOutcome == nil {
114+
prior.Routes = map[string]*pb.WorkflowRoute{}
115+
prior.State = &pb.RoutingState{Id: outctx.SeqNr, State: &pb.RoutingState_RoutableShards{RoutableShards: p.minShardCount}}
116+
} else if err := proto.Unmarshal(outctx.PreviousOutcome, prior); err != nil {
117+
return nil, err
118+
}
119+
103120
currentShardHealth := make(map[uint32]int)
104121
totalObservations := len(aos)
105122
allWorkflows := []string{}
123+
nows := []time.Time{}
106124

107125
// Collect shard health observations and workflows
108126
for _, ao := range aos {
@@ -120,6 +138,18 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
120138

121139
// Collect workflow IDs
122140
allWorkflows = append(allWorkflows, observation.Hashes...)
141+
142+
// Collect timestamps
143+
if observation.Now != nil {
144+
nows = append(nows, observation.Now.AsTime())
145+
}
146+
}
147+
148+
// Calculate median time
149+
now := time.Now()
150+
if len(nows) > 0 {
151+
slices.SortFunc(nows, time.Time.Compare)
152+
now = nows[len(nows)/2]
123153
}
124154

125155
// Deduplicate workflows
@@ -151,6 +181,12 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
151181
healthyShardCount = p.maxShardCount
152182
}
153183

184+
// Calculate next state using state machine
185+
nextState, err := p.calculateNextState(prior.State, healthyShardCount, now)
186+
if err != nil {
187+
return nil, err
188+
}
189+
154190
// Use deterministic hashing to assign workflows to shards
155191
routes := make(map[string]*pb.WorkflowRoute)
156192
for _, wfID := range allWorkflows {
@@ -162,12 +198,7 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
162198

163199
// Update routing state
164200
outcome := &pb.Outcome{
165-
State: &pb.RoutingState{
166-
Id: outctx.SeqNr,
167-
State: &pb.RoutingState_RoutableShards{
168-
RoutableShards: healthyShardCount,
169-
},
170-
},
201+
State: nextState,
171202
Routes: routes,
172203
}
173204

@@ -176,6 +207,45 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
176207
return proto.MarshalOptions{Deterministic: true}.Marshal(outcome)
177208
}
178209

210+
func (p *Plugin) calculateNextState(priorState *pb.RoutingState, wantShards uint32, now time.Time) (*pb.RoutingState, error) {
211+
switch ps := priorState.State.(type) {
212+
case *pb.RoutingState_RoutableShards:
213+
// If already at desired count, stay in stable state
214+
if ps.RoutableShards == wantShards {
215+
return priorState, nil
216+
}
217+
218+
// Otherwise, initiate transition
219+
return &pb.RoutingState{
220+
Id: priorState.Id + 1,
221+
State: &pb.RoutingState_Transition{
222+
Transition: &pb.Transition{
223+
WantShards: wantShards,
224+
LastStableCount: ps.RoutableShards,
225+
ChangesSafeAfter: timestamppb.New(now.Add(p.timeToSync)),
226+
},
227+
},
228+
}, nil
229+
230+
case *pb.RoutingState_Transition:
231+
// If still in safety period, stay in transition
232+
if now.Before(ps.Transition.ChangesSafeAfter.AsTime()) {
233+
return priorState, nil
234+
}
235+
236+
// Safety period elapsed, transition to stable state
237+
return &pb.RoutingState{
238+
Id: priorState.Id + 1,
239+
State: &pb.RoutingState_RoutableShards{
240+
RoutableShards: ps.Transition.WantShards,
241+
},
242+
}, nil
243+
244+
default:
245+
return nil, errors.New("unknown prior state type")
246+
}
247+
}
248+
179249
func (p *Plugin) Reports(_ context.Context, _ uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportPlus[[]byte], error) {
180250
allOraclesTransmitNow := &ocr3types.TransmissionSchedule{
181251
Transmitters: make([]commontypes.OracleID, p.config.N),

0 commit comments

Comments
 (0)