Skip to content

Commit 094d4cb

Browse files
Address false positives in replay mode
1 parent 259ef5f commit 094d4cb

File tree

5 files changed

+341
-64
lines changed

5 files changed

+341
-64
lines changed

internal/internal_task_handlers.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,28 @@ func isDecisionEvent(eventType s.EventType) bool {
260260
}
261261
}
262262

263+
// isDecisionEventForReplay is different from isDecisionEvent because during replays
264+
// we want to intentionally ignore workflow complete/fail/cancel/continueasnew events so that
265+
// decision tree replays matches with the workflow processing respond tasks
266+
func isDecisionEventForReplay(eventType s.EventType) bool {
267+
switch eventType {
268+
case
269+
s.EventTypeActivityTaskScheduled,
270+
s.EventTypeActivityTaskCancelRequested,
271+
s.EventTypeTimerStarted,
272+
s.EventTypeTimerCanceled,
273+
s.EventTypeCancelTimerFailed,
274+
s.EventTypeMarkerRecorded,
275+
s.EventTypeStartChildWorkflowExecutionInitiated,
276+
s.EventTypeRequestCancelExternalWorkflowExecutionInitiated,
277+
s.EventTypeSignalExternalWorkflowExecutionInitiated,
278+
s.EventTypeUpsertWorkflowSearchAttributes:
279+
return true
280+
default:
281+
return false
282+
}
283+
}
284+
263285
// NextDecisionEvents returns events that there processed as new by the next decision.
264286
// TODO(maxim): Refactor to return a struct instead of multiple parameters
265287
func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, err error) {
@@ -830,6 +852,19 @@ process_Workflow_Loop:
830852
return response, err
831853
}
832854

855+
// ProcessWorkflowTask processes the given workflow which includes
856+
// - fetching, reordering and replaying historical decision events. (Decision events in this context is an umbrella term for workflow relevant events)
857+
// - state machine is incrementally built with every decision.
858+
// - state machine makes sure that when a workflow restarts for some reason same activities (or timers etc.) are not called again and previous result state is loaded into memory
859+
//
860+
// Note about Replay tests mode:
861+
//
862+
// This mode works by replaying the historical decision events responses (as defined in isDecisionEventForReplay())
863+
// and comparing these with the replays gotten from state machine
864+
//
865+
// Compared to isDecisionEvent(), isDecisionEventForReplay() omits the following events even though they are workflow relevant respond events:
866+
// complete/failed/cancel/continueasnew
867+
// The reason is that state machine doesn't have a correspondong decision for these so they cause false positive non-determinism errors in Replay tests.
833868
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
834869
task := workflowTask.task
835870
historyIterator := workflowTask.historyIterator
@@ -885,8 +920,15 @@ ProcessEvents:
885920
for i, event := range reorderedEvents {
886921
isInReplay := reorderedHistory.IsReplayEvent(event)
887922
isLast := !isInReplay && i == len(reorderedEvents)-1
888-
if !skipReplayCheck && isDecisionEvent(event.GetEventType()) {
889-
respondEvents = append(respondEvents, event)
923+
if !skipReplayCheck {
924+
isDecisionEventFn := isDecisionEvent
925+
if isInReplay {
926+
isDecisionEventFn = isDecisionEventForReplay
927+
}
928+
929+
if isDecisionEventFn(event.GetEventType()) {
930+
respondEvents = append(respondEvents, event)
931+
}
890932
}
891933

892934
if isPreloadMarkerEvent(event) {
@@ -904,7 +946,8 @@ ProcessEvents:
904946
if err != nil {
905947
return nil, err
906948
}
907-
if w.isWorkflowCompleted {
949+
950+
if w.isWorkflowCompleted && !isInReplay {
908951
break ProcessEvents
909952
}
910953
}
@@ -922,8 +965,7 @@ ProcessEvents:
922965
}
923966
}
924967
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
925-
lastDecisionEventsForReplayTest := isReplayTest && !reorderedHistory.HasNextDecisionEvents()
926-
if isReplay && !lastDecisionEventsForReplayTest {
968+
if isReplay {
927969
eventDecisions := eventHandler.decisionsHelper.getDecisions(true)
928970
if len(eventDecisions) > 0 && !skipReplayCheck {
929971
replayDecisions = append(replayDecisions, eventDecisions...)
@@ -970,7 +1012,6 @@ ProcessEvents:
9701012
}
9711013

9721014
if nonDeterministicErr != nil {
973-
9741015
w.wth.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName()).Counter(metrics.NonDeterministicError).Inc(1)
9751016
w.wth.logger.Error("non-deterministic-error",
9761017
zap.String(tagWorkflowType, task.WorkflowType.GetName()),

test/replaytests/replay_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func TestGreetingsWorkflow(t *testing.T) {
7373
}
7474

7575
// Should have failed but passed. Maybe, because the result recorded in history still matches the return type of the workflow.
76+
// TODO(remove comment): Passes now
7677
func TestGreetingsWorkflow3(t *testing.T) {
7778
replayer := worker.NewWorkflowReplayer()
7879
replayer.RegisterActivityWithOptions(getNameActivity3, activity.RegisterOptions{Name: "main.getNameActivity", DisableAlreadyRegisteredCheck: true})
@@ -121,14 +122,15 @@ func TestExclusiveChoiceWorkflowWithUnregisteredActivity(t *testing.T) {
121122
// that registered activity is different from executed activity.
122123
// The replayer relies on whatever is recorded in the History so as long as the main activity name in the options matched partially
123124
// it doesn't raise errors.
125+
// TODO(remove comment): Replayer now catches this
124126
func TestExclusiveChoiceWorkflowWithDifferentActvityCombo(t *testing.T) {
125127
replayer := worker.NewWorkflowReplayer()
126128

127129
replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow2, workflow.RegisterOptions{Name: "choice"})
128130
replayer.RegisterActivityWithOptions(getAppleOrderActivity, activity.RegisterOptions{Name: "main.getOrderActivity"})
129131
replayer.RegisterActivityWithOptions(orderAppleActivity, activity.RegisterOptions{Name: "testactivity"})
130132
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json")
131-
require.NoError(t, err)
133+
assert.ErrorContains(t, err, "nondeterministic workflow")
132134
}
133135

134136
func TestBranchWorkflow(t *testing.T) {
@@ -155,9 +157,10 @@ func TestBranchWorkflowWithExtraBranch(t *testing.T) {
155157
func TestSequentialStepsWorkflow(t *testing.T) {
156158
replayer := worker.NewWorkflowReplayer()
157159

158-
replayer.RegisterWorkflowWithOptions(sequantialStepsWorkflow, workflow.RegisterOptions{Name: "sequentialStepsWorkflow"})
160+
replayer.RegisterWorkflowWithOptions(replayerHelloWorldWorkflow, workflow.RegisterOptions{Name: "fx.ReplayerHelloWorldWorkflow"})
161+
replayer.RegisterActivityWithOptions(replayerHelloWorldActivity, activity.RegisterOptions{Name: "replayerhello"})
159162

160-
// branch.json file contains history of a run with 3 activity calls
163+
// sequential.json file contains history of a run with 2 activity calls sequentially
161164
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "sequential.json")
162165
assert.ErrorContains(t, err, "nondeterministic workflow")
163166
}
@@ -173,11 +176,12 @@ func TestParallel(t *testing.T) {
173176

174177
// Should have failed since the first go routine has only one branch whereas the history has two branches.
175178
// The replayer totally misses this change.
179+
// TODO(remove comment): Replayer now catches this
176180
func TestParallel2(t *testing.T) {
177181
replayer := worker.NewWorkflowReplayer()
178182

179183
replayer.RegisterWorkflowWithOptions(sampleParallelWorkflow2, workflow.RegisterOptions{Name: "branch2"})
180184

181185
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json")
182-
require.NoError(t, err)
186+
assert.ErrorContains(t, err, "nondeterministic workflow")
183187
}

test/replaytests/sequantial_steps_workflow.go

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)