Skip to content

Commit 455259f

Browse files
Address false positives in replay mode
1 parent 55f58be commit 455259f

File tree

5 files changed

+341
-64
lines changed

5 files changed

+341
-64
lines changed

internal/internal_task_handlers.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,28 @@ func isDecisionEvent(eventType s.EventType) bool {
261261
}
262262
}
263263

264+
// isDecisionEventForReplay reports whether eventType is one of the decision event types listed below.
265+
// Unlike isDecisionEvent, it is meant for replays, where workflow complete/fail/cancel/continueasnew
266+
// events are intentionally ignored so that the replayed decisions match the recorded respond events.
267+
func isDecisionEventForReplay(eventType s.EventType) bool {
268+
switch eventType {
269+
case
270+
s.EventTypeActivityTaskScheduled,
271+
s.EventTypeActivityTaskCancelRequested,
272+
s.EventTypeTimerStarted,
273+
s.EventTypeTimerCanceled,
274+
s.EventTypeCancelTimerFailed,
275+
s.EventTypeMarkerRecorded,
276+
s.EventTypeStartChildWorkflowExecutionInitiated,
277+
s.EventTypeRequestCancelExternalWorkflowExecutionInitiated,
278+
s.EventTypeSignalExternalWorkflowExecutionInitiated,
279+
s.EventTypeUpsertWorkflowSearchAttributes:
280+
return true
281+
default:
282+
return false
283+
}
284+
}
285+
264286
// NextDecisionEvents returns events that are processed as new by the next decision.
265287
// TODO(maxim): Refactor to return a struct instead of multiple parameters
266288
func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, err error) {
@@ -840,6 +862,19 @@ process_Workflow_Loop:
840862
return response, err
841863
}
842864

865+
// ProcessWorkflowTask processes the given workflow which includes
866+
// - fetching, reordering and replaying historical decision events. (Decision events in this context is an umbrella term for workflow relevant events)
867+
// - state machine is incrementally built with every decision.
868+
// - the state machine makes sure that when a workflow restarts for some reason, the same activities (or timers etc.) are not called again and the previous result state is loaded into memory
869+
//
870+
// Note about Replay tests mode:
871+
//
872+
// This mode works by replaying the historical decision event responses (as defined in isDecisionEventForReplay())
873+
// and comparing these with the replay decisions obtained from the state machine
874+
//
875+
// Compared to isDecisionEvent(), isDecisionEventForReplay() omits the following events even though they are workflow relevant respond events:
876+
// complete/failed/cancel/continueasnew
877+
// The reason is that the state machine doesn't have a corresponding decision for these, so they cause false positive non-determinism errors in Replay tests.
843878
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
844879
task := workflowTask.task
845880
historyIterator := workflowTask.historyIterator
@@ -895,8 +930,15 @@ ProcessEvents:
895930
for i, event := range reorderedEvents {
896931
isInReplay := reorderedHistory.IsReplayEvent(event)
897932
isLast := !isInReplay && i == len(reorderedEvents)-1
898-
if !skipReplayCheck && isDecisionEvent(event.GetEventType()) {
899-
respondEvents = append(respondEvents, event)
933+
if !skipReplayCheck {
934+
isDecisionEventFn := isDecisionEvent
935+
if isInReplay {
936+
isDecisionEventFn = isDecisionEventForReplay
937+
}
938+
939+
if isDecisionEventFn(event.GetEventType()) {
940+
respondEvents = append(respondEvents, event)
941+
}
900942
}
901943

902944
if isPreloadMarkerEvent(event) {
@@ -914,7 +956,8 @@ ProcessEvents:
914956
if err != nil {
915957
return nil, err
916958
}
917-
if w.isWorkflowCompleted {
959+
960+
if w.isWorkflowCompleted && !isInReplay {
918961
break ProcessEvents
919962
}
920963
}
@@ -932,8 +975,7 @@ ProcessEvents:
932975
}
933976
}
934977
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
935-
lastDecisionEventsForReplayTest := isReplayTest && !reorderedHistory.HasNextDecisionEvents()
936-
if isReplay && !lastDecisionEventsForReplayTest {
978+
if isReplay {
937979
eventDecisions := eventHandler.decisionsHelper.getDecisions(true)
938980
if len(eventDecisions) > 0 && !skipReplayCheck {
939981
replayDecisions = append(replayDecisions, eventDecisions...)
@@ -980,7 +1022,6 @@ ProcessEvents:
9801022
}
9811023

9821024
if nonDeterministicErr != nil {
983-
9841025
w.wth.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName()).Counter(metrics.NonDeterministicError).Inc(1)
9851026
w.wth.logger.Error("non-deterministic-error",
9861027
zap.String(tagWorkflowType, task.WorkflowType.GetName()),

test/replaytests/replay_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func TestGreetingsWorkflow(t *testing.T) {
7373
}
7474

7575
// Should have failed but passed. Maybe, because the result recorded in history still matches the return type of the workflow.
76+
// TODO(remove comment): Passes now
7677
func TestGreetingsWorkflow3(t *testing.T) {
7778
replayer := worker.NewWorkflowReplayer()
7879
replayer.RegisterActivityWithOptions(getNameActivity3, activity.RegisterOptions{Name: "main.getNameActivity", DisableAlreadyRegisteredCheck: true})
@@ -121,14 +122,15 @@ func TestExclusiveChoiceWorkflowWithUnregisteredActivity(t *testing.T) {
121122
// that registered activity is different from executed activity.
122123
// The replayer relies on whatever is recorded in the History so as long as the main activity name in the options matched partially
123124
// it doesn't raise errors.
125+
// TODO(remove comment): Replayer now catches this
124126
func TestExclusiveChoiceWorkflowWithDifferentActvityCombo(t *testing.T) {
125127
replayer := worker.NewWorkflowReplayer()
126128

127129
replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow2, workflow.RegisterOptions{Name: "choice"})
128130
replayer.RegisterActivityWithOptions(getAppleOrderActivity, activity.RegisterOptions{Name: "main.getOrderActivity"})
129131
replayer.RegisterActivityWithOptions(orderAppleActivity, activity.RegisterOptions{Name: "testactivity"})
130132
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json")
131-
require.NoError(t, err)
133+
assert.ErrorContains(t, err, "nondeterministic workflow")
132134
}
133135

134136
func TestBranchWorkflow(t *testing.T) {
@@ -155,9 +157,10 @@ func TestBranchWorkflowWithExtraBranch(t *testing.T) {
155157
func TestSequentialStepsWorkflow(t *testing.T) {
156158
replayer := worker.NewWorkflowReplayer()
157159

158-
replayer.RegisterWorkflowWithOptions(sequantialStepsWorkflow, workflow.RegisterOptions{Name: "sequentialStepsWorkflow"})
160+
replayer.RegisterWorkflowWithOptions(replayerHelloWorldWorkflow, workflow.RegisterOptions{Name: "fx.ReplayerHelloWorldWorkflow"})
161+
replayer.RegisterActivityWithOptions(replayerHelloWorldActivity, activity.RegisterOptions{Name: "replayerhello"})
159162

160-
// branch.json file contains history of a run with 3 activity calls
163+
// sequential.json file contains history of a run with 2 activity calls sequentially
161164
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "sequential.json")
162165
assert.ErrorContains(t, err, "nondeterministic workflow")
163166
}
@@ -173,11 +176,12 @@ func TestParallel(t *testing.T) {
173176

174177
// Should have failed since the first go routine has only one branch whereas the history has two branches.
175178
// The replayer totally misses this change.
179+
// TODO(remove comment): Replayer now catches this
176180
func TestParallel2(t *testing.T) {
177181
replayer := worker.NewWorkflowReplayer()
178182

179183
replayer.RegisterWorkflowWithOptions(sampleParallelWorkflow2, workflow.RegisterOptions{Name: "branch2"})
180184

181185
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json")
182-
require.NoError(t, err)
186+
assert.ErrorContains(t, err, "nondeterministic workflow")
183187
}

test/replaytests/sequantial_steps_workflow.go

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)