Skip to content

Commit 1bb5ccb

Browse files
committed
add orchestration processor idempotency test
Signed-off-by: Fabian Martinez <[email protected]>
1 parent 12041b7 commit 1bb5ccb

File tree

1 file changed

+57
-0
lines changed

1 file changed

+57
-0
lines changed

tests/worker_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,63 @@ func Test_TryProcessSingleOrchestrationWorkItem_BasicFlow(t *testing.T) {
7676
worker.StopAndDrain()
7777
}
7878

79+
func Test_TryProcessSingleOrchestrationWorkItem_Idempotency(t *testing.T) {
80+
ctx := context.Background()
81+
workflowID := "test123"
82+
wi := &backend.OrchestrationWorkItem{
83+
InstanceID: api.InstanceID(workflowID),
84+
NewEvents: []*protos.HistoryEvent{
85+
{
86+
EventId: -1,
87+
Timestamp: timestamppb.New(time.Now()),
88+
EventType: &protos.HistoryEvent_ExecutionStarted{
89+
ExecutionStarted: &protos.ExecutionStartedEvent{
90+
Name: "MyOrch",
91+
OrchestrationInstance: &protos.OrchestrationInstance{
92+
InstanceId: workflowID,
93+
ExecutionId: wrapperspb.String(uuid.New().String()),
94+
},
95+
},
96+
},
97+
},
98+
},
99+
State: runtimestate.NewOrchestrationRuntimeState(workflowID, nil, []*protos.HistoryEvent{}),
100+
}
101+
102+
ctx, cancel := context.WithCancel(ctx)
103+
t.Cleanup(cancel)
104+
105+
completed := atomic.Bool{}
106+
be := mocks.NewBackend(t)
107+
ex := mocks.NewExecutor(t)
108+
109+
be.EXPECT().NextOrchestrationWorkItem(anyContext).Return(wi, nil).Once()
110+
ex.EXPECT().ExecuteOrchestrator(anyContext, wi.InstanceID, wi.State.OldEvents, mock.Anything).Return(nil, errors.New("dummy error")).Once()
111+
be.EXPECT().AbandonOrchestrationWorkItem(anyContext, wi).Return(nil).Once()
112+
113+
be.EXPECT().NextOrchestrationWorkItem(anyContext).Return(wi, nil).Once()
114+
ex.EXPECT().ExecuteOrchestrator(anyContext, wi.InstanceID, wi.State.OldEvents, mock.Anything).Return(&protos.OrchestratorResponse{}, nil).Once()
115+
be.EXPECT().CompleteOrchestrationWorkItem(anyContext, wi).RunAndReturn(func(ctx context.Context, owi *backend.OrchestrationWorkItem) error {
116+
completed.Store(true)
117+
return nil
118+
}).Once()
119+
120+
be.EXPECT().NextOrchestrationWorkItem(anyContext).Return(nil, errors.New("")).Once().Run(func(mock.Arguments) {
121+
cancel()
122+
})
123+
124+
worker := backend.NewOrchestrationWorker(be, ex, logger, backend.WithMaxParallelism(1))
125+
worker.Start(ctx)
126+
127+
require.EventuallyWithT(t, func(collect *assert.CollectT) {
128+
if !completed.Load() {
129+
collect.Errorf("process next not called CompleteOrchestrationWorkItem yet")
130+
}
131+
}, 2*time.Second, 100*time.Millisecond)
132+
133+
worker.StopAndDrain()
134+
}
135+
79136
func Test_TryProcessSingleOrchestrationWorkItem_ExecutionStartedAndCompleted(t *testing.T) {
80137
ctx := context.Background()
81138
iid := api.InstanceID("test123")

0 commit comments

Comments
 (0)