Skip to content

Commit 223fb5d

Browse files
fix and improve test coverage of replays, add explanation comments to not-obvious places
1 parent b5ec4d9 commit 223fb5d

File tree

8 files changed

+107
-91
lines changed

8 files changed

+107
-91
lines changed

internal/error.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ func (e *NonDeterministicError) Error() string {
508508
case "mismatch":
509509
// historical text
510510
return "nondeterministic workflow: " +
511+
"mismatching history event and replay decision found. " +
511512
"history event is " + e.HistoryEventText + ", " +
512513
"replay decision is " + e.DecisionText
513514
default:

internal/internal_activity.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,11 @@ func deSerializeFunctionResult(f interface{}, result []byte, to interface{}, dat
432432
}
433433

434434
// For everything we return result.
435+
// TODO(remove comment):
436+
// Code reaches here for 2 cases
437+
// 1. activity is executed by name (not the func pointer) and it wasn't registered
438+
// 2. activity is executed by func pointer and the signature indicates it doesn't/can't return data.
439+
// for example it only has one return parameter (which can only be be error).
435440
return decodeArg(dataConverter, result, to)
436441
}
437442

internal/internal_task_handlers.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -961,6 +961,9 @@ ProcessEvents:
961961
return nil, err
962962
}
963963

964+
// Break the event processing loop if the workflow is completed except in replay mode.
965+
// In replay mode we check for nondeterminism cases and
966+
// breaking the loop causes missing events in respondEvents which can cause false positives or false negatives.
964967
if w.isWorkflowCompleted && !isInReplay {
965968
break ProcessEvents
966969
}

internal/internal_task_handlers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -880,9 +880,9 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicLogNonexistingI
880880
require.NotNil(t.T(), replayErrorField)
881881
require.Equal(t.T(), zapcore.ErrorType, replayErrorField.Type)
882882
require.ErrorContains(t.T(), replayErrorField.Interface.(error),
883-
"nondeterministic workflow: "+
883+
"nondeterministic workflow: mismatching history event and replay decision found. "+
884884
"history event is ActivityTaskScheduled: (ActivityId:NotAnActivityID, ActivityType:(Name:pkg.Greeter_Activity), TaskList:(Name:taskList), Input:[]), "+
885-
"replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList)")
885+
"replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList), Input:[], ScheduleToCloseTimeoutSeconds:120, ScheduleToStartTimeoutSeconds:60, StartToCloseTimeoutSeconds:60, HeartbeatTimeoutSeconds:20, Header:(Fields:map[]))")
886886
}
887887

888888
func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowReturnsPanicError() {

test/replaytests/choice.json

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,34 @@
176176
},
177177
{
178178
"eventId": 12,
179-
"timestamp": 1679427717321911295,
179+
"timestamp": 1679427717321780254,
180+
"eventType": "ActivityTaskStarted",
181+
"version": 0,
182+
"taskId": 5243011,
183+
"activityTaskStartedEventAttributes": {
184+
"scheduledEventId": 11,
185+
"identity": "82203@agautam-NV709R969P@choiceGroup@41d230ae-253a-4d01-9079-322ef05c09fb",
186+
"requestId": "ae2aad96-6588-4359-807b-a39a16f0896a",
187+
"attempt": 0,
188+
"lastFailureReason": ""
189+
}
190+
},
191+
{
192+
"eventId": 13,
193+
"timestamp": 1679427717321780255,
194+
"eventType": "ActivityTaskCompleted",
195+
"version": 0,
196+
"taskId": 5243000,
197+
"activityTaskCompletedEventAttributes": {
198+
"result": "ImJhbmFuYSIK",
199+
"scheduledEventId": 11,
200+
"startedEventId": 12,
201+
"identity": "82203@agautam-NV709R969P@choiceGroup@41d230ae-253a-4d01-9079-322ef05c09fb"
202+
}
203+
},
204+
{
205+
"eventId": 14,
206+
"timestamp": 1679427717321780256,
180207
"eventType": "WorkflowExecutionCompleted",
181208
"version": 0,
182209
"taskId": 5243011,

test/replaytests/exclusive_choice_workflow.go

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const (
3939
orderChoiceCherry = "cherry"
4040
)
4141

42-
// exclusiveChoiceWorkflow Workflow Decider. This workflow executes Cherry order.
42+
// exclusiveChoiceWorkflow executes main.getOrderActivity and executes either cherry or banana activity depends on what main.getOrderActivity returns
4343
func exclusiveChoiceWorkflow(ctx workflow.Context) error {
4444
// Get order.
4545
ao := workflow.ActivityOptions{
@@ -50,7 +50,7 @@ func exclusiveChoiceWorkflow(ctx workflow.Context) error {
5050
ctx = workflow.WithActivityOptions(ctx, ao)
5151

5252
var orderChoice string
53-
err := workflow.ExecuteActivity(ctx, getOrderActivity).Get(ctx, &orderChoice)
53+
err := workflow.ExecuteActivity(ctx, "main.getOrderActivity").Get(ctx, &orderChoice)
5454
if err != nil {
5555
return err
5656
}
@@ -60,9 +60,9 @@ func exclusiveChoiceWorkflow(ctx workflow.Context) error {
6060
// choose next activity based on order result
6161
switch orderChoice {
6262
case orderChoiceBanana:
63-
workflow.ExecuteActivity(ctx, orderBananaActivity, orderChoice)
63+
workflow.ExecuteActivity(ctx, "main.orderBananaActivity", orderChoice)
6464
case orderChoiceCherry:
65-
workflow.ExecuteActivity(ctx, orderCherryActivity, orderChoice)
65+
workflow.ExecuteActivity(ctx, "main.orderCherryActivity", orderChoice)
6666
default:
6767
logger.Error("Unexpected order", zap.String("Choice", orderChoice))
6868
}
@@ -71,8 +71,8 @@ func exclusiveChoiceWorkflow(ctx workflow.Context) error {
7171
return nil
7272
}
7373

74-
// This workflow explicitly executes Apple Activity received from the getorderActivity.
75-
func exclusiveChoiceWorkflow2(ctx workflow.Context) error {
74+
// exclusiveChoiceWorkflow executes main.getOrderActivity and executes either cherry or banana activity depends on what main.getOrderActivity returns
75+
func exclusiveChoiceWorkflowAlwaysCherry(ctx workflow.Context) error {
7676
// Get order.
7777
ao := workflow.ActivityOptions{
7878
ScheduleToStartTimeout: time.Minute,
@@ -82,40 +82,25 @@ func exclusiveChoiceWorkflow2(ctx workflow.Context) error {
8282
ctx = workflow.WithActivityOptions(ctx, ao)
8383

8484
var orderChoice string
85-
err := workflow.ExecuteActivity(ctx, getAppleOrderActivity).Get(ctx, &orderChoice)
85+
err := workflow.ExecuteActivity(ctx, "main.getOrderActivity").Get(ctx, &orderChoice)
8686
if err != nil {
8787
return err
8888
}
8989

9090
logger := workflow.GetLogger(ctx)
91+
logger.Sugar().Infof("Got order for %s but will ignore and order cherry!!", orderChoice)
9192

92-
// choose next activity based on order result. It's apple in this case.
93-
switch orderChoice {
94-
case orderChoiceApple:
95-
workflow.ExecuteActivity(ctx, orderAppleActivity, orderChoice)
96-
default:
97-
logger.Error("Unexpected order", zap.String("Choice", orderChoice))
98-
}
93+
workflow.ExecuteActivity(ctx, "main.orderCherryActivity", orderChoice)
9994

10095
logger.Info("Workflow completed.")
10196
return nil
10297
}
10398

104-
func getOrderActivity() (string, error) {
105-
fmt.Printf("Order is for Cherry")
106-
return "cherry", nil
107-
}
108-
109-
func getAppleOrderActivity() (string, error) {
99+
func getBananaOrderActivity() (string, error) {
110100
fmt.Printf("Order is for Apple")
111101
return "apple", nil
112102
}
113103

114-
func orderAppleActivity(choice string) error {
115-
fmt.Printf("Order choice: %v\n", choice)
116-
return nil
117-
}
118-
119104
func orderBananaActivity(choice string) error {
120105
fmt.Printf("Order choice: %v\n", choice)
121106
return nil

test/replaytests/replay_test.go

Lines changed: 57 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"go.uber.org/cadence/workflow"
3636
)
3737

38+
// Basic happy paths
3839
func TestReplayWorkflowHistoryFromFile(t *testing.T) {
3940
for _, testFile := range []string{"basic.json", "basic_new.json", "version.json", "version_new.json"} {
4041
t.Run("replay_"+strings.Split(testFile, ".")[0], func(t *testing.T) {
@@ -48,6 +49,7 @@ func TestReplayWorkflowHistoryFromFile(t *testing.T) {
4849
}
4950
}
5051

52+
// Child workflow happy path
5153
func TestReplayChildWorkflowBugBackport(t *testing.T) {
5254
replayer := worker.NewWorkflowReplayer()
5355
replayer.RegisterWorkflowWithOptions(childWorkflow, workflow.RegisterOptions{Name: "child"})
@@ -62,18 +64,18 @@ func TestGreetingsWorkflowforActivity(t *testing.T) {
6264
replayer := worker.NewWorkflowReplayer()
6365
replayer.RegisterWorkflowWithOptions(greetingsWorkflowActivity, workflow.RegisterOptions{Name: "greetings"})
6466
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "greetings.json")
65-
require.Error(t, err)
67+
assert.ErrorContains(t, err, "nondeterministic workflow: mismatching history event and replay decision found")
6668
}
6769

70+
// Simple greeting workflow with 3 activities executed sequentially: getGreetingsActivity, getNameActivity, sayGreetingsActivity
6871
func TestGreetingsWorkflow(t *testing.T) {
6972
replayer := worker.NewWorkflowReplayer()
7073
replayer.RegisterWorkflowWithOptions(greetingsWorkflow, workflow.RegisterOptions{Name: "greetings"})
7174
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "greetings.json")
7275
require.NoError(t, err)
7376
}
7477

75-
// Should have failed but passed. Maybe, because the result recorded in history still matches the return type of the workflow.
76-
// TODO(remove comment): Debug why is this still missed
78+
// Return types of activity change is not considered non-determinism (at least for now) so this test doesn't find non-determinism error
7779
func TestGreetingsWorkflow3(t *testing.T) {
7880
replayer := worker.NewWorkflowReplayer()
7981
replayer.RegisterActivityWithOptions(getNameActivity3, activity.RegisterOptions{Name: "main.getNameActivity", DisableAlreadyRegisteredCheck: true})
@@ -82,107 +84,100 @@ func TestGreetingsWorkflow3(t *testing.T) {
8284
require.NoError(t, err)
8385
}
8486

85-
// Fails because the expected signature was different from history.
86-
func TestGreetingsWorkflow4(t *testing.T) {
87+
// The recorded history has following activities in this order: main.getOrderActivity, main.orderBananaActivity
88+
// This test runs a version of choice workflow which does the exact same thing so no errors expected.
89+
func TestExclusiveChoiceWorkflowSuccess(t *testing.T) {
8790
replayer := worker.NewWorkflowReplayer()
88-
replayer.RegisterActivityWithOptions(getNameActivity4, activity.RegisterOptions{Name: "main.getNameActivity", DisableAlreadyRegisteredCheck: true})
89-
replayer.RegisterWorkflowWithOptions(greetingsWorkflow4, workflow.RegisterOptions{Name: "greetings"})
90-
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "greetings.json")
91-
require.Error(t, err)
91+
replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow, workflow.RegisterOptions{Name: "choice"})
92+
replayer.RegisterActivityWithOptions(getBananaOrderActivity, activity.RegisterOptions{Name: "main.getOrderActivity"})
93+
replayer.RegisterActivityWithOptions(orderBananaActivity, activity.RegisterOptions{Name: "main.orderBananaActivity"})
94+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json")
95+
require.NoError(t, err)
9296
}
9397

94-
// Panic with failed to register activity. This passes in cadence_samples because it's registered in Helper.
95-
// To test it on cadence_samples change the https://github.com/uber-common/cadence-samples/blob/master/cmd/samples/recipes/greetings/greetings_workflow.go
96-
// to include the extra return types in getNameActivity.
97-
func TestGreetingsWorkflow2(t *testing.T) {
98-
99-
t.Skip("Panic with failed to register activity. Here the activity returns incompatible arguments so the test should fail")
98+
// The recorded history has following activities in this order: main.getOrderActivity, main.orderBananaActivity
99+
// This test runs a version of choice workflow which does the exact same thing but the activities are not registered.
100+
// It doesn't matter for replayer so no exceptions expected.
101+
// The reason is that activity result decoding logic just passes the result back to the given pointer
102+
func TestExclusiveChoiceWorkflowActivitiyRegistrationMissing(t *testing.T) {
100103
replayer := worker.NewWorkflowReplayer()
101-
replayer.RegisterActivityWithOptions(getNameActivity2, activity.RegisterOptions{Name: "main.getNameActivity", DisableAlreadyRegisteredCheck: true})
102-
replayer.RegisterWorkflowWithOptions(greetingsWorkflow2, workflow.RegisterOptions{Name: "greetings"})
103-
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "greetings.json")
104-
require.Error(t, err)
104+
replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow, workflow.RegisterOptions{Name: "choice"})
105+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json")
106+
require.NoError(t, err)
105107
}
106108

107-
// Ideally replayer doesn't concern itself with the change in the activity content until it matches the expected output type.
108-
// History has recorded the output of banana activity instead. The replayer should have failed because we have not registered any
109-
// activity here in the test.
110-
// The replayer still runs whatever it found in the history and passes.
111-
func TestExclusiveChoiceWorkflowWithUnregisteredActivity(t *testing.T) {
109+
// The recorded history has following activities in this order: main.getOrderActivity, main.orderBananaActivity
110+
// This test runs a version of choice workflow which registers a single return parameter function for main.getOrderActivity
111+
// - Original main.getOrderActivity signature: func() (string, error)
112+
// - New main.getOrderActivity signature: func() error
113+
//
114+
// In this case result of main.getOrderActivity from history is not passed back to the given pointer by the workflow.
115+
// Compared to the activity registration missing scenario (above case) this is a little bit weird behavior.
116+
// The workflow code continues with orderChoice="" instead of "banana". Therefore it doesn't invoke 2nd activity main.getOrderActivity.
117+
// This means history has more events then replay decisions which causes non-determinism error
118+
func TestExclusiveChoiceWorkflowWithActivitySignatureChange(t *testing.T) {
112119
replayer := worker.NewWorkflowReplayer()
113-
114120
replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow, workflow.RegisterOptions{Name: "choice"})
121+
replayer.RegisterActivityWithOptions(func() error { return nil }, activity.RegisterOptions{Name: "main.getOrderActivity"})
122+
replayer.RegisterActivityWithOptions(orderBananaActivity, activity.RegisterOptions{Name: "main.orderBananaActivity"})
115123
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json")
116-
require.NoError(t, err)
124+
assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision")
117125
}
118126

119-
// This test registers Cherry Activity as the activity but calls Apple activity in the workflow code. Infact, Cherry and Banana
120-
// activities are not even a part of the workflow code in question.
121-
// History has recorded the output of banana activity. Here, The workflow is not waiting for the activity so it doesn't notice
122-
// that registered activity is different from executed activity.
123-
// The replayer relies on whatever is recorded in the History so as long as the main activity name in the options matched partially
124-
// it doesn't raise errors.
125-
// TODO(remove comment): Replayer now catches this
126-
func TestExclusiveChoiceWorkflowWithDifferentActvityCombo(t *testing.T) {
127+
// The recorded history has following activities in this order: main.getOrderActivity, main.orderBananaActivity
128+
// This test runs a version of choice workflow which calls main.getOrderActivity and then calls the main.orderCherryActivity.
129+
// The replayer will find non-determinism because of mismatch between replay decision and history (banana vs cherry)
130+
func TestExclusiveChoiceWorkflowWithMismatchingActivity(t *testing.T) {
127131
replayer := worker.NewWorkflowReplayer()
128-
129-
replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow2, workflow.RegisterOptions{Name: "choice"})
130-
replayer.RegisterActivityWithOptions(getAppleOrderActivity, activity.RegisterOptions{Name: "main.getOrderActivity"})
131-
replayer.RegisterActivityWithOptions(orderAppleActivity, activity.RegisterOptions{Name: "testactivity"})
132+
replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflowAlwaysCherry, workflow.RegisterOptions{Name: "choice"})
133+
replayer.RegisterActivityWithOptions(getBananaOrderActivity, activity.RegisterOptions{Name: "main.getOrderActivity"})
134+
replayer.RegisterActivityWithOptions(orderCherryActivity, activity.RegisterOptions{Name: "main.orderCherryActivity"})
132135
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json")
133-
assert.ErrorContains(t, err, "nondeterministic workflow")
136+
assert.ErrorContains(t, err, "nondeterministic workflow: mismatching history event and replay decision found")
134137
}
135138

139+
// Branch workflow happy case.
140+
// It branches out to 3 open activities and then they complete.
136141
func TestBranchWorkflow(t *testing.T) {
137142
replayer := worker.NewWorkflowReplayer()
138-
139143
replayer.RegisterWorkflowWithOptions(sampleBranchWorkflow, workflow.RegisterOptions{Name: "branch"})
140-
141144
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch.json")
142145
require.NoError(t, err)
143146
}
144147

145-
// Fails with a non deterministic error because there was an additional unexpected branch. Decreasing the number of branches will
146-
// also fail the test because the history expects the same number of branches executing the activity.
148+
// Branch workflow normal history file is replayed against modified workflow code which
149+
// has 2 branches only. This causes nondetereministic error.
147150
func TestBranchWorkflowWithExtraBranch(t *testing.T) {
148151
replayer := worker.NewWorkflowReplayer()
149-
150152
replayer.RegisterWorkflowWithOptions(sampleBranchWorkflow2, workflow.RegisterOptions{Name: "branch"})
151-
152153
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch.json")
153-
assert.ErrorContains(t, err, "nondeterministic workflow")
154+
assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision")
154155
}
155156

156-
// TestSequentialStepsWorkflow replays a history with 2 sequential activity calls and runs it against new version of the workflow code which only calls 1 activity.
157-
// This should be considered as non-determinism error.
157+
// TestSequentialStepsWorkflow replays a history with 2 sequential non overlapping activity calls (one completes before the other is scheduled)
158+
// and runs it against new version of the workflow code which only calls 1 activity.
159+
// This is considered as non-determinism error.
158160
func TestSequentialStepsWorkflow(t *testing.T) {
159161
replayer := worker.NewWorkflowReplayer()
160-
161162
replayer.RegisterWorkflowWithOptions(replayerHelloWorldWorkflow, workflow.RegisterOptions{Name: "fx.ReplayerHelloWorldWorkflow"})
162163
replayer.RegisterActivityWithOptions(replayerHelloWorldActivity, activity.RegisterOptions{Name: "replayerhello"})
163-
164-
// sequential.json file contains history of a run with 2 activity calls sequentially
165164
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "sequential.json")
166-
assert.ErrorContains(t, err, "nondeterministic workflow")
165+
assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision")
167166
}
168167

169-
func TestParallel(t *testing.T) {
168+
// Runs simpleParallelWorkflow which starts two workflow.Go routines that executes 1 and 2 activities respectively.
169+
func TestSimpleParallelWorkflow(t *testing.T) {
170170
replayer := worker.NewWorkflowReplayer()
171-
172171
replayer.RegisterWorkflowWithOptions(sampleParallelWorkflow, workflow.RegisterOptions{Name: "branch2"})
173-
174172
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json")
175173
require.NoError(t, err)
176174
}
177175

178-
// Should have failed since the first go routine has only one branch whereas the history has two branches.
179-
// The replayer totally misses this change.
180-
// TODO(remove comment): Replayer now catches this
181-
func TestParallel2(t *testing.T) {
176+
// Runs modified version of simpleParallelWorkflow which starts 1 less activity in the second workflow-gouroutine.
177+
// This is considered as non-determinism error.
178+
func TestSimpleParallelWorkflowWithMissingActivityCall(t *testing.T) {
182179
replayer := worker.NewWorkflowReplayer()
183-
184180
replayer.RegisterWorkflowWithOptions(sampleParallelWorkflow2, workflow.RegisterOptions{Name: "branch2"})
185-
186181
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json")
187-
assert.ErrorContains(t, err, "nondeterministic workflow")
182+
assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision")
188183
}

test/replaytests/sequential_workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type replayerSampleMessage struct {
2020
// Corresponding unit test covers the scenario that new workflow's history records is subset of previous version's history.
2121
//
2222
// v1: wf started -> call activity -> call activity -> wf complete
23-
// v2: wf started -> call activity -> wf complete
23+
// v2: wf started -> call activity -> wf complete
2424
//
2525
// The v2 clearly has determinism issues and should be considered as non-determism error for replay tests.
2626
func replayerHelloWorldWorkflow(ctx workflow.Context, inputMsg *replayerSampleMessage) error {

0 commit comments

Comments
 (0)