Skip to content

Commit 7ec83e0

Browse files
authored
Merge pull request #43 from famarting/orchestration-idempotency-test-attemp1
Orchestration idempotency test attemp1
2 parents 12041b7 + d8ca214 commit 7ec83e0

File tree

2 files changed

+79
-5
lines changed

2 files changed

+79
-5
lines changed

backend/orchestration.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,16 +170,13 @@ func (w *orchestratorProcessor) applyWorkItem(ctx context.Context, wi *Orchestra
170170
// New events from the work item are appended to the orchestration state, with duplicates automatically
171171
// filtered out. If all events are filtered out, return false so that the caller knows not to execute
172172
// the orchestration logic for an empty set of events.
173-
added := 0
174173
for _, e := range wi.NewEvents {
175174
if err := runtimestate.AddEvent(wi.State, e); err != nil {
176175
if err == runtimestate.ErrDuplicateEvent {
177176
w.logger.Warnf("%v: dropping duplicate event: %v", wi.InstanceID, e)
178177
} else {
179178
w.logger.Warnf("%v: dropping event: %v, %v", wi.InstanceID, e, err)
180179
}
181-
} else {
182-
added++
183180
}
184181

185182
// Special case logic for specific event types
@@ -194,8 +191,8 @@ func (w *orchestratorProcessor) applyWorkItem(ctx context.Context, wi *Orchestra
194191
}
195192
}
196193

197-
if added == 0 {
198-
w.logger.Warnf("%v: all new events were dropped", wi.InstanceID)
194+
if len(wi.State.NewEvents) == 0 {
195+
w.logger.Warnf("%v: no new events to process", wi.InstanceID)
199196
return ctx, span, false
200197
}
201198

tests/worker_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,77 @@ func Test_TryProcessSingleOrchestrationWorkItem_BasicFlow(t *testing.T) {
7474
}, 1*time.Second, 100*time.Millisecond)
7575

7676
worker.StopAndDrain()
77+
78+
t.Logf("state.NewEvents: %v", state.NewEvents)
79+
require.Len(t, state.NewEvents, 2)
80+
require.NotNil(t, wi.State.NewEvents[0].GetOrchestratorStarted())
81+
require.NotNil(t, wi.State.NewEvents[1].GetExecutionStarted())
82+
}
83+
84+
func Test_TryProcessSingleOrchestrationWorkItem_Idempotency(t *testing.T) {
85+
workflowID := "test123"
86+
wi := &backend.OrchestrationWorkItem{
87+
InstanceID: api.InstanceID(workflowID),
88+
NewEvents: []*protos.HistoryEvent{
89+
{
90+
EventId: -1,
91+
Timestamp: timestamppb.New(time.Now()),
92+
EventType: &protos.HistoryEvent_ExecutionStarted{
93+
ExecutionStarted: &protos.ExecutionStartedEvent{
94+
Name: "MyOrch",
95+
OrchestrationInstance: &protos.OrchestrationInstance{
96+
InstanceId: workflowID,
97+
ExecutionId: wrapperspb.String(uuid.New().String()),
98+
},
99+
},
100+
},
101+
},
102+
},
103+
State: runtimestate.NewOrchestrationRuntimeState(workflowID, nil, []*protos.HistoryEvent{}),
104+
}
105+
106+
ctx, cancel := context.WithCancel(context.Background())
107+
t.Cleanup(cancel)
108+
109+
completed := atomic.Bool{}
110+
be := mocks.NewBackend(t)
111+
ex := mocks.NewExecutor(t)
112+
113+
callNumber := 0
114+
ex.EXPECT().ExecuteOrchestrator(anyContext, wi.InstanceID, wi.State.OldEvents, mock.Anything).RunAndReturn(func(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*protos.OrchestratorResponse, error) {
115+
callNumber++
116+
logger.Debugf("execute orchestrator called %d times", callNumber)
117+
if callNumber == 1 {
118+
return nil, errors.New("dummy error")
119+
}
120+
return &protos.OrchestratorResponse{}, nil
121+
}).Times(2)
122+
123+
be.EXPECT().NextOrchestrationWorkItem(anyContext).Return(wi, nil).Once()
124+
be.EXPECT().AbandonOrchestrationWorkItem(anyContext, wi).Return(nil).Once()
125+
126+
be.EXPECT().NextOrchestrationWorkItem(anyContext).Return(wi, nil).Once()
127+
be.EXPECT().CompleteOrchestrationWorkItem(anyContext, wi).RunAndReturn(func(ctx context.Context, owi *backend.OrchestrationWorkItem) error {
128+
completed.Store(true)
129+
return nil
130+
}).Once()
131+
132+
be.EXPECT().NextOrchestrationWorkItem(anyContext).Return(nil, errors.New("")).Once().Run(func(mock.Arguments) {
133+
cancel()
134+
})
135+
136+
worker := backend.NewOrchestrationWorker(be, ex, logger, backend.WithMaxParallelism(1))
137+
worker.Start(ctx)
138+
139+
require.Eventually(t, completed.Load, 2*time.Second, 10*time.Millisecond)
140+
141+
worker.StopAndDrain()
142+
143+
t.Logf("state.NewEvents: %v", wi.State.NewEvents)
144+
require.Len(t, wi.State.NewEvents, 3)
145+
require.NotNil(t, wi.State.NewEvents[0].GetOrchestratorStarted())
146+
require.NotNil(t, wi.State.NewEvents[1].GetExecutionStarted())
147+
require.NotNil(t, wi.State.NewEvents[2].GetOrchestratorStarted())
77148
}
78149

79150
func Test_TryProcessSingleOrchestrationWorkItem_ExecutionStartedAndCompleted(t *testing.T) {
@@ -156,6 +227,12 @@ func Test_TryProcessSingleOrchestrationWorkItem_ExecutionStartedAndCompleted(t *
156227
}, 1*time.Second, 100*time.Millisecond)
157228

158229
worker.StopAndDrain()
230+
231+
t.Logf("state.NewEvents: %v", state.NewEvents)
232+
require.Len(t, state.NewEvents, 3)
233+
require.NotNil(t, wi.State.NewEvents[0].GetOrchestratorStarted())
234+
require.NotNil(t, wi.State.NewEvents[1].GetExecutionStarted())
235+
require.NotNil(t, wi.State.NewEvents[2].GetExecutionCompleted())
159236
}
160237

161238
func Test_TaskWorker(t *testing.T) {

0 commit comments

Comments
 (0)