Skip to content

Commit e042bb2

Browse files
committed
cre-1601: more tests; more coverage
1 parent 57f0423 commit e042bb2

File tree

4 files changed

+206
-18
lines changed

4 files changed

+206
-18
lines changed

pkg/workflows/ring/plugin_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,113 @@ func TestPlugin_getHealthyShards(t *testing.T) {
378378
}
379379
}
380380

381+
func TestPlugin_NoHealthyShardsFallbackToShardZero(t *testing.T) {
382+
lggr := logger.Test(t)
383+
store := NewStore()
384+
385+
// Set all shards unhealthy - store starts in transition state
386+
store.SetAllShardHealth(map[uint32]bool{0: false, 1: false, 2: false})
387+
388+
config := ocr3types.ReportingPluginConfig{
389+
N: 4, F: 1,
390+
}
391+
392+
plugin, err := NewPlugin(store, config, lggr, &ConsensusConfig{
393+
BatchSize: 100,
394+
TimeToSync: 1 * time.Second,
395+
})
396+
require.NoError(t, err)
397+
398+
transmitter := NewTransmitter(lggr, store, nil, "test-account")
399+
400+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
401+
defer cancel()
402+
403+
// Start a goroutine that requests allocation (will block waiting for OCR)
404+
resultCh := make(chan uint32)
405+
errCh := make(chan error, 1)
406+
go func() {
407+
shard, err := store.GetShardForWorkflow(ctx, "workflow-123")
408+
if err != nil {
409+
errCh <- err
410+
return
411+
}
412+
resultCh <- shard
413+
}()
414+
415+
// Give goroutine time to enqueue request
416+
time.Sleep(10 * time.Millisecond)
417+
418+
// Verify request is pending for OCR consensus
419+
pending := store.GetPendingAllocations()
420+
require.Contains(t, pending, "workflow-123")
421+
422+
// Simulate OCR round with observations showing no healthy shards
423+
// The pending allocation "workflow-123" should be included in observation
424+
now := time.Now()
425+
aos := make([]types.AttributedObservation, 3)
426+
for i := 0; i < 3; i++ {
427+
pbObs := &pb.Observation{
428+
ShardHealthStatus: map[uint32]bool{0: false, 1: false, 2: false},
429+
WorkflowIds: []string{"workflow-123"},
430+
Now: timestamppb.New(now),
431+
}
432+
rawObs, err := proto.Marshal(pbObs)
433+
require.NoError(t, err)
434+
aos[i] = types.AttributedObservation{
435+
Observation: rawObs,
436+
Observer: commontypes.OracleID(i),
437+
}
438+
}
439+
440+
// Use a previous outcome in steady state so we can test the fallback
441+
priorOutcome := &pb.Outcome{
442+
State: &pb.RoutingState{
443+
Id: 1,
444+
State: &pb.RoutingState_RoutableShards{RoutableShards: 3},
445+
},
446+
Routes: map[string]*pb.WorkflowRoute{},
447+
}
448+
priorBytes, err := proto.Marshal(priorOutcome)
449+
require.NoError(t, err)
450+
451+
outcomeCtx := ocr3types.OutcomeContext{
452+
SeqNr: 2,
453+
PreviousOutcome: priorBytes,
454+
}
455+
456+
// Run plugin Outcome phase
457+
outcome, err := plugin.Outcome(ctx, outcomeCtx, nil, aos)
458+
require.NoError(t, err)
459+
460+
// Transmit the outcome (applies routes to store)
461+
reports, err := plugin.Reports(ctx, 2, outcome)
462+
require.NoError(t, err)
463+
require.Len(t, reports, 1)
464+
465+
err = transmitter.Transmit(ctx, types.ConfigDigest{}, 2, reports[0].ReportWithInfo, nil)
466+
require.NoError(t, err)
467+
468+
// Blocked goroutine should now receive result from OCR - should be shard 0 (fallback)
469+
select {
470+
case shard := <-resultCh:
471+
require.Equal(t, uint32(0), shard, "should fallback to shard 0 when no healthy shards")
472+
case err := <-errCh:
473+
t.Fatalf("unexpected error: %v", err)
474+
case <-time.After(100 * time.Millisecond):
475+
t.Fatal("allocation was not fulfilled by OCR")
476+
}
477+
478+
// Verify the outcome assigned workflow-123 to shard 0
479+
outcomeProto := &pb.Outcome{}
480+
err = proto.Unmarshal(outcome, outcomeProto)
481+
require.NoError(t, err)
482+
483+
route, exists := outcomeProto.Routes["workflow-123"]
484+
require.True(t, exists, "workflow-123 should be in routes")
485+
require.Equal(t, uint32(0), route.Shard, "workflow-123 should be assigned to shard 0 (fallback)")
486+
}
487+
381488
func TestPlugin_ObservationQuorum(t *testing.T) {
382489
lggr := logger.Test(t)
383490
store := NewStore()

pkg/workflows/ring/store.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ type Store struct {
2828
mu sync.Mutex
2929
}
3030

31+
const AllocationRequestChannelCapacity = 1000
32+
3133
func NewStore() *Store {
3234
return &Store{
3335
routingState: make(map[string]uint32),
3436
shardHealth: make(map[uint32]bool),
3537
healthyShards: make([]uint32, 0),
3638
pendingAllocs: make(map[string][]chan uint32),
37-
allocRequests: make(chan AllocationRequest, 1000),
39+
allocRequests: make(chan AllocationRequest, AllocationRequestChannelCapacity),
3840
mu: sync.Mutex{},
3941
}
4042
}

pkg/workflows/ring/store_test.go

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -128,23 +128,6 @@ func TestStore_GetHealthyShards(t *testing.T) {
128128
require.Equal(t, []uint32{1, 2, 3}, healthyShards)
129129
}
130130

131-
func TestStore_NilHashRingFallback(t *testing.T) {
132-
store := NewStore()
133-
ctx := context.Background()
134-
135-
// Set all shards unhealthy
136-
store.SetAllShardHealth(map[uint32]bool{0: false, 1: false, 2: false})
137-
// Simulate OCR moved to steady (even with no healthy shards)
138-
store.SetRoutingState(&pb.RoutingState{
139-
State: &pb.RoutingState_RoutableShards{RoutableShards: 0},
140-
})
141-
142-
// Should not panic, should return 0 as fallback (no healthy shards)
143-
shard, err := store.GetShardForWorkflow(ctx, "workflow-123")
144-
require.NoError(t, err)
145-
require.Equal(t, uint32(0), shard)
146-
}
147-
148131
func TestStore_DistributionAcrossShards(t *testing.T) {
149132
store := NewStore()
150133
ctx := context.Background()
@@ -186,6 +169,50 @@ func sum(distribution map[uint32]int) int {
186169
return total
187170
}
188171

172+
func TestStore_GetShardForWorkflow_CacheHit(t *testing.T) {
173+
store := NewStore()
174+
ctx := context.Background()
175+
176+
// Set up steady state
177+
store.SetAllShardHealth(map[uint32]bool{0: true, 1: true, 2: true})
178+
store.SetRoutingState(&pb.RoutingState{
179+
State: &pb.RoutingState_RoutableShards{RoutableShards: 3},
180+
})
181+
182+
// Pre-populate cache with a specific shard assignment
183+
store.SetShardForWorkflow("cached-workflow", 2)
184+
185+
// Should return cached value, not recompute
186+
shard, err := store.GetShardForWorkflow(ctx, "cached-workflow")
187+
require.NoError(t, err)
188+
require.Equal(t, uint32(2), shard)
189+
}
190+
191+
func TestStore_GetShardForWorkflow_ContextCancelledDuringSend(t *testing.T) {
192+
store := NewStore()
193+
194+
// Put store in transition state
195+
store.SetAllShardHealth(map[uint32]bool{0: true})
196+
store.SetRoutingState(&pb.RoutingState{
197+
State: &pb.RoutingState_Transition{
198+
Transition: &pb.Transition{WantShards: 2},
199+
},
200+
})
201+
202+
// Fill up the allocRequests channel
203+
for i := 0; i < AllocationRequestChannelCapacity; i++ {
204+
store.allocRequests <- AllocationRequest{WorkflowID: "filler"}
205+
}
206+
207+
// Context that's already cancelled
208+
ctx, cancel := context.WithCancel(context.Background())
209+
cancel()
210+
211+
// Should fail: channel is full and context is cancelled
212+
_, err := store.GetShardForWorkflow(ctx, "workflow-123")
213+
require.ErrorIs(t, err, context.Canceled)
214+
}
215+
189216
func TestStore_PendingAllocsDuringTransition(t *testing.T) {
190217
store := NewStore()
191218
store.SetAllShardHealth(map[uint32]bool{0: true, 1: true})

pkg/workflows/ring/transmitter_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
type mockArbiterScaler struct {
1919
called bool
2020
nShards uint32
21+
err error
2122
}
2223

2324
func (m *mockArbiterScaler) Status(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*pb.ReplicaStatus, error) {
@@ -27,6 +28,9 @@ func (m *mockArbiterScaler) Status(ctx context.Context, in *emptypb.Empty, opts
2728
func (m *mockArbiterScaler) ConsensusWantShards(ctx context.Context, req *pb.ConsensusWantShardsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
2829
m.called = true
2930
m.nShards = req.NShards
31+
if m.err != nil {
32+
return nil, m.err
33+
}
3034
return &emptypb.Empty{}, nil
3135
}
3236

@@ -119,3 +123,51 @@ func TestTransmitter_Transmit_TransitionState(t *testing.T) {
119123
require.NoError(t, err)
120124
require.Equal(t, uint32(5), mock.nShards)
121125
}
126+
127+
func TestTransmitter_Transmit_InvalidReport(t *testing.T) {
128+
lggr := logger.Test(t)
129+
store := NewStore()
130+
tx := NewTransmitter(lggr, store, nil, "test-account")
131+
132+
// Send invalid protobuf data
133+
report := ocr3types.ReportWithInfo[[]byte]{Report: []byte("invalid protobuf")}
134+
err := tx.Transmit(context.Background(), types.ConfigDigest{}, 0, report, nil)
135+
require.Error(t, err)
136+
}
137+
138+
func TestTransmitter_Transmit_ArbiterError(t *testing.T) {
139+
lggr := logger.Test(t)
140+
store := NewStore()
141+
mock := &mockArbiterScaler{err: context.DeadlineExceeded}
142+
tx := NewTransmitter(lggr, store, mock, "test-account")
143+
144+
outcome := &pb.Outcome{
145+
State: &pb.RoutingState{
146+
Id: 1,
147+
State: &pb.RoutingState_RoutableShards{RoutableShards: 3},
148+
},
149+
}
150+
outcomeBytes, _ := proto.Marshal(outcome)
151+
152+
err := tx.Transmit(context.Background(), types.ConfigDigest{}, 0, ocr3types.ReportWithInfo[[]byte]{Report: outcomeBytes}, nil)
153+
require.ErrorIs(t, err, context.DeadlineExceeded)
154+
}
155+
156+
func TestTransmitter_Transmit_NilState(t *testing.T) {
157+
lggr := logger.Test(t)
158+
store := NewStore()
159+
tx := NewTransmitter(lggr, store, nil, "test-account")
160+
161+
outcome := &pb.Outcome{
162+
State: nil,
163+
Routes: map[string]*pb.WorkflowRoute{"wf-1": {Shard: 0}},
164+
}
165+
outcomeBytes, _ := proto.Marshal(outcome)
166+
167+
err := tx.Transmit(context.Background(), types.ConfigDigest{}, 0, ocr3types.ReportWithInfo[[]byte]{Report: outcomeBytes}, nil)
168+
require.NoError(t, err)
169+
170+
// Routes should still be applied
171+
routes := store.GetAllRoutingState()
172+
require.Equal(t, uint32(0), routes["wf-1"])
173+
}

0 commit comments

Comments
 (0)