Skip to content

Commit cf0bb98

Browse files
committed
cre-1601: deterministic time; f check for round; improved time median; improved workflows dedup; improved comments;
1 parent 26f8c8c commit cf0bb98

File tree

3 files changed

+49
-35
lines changed

3 files changed

+49
-35
lines changed

pkg/workflows/ring/plugin.go

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -124,22 +124,8 @@ func (p *Plugin) ObservationQuorum(_ context.Context, _ ocr3types.OutcomeContext
124124
return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, p.config.N, p.config.F, aos), nil
125125
}
126126

127-
func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
128-
// Bootstrap with minimum shards on first round; subsequent rounds build on prior outcome
129-
prior := &pb.Outcome{}
130-
if outctx.PreviousOutcome == nil {
131-
prior.Routes = map[string]*pb.WorkflowRoute{}
132-
prior.State = &pb.RoutingState{Id: outctx.SeqNr, State: &pb.RoutingState_RoutableShards{RoutableShards: p.minShardCount}}
133-
} else if err := proto.Unmarshal(outctx.PreviousOutcome, prior); err != nil {
134-
return nil, err
135-
}
136-
137-
currentShardHealth := make(map[uint32]int)
138-
totalObservations := len(aos)
139-
allWorkflows := make([]string, 0)
140-
nows := make([]time.Time, 0)
141-
142-
// Collect shard health observations and workflows
127+
func (p *Plugin) collectShardInfo(aos []types.AttributedObservation) (shardHealth map[uint32]int, workflows []string, timestamps []time.Time) {
128+
shardHealth = make(map[uint32]int)
143129
for _, ao := range aos {
144130
observation := &pb.Observation{}
145131
if err := proto.Unmarshal(ao.Observation, observation); err != nil {
@@ -149,36 +135,42 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
149135

150136
for shardID, healthy := range observation.ShardHealthStatus {
151137
if healthy {
152-
currentShardHealth[shardID]++
138+
shardHealth[shardID]++
153139
}
154140
}
155141

156-
// Collect workflow IDs
157-
allWorkflows = append(allWorkflows, observation.WorkflowIds...)
142+
workflows = append(workflows, observation.WorkflowIds...)
158143

159-
// Collect timestamps
160144
if observation.Now != nil {
161-
nows = append(nows, observation.Now.AsTime())
145+
timestamps = append(timestamps, observation.Now.AsTime())
162146
}
163147
}
148+
return shardHealth, workflows, timestamps
149+
}
164150

165-
// Calculate median time
166-
now := time.Now()
167-
if len(nows) > 0 {
168-
slices.SortFunc(nows, time.Time.Compare)
169-
now = nows[len(nows)/2]
151+
func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
152+
// Bootstrap with minimum shards on first round; subsequent rounds build on prior outcome
153+
prior := &pb.Outcome{}
154+
if outctx.PreviousOutcome == nil {
155+
prior.Routes = map[string]*pb.WorkflowRoute{}
156+
prior.State = &pb.RoutingState{Id: outctx.SeqNr, State: &pb.RoutingState_RoutableShards{RoutableShards: p.minShardCount}}
157+
} else if err := proto.Unmarshal(outctx.PreviousOutcome, prior); err != nil {
158+
return nil, err
170159
}
171160

172-
// Deduplicate workflows
173-
workflowMap := make(map[string]bool)
174-
for _, wf := range allWorkflows {
175-
workflowMap[wf] = true
176-
}
177-
allWorkflows = make([]string, 0, len(workflowMap))
178-
for wf := range workflowMap {
179-
allWorkflows = append(allWorkflows, wf)
161+
totalObservations := len(aos)
162+
currentShardHealth, allWorkflows, nows := p.collectShardInfo(aos)
163+
164+
// Need at least F+1 timestamps; fewer means >F faulty nodes and we can't trust this round
165+
if len(nows) < p.config.F+1 {
166+
return nil, errors.New("insufficient observation timestamps")
180167
}
181-
slices.Sort(allWorkflows) // Ensure deterministic order
168+
slices.SortFunc(nows, time.Time.Compare)
169+
170+
// Use the median timestamp to determine the current time
171+
now := nows[len(nows)/2]
172+
173+
allWorkflows = uniqueSorted(allWorkflows)
182174

183175
// Determine desired shard count based on observations
184176
healthyShardCount := uint32(0)

pkg/workflows/ring/utils.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package ring
2+
3+
import "slices"
4+
5+
func uniqueSorted(s []string) []string {
6+
result := slices.Clone(s)
7+
slices.Sort(result)
8+
return slices.Compact(result)
9+
}
10+

pkg/workflows/ring/utils_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package ring
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestUniqueSorted(t *testing.T) {
10+
got := uniqueSorted([]string{"c", "a", "b", "a", "c"})
11+
require.Equal(t, []string{"a", "b", "c"}, got)
12+
}

0 commit comments

Comments
 (0)