Skip to content

Commit f85fe93

Browse files
Address false positives in replay mode
1 parent ec3ecd5 commit f85fe93

File tree

5 files changed

+341
-63
lines changed

5 files changed

+341
-63
lines changed

internal/internal_task_handlers.go

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,28 @@ func isDecisionEvent(eventType s.EventType) bool {
262262
}
263263
}
264264

265+
// isDecisionEventForReplay is different from isDecisionEvent because during replays
266+
// we want to intentionally ignore workflow complete/fail/cancel/continueasnew events so that
267+
// decision tree replays match the workflow processing respond tasks
268+
func isDecisionEventForReplay(eventType s.EventType) bool {
269+
switch eventType {
270+
case
271+
s.EventTypeActivityTaskScheduled,
272+
s.EventTypeActivityTaskCancelRequested,
273+
s.EventTypeTimerStarted,
274+
s.EventTypeTimerCanceled,
275+
s.EventTypeCancelTimerFailed,
276+
s.EventTypeMarkerRecorded,
277+
s.EventTypeStartChildWorkflowExecutionInitiated,
278+
s.EventTypeRequestCancelExternalWorkflowExecutionInitiated,
279+
s.EventTypeSignalExternalWorkflowExecutionInitiated,
280+
s.EventTypeUpsertWorkflowSearchAttributes:
281+
return true
282+
default:
283+
return false
284+
}
285+
}
286+
265287
// NextDecisionEvents returns events that are processed as new by the next decision.
266288
// TODO(maxim): Refactor to return a struct instead of multiple parameters
267289
func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, err error) {
@@ -840,6 +862,19 @@ process_Workflow_Loop:
840862
return response, err
841863
}
842864

865+
// ProcessWorkflowTask processes the given workflow which includes
866+
// - fetching, reordering and replaying historical decision events. (Decision events in this context is an umbrella term for workflow relevant events)
867+
// - state machine is incrementally built with every decision.
868+
// - state machine makes sure that when a workflow restarts for some reason the same activities (or timers etc.) are not called again and the previous result state is loaded into memory
869+
//
870+
// Note about Replay tests mode:
871+
//
872+
// This mode works by replaying the historical decision events responses (as defined in isDecisionEventForReplay())
873+
// and comparing these with the replays obtained from the state machine
874+
//
875+
// Compared to isDecisionEvent(), isDecisionEventForReplay() omits the following events even though they are workflow relevant respond events:
876+
// complete/failed/cancel/continueasnew
877+
// The reason is that the state machine doesn't have a corresponding decision for these so they cause false positive non-determinism errors in Replay tests.
843878
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
844879
task := workflowTask.task
845880
historyIterator := workflowTask.historyIterator
@@ -899,8 +934,15 @@ ProcessEvents:
899934
for i, event := range reorderedEvents {
900935
isInReplay := reorderedHistory.IsReplayEvent(event)
901936
isLast := !isInReplay && i == len(reorderedEvents)-1
902-
if !skipReplayCheck && isDecisionEvent(event.GetEventType()) {
903-
respondEvents = append(respondEvents, event)
937+
if !skipReplayCheck {
938+
isDecisionEventFn := isDecisionEvent
939+
if isInReplay {
940+
isDecisionEventFn = isDecisionEventForReplay
941+
}
942+
943+
if isDecisionEventFn(event.GetEventType()) {
944+
respondEvents = append(respondEvents, event)
945+
}
904946
}
905947

906948
if isPreloadMarkerEvent(event) {
@@ -918,7 +960,8 @@ ProcessEvents:
918960
if err != nil {
919961
return nil, err
920962
}
921-
if w.isWorkflowCompleted {
963+
964+
if w.isWorkflowCompleted && !isInReplay {
922965
break ProcessEvents
923966
}
924967
}
@@ -936,8 +979,7 @@ ProcessEvents:
936979
}
937980
}
938981
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
939-
lastDecisionEventsForReplayTest := isReplayTest && !reorderedHistory.HasNextDecisionEvents()
940-
if isReplay && !lastDecisionEventsForReplayTest {
982+
if isReplay {
941983
eventDecisions := eventHandler.decisionsHelper.getDecisions(true)
942984
if len(eventDecisions) > 0 && !skipReplayCheck {
943985
replayDecisions = append(replayDecisions, eventDecisions...)

test/replaytests/replay_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func TestGreetingsWorkflow(t *testing.T) {
7373
}
7474

7575
// Should have failed but passed. Maybe, because the result recorded in history still matches the return type of the workflow.
76+
// TODO(remove comment): Passes now
7677
func TestGreetingsWorkflow3(t *testing.T) {
7778
replayer := worker.NewWorkflowReplayer()
7879
replayer.RegisterActivityWithOptions(getNameActivity3, activity.RegisterOptions{Name: "main.getNameActivity", DisableAlreadyRegisteredCheck: true})
@@ -121,14 +122,15 @@ func TestExclusiveChoiceWorkflowWithUnregisteredActivity(t *testing.T) {
121122
// that registered activity is different from executed activity.
122123
// The replayer relies on whatever is recorded in the History so as long as the main activity name in the options matched partially
123124
// it doesn't raise errors.
125+
// TODO(remove comment): Replayer now catches this
124126
func TestExclusiveChoiceWorkflowWithDifferentActvityCombo(t *testing.T) {
125127
replayer := worker.NewWorkflowReplayer()
126128

127129
replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow2, workflow.RegisterOptions{Name: "choice"})
128130
replayer.RegisterActivityWithOptions(getAppleOrderActivity, activity.RegisterOptions{Name: "main.getOrderActivity"})
129131
replayer.RegisterActivityWithOptions(orderAppleActivity, activity.RegisterOptions{Name: "testactivity"})
130132
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json")
131-
require.NoError(t, err)
133+
assert.ErrorContains(t, err, "nondeterministic workflow")
132134
}
133135

134136
func TestBranchWorkflow(t *testing.T) {
@@ -155,9 +157,10 @@ func TestBranchWorkflowWithExtraBranch(t *testing.T) {
155157
func TestSequentialStepsWorkflow(t *testing.T) {
156158
replayer := worker.NewWorkflowReplayer()
157159

158-
replayer.RegisterWorkflowWithOptions(sequantialStepsWorkflow, workflow.RegisterOptions{Name: "sequentialStepsWorkflow"})
160+
replayer.RegisterWorkflowWithOptions(replayerHelloWorldWorkflow, workflow.RegisterOptions{Name: "fx.ReplayerHelloWorldWorkflow"})
161+
replayer.RegisterActivityWithOptions(replayerHelloWorldActivity, activity.RegisterOptions{Name: "replayerhello"})
159162

160-
// branch.json file contains history of a run with 3 activity calls
163+
// sequential.json file contains history of a run with 2 activity calls sequentially
161164
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "sequential.json")
162165
assert.ErrorContains(t, err, "nondeterministic workflow")
163166
}
@@ -173,11 +176,12 @@ func TestParallel(t *testing.T) {
173176

174177
// Should have failed since the first go routine has only one branch whereas the history has two branches.
175178
// The replayer totally misses this change.
179+
// TODO(remove comment): Replayer now catches this
176180
func TestParallel2(t *testing.T) {
177181
replayer := worker.NewWorkflowReplayer()
178182

179183
replayer.RegisterWorkflowWithOptions(sampleParallelWorkflow2, workflow.RegisterOptions{Name: "branch2"})
180184

181185
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json")
182-
require.NoError(t, err)
186+
assert.ErrorContains(t, err, "nondeterministic workflow")
183187
}

test/replaytests/sequantial_steps_workflow.go

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)