Skip to content

Commit 8eb8258

Browse files
committed
add TestVersionedWorkflows case
1 parent 4ef37a2 commit 8eb8258

File tree

4 files changed

+571
-0
lines changed

4 files changed

+571
-0
lines changed

test/replaytests/replay_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,3 +191,104 @@ func TestContinueAsNew(t *testing.T) {
191191
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "continue_as_new.json")
192192
assert.ErrorContains(t, err, "missing replay decision for WorkflowExecutionContinuedAsNew")
193193
}
194+
195+
// TestSafeDeploymentVersionedWorkflow verifies that versioned workflows can be executed
196+
// safely across different versions without causing non-deterministic errors.
197+
// There are 2 workflow executions:
198+
//
199+
// * VersionedWorkflowFoo - which is the first version of the workflow which version of change id is DefaultVersion
200+
// - This workflow is supposed to execute FooActivity
201+
//
202+
// * VersionedWorkflowBar - which is the second version of the workflow which version of change id is 1
203+
// - This workflow is supposed to execute BarActivity
204+
//
205+
// There are 4 versions of the workflow:
206+
//
207+
// * VersionedWorkflowV1 - which supports only DefaultVersion and executes FooActivity
208+
// - This workflow is able to replay the history of only of VersionedWorkflowFoo
209+
//
210+
// * VersionedWorkflowV2 - which supports DefaultVersion and Version 1, and can execute FooActivity or BarActivity
211+
// - This workflow is able to replay the history of both of VersionedWorkflowFoo and VersionedWorkflowBar
212+
// - A first execution of this workflow will should execute FooActivity, because of usage workflow.ExecuteWithMinVersion(),
213+
// but the test can't check it due to Replay
214+
//
215+
// * VersionedWorkflowV3 - which supports DefaultVersion and Version 1, and can execute FooActivity or BarActivity
216+
// - This workflow is able to replay the history of both of VersionedWorkflowFoo and VersionedWorkflowBar
217+
// - A first execution of this workflow will should execute BarActivity, but the test can't check it due to Replay
218+
//
219+
// * VersionedWorkflowV4 - which supports Version 1, and can execute BarActivity
220+
// - This workflow is able to replay the history only of VersionedWorkflowBar
221+
//
222+
// So the test focusing workflows supports forward and backward compatibility of the workflows
223+
func TestVersionedWorkflows(t *testing.T) {
224+
const (
225+
versionedWorkflowFooHistoryFile = "versioned_workflow_foo.json"
226+
versionedWorkflowBarHistoryFile = "versioned_workflow_bar.json"
227+
)
228+
229+
t.Run("VersionedWorkflowV1", func(t *testing.T) {
230+
replayer := worker.NewWorkflowReplayer()
231+
replayer.RegisterWorkflowWithOptions(VersionedWorkflowV1, workflow.RegisterOptions{Name: versionedWorkflowName})
232+
replayer.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: fooActivityName})
233+
234+
t.Run("successfully replayed with VersionedWorkflowFoo", func(t *testing.T) {
235+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile)
236+
require.NoError(t, err, "Failed to replay VersionedWorkflowFoo history")
237+
})
238+
239+
t.Run("fail to replay with VersionedWorkflowBar", func(t *testing.T) {
240+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile)
241+
require.Error(t, err, "Expected to fail replaying VersionedWorkflowBar history")
242+
})
243+
})
244+
245+
t.Run("VersionedWorkflowV2", func(t *testing.T) {
246+
replayer := worker.NewWorkflowReplayer()
247+
replayer.RegisterWorkflowWithOptions(VersionedWorkflowV2, workflow.RegisterOptions{Name: versionedWorkflowName})
248+
replayer.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: fooActivityName})
249+
replayer.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: barActivityName})
250+
251+
t.Run("successfully replayed with VersionedWorkflowFoo", func(t *testing.T) {
252+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile)
253+
require.NoError(t, err, "Failed to replay VersionedWorkflowFoo history")
254+
})
255+
256+
t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) {
257+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile)
258+
require.NoError(t, err, "Failed to replay VersionedWorkflowBar history")
259+
})
260+
})
261+
262+
t.Run("VersionedWorkflowV3", func(t *testing.T) {
263+
replayer := worker.NewWorkflowReplayer()
264+
replayer.RegisterWorkflowWithOptions(VersionedWorkflowV3, workflow.RegisterOptions{Name: versionedWorkflowName})
265+
replayer.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: fooActivityName})
266+
replayer.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: barActivityName})
267+
268+
t.Run("successfully replayed with VersionedWorkflowFoo", func(t *testing.T) {
269+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile)
270+
require.NoError(t, err, "Failed to replay VersionedWorkflowFoo history")
271+
})
272+
273+
t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) {
274+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile)
275+
require.NoError(t, err, "Failed to replay VersionedWorkflowBar history")
276+
})
277+
})
278+
279+
t.Run("VersionedWorkflowV4", func(t *testing.T) {
280+
replayer := worker.NewWorkflowReplayer()
281+
replayer.RegisterWorkflowWithOptions(VersionedWorkflowV4, workflow.RegisterOptions{Name: versionedWorkflowName})
282+
replayer.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: barActivityName})
283+
284+
t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) {
285+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile)
286+
require.Error(t, err, "Expected to fail replaying VersionedWorkflowFoo history")
287+
})
288+
289+
t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) {
290+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile)
291+
require.NoError(t, err, "Failed to replay VersionedWorkflowBar history")
292+
})
293+
})
294+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package replaytests
2+
3+
import (
4+
"go.uber.org/cadence/workflow"
5+
"time"
6+
)
7+
8+
const (
9+
// testChangeID is a constant used to identify the version change in the workflow.
10+
testChangeID = "test-change"
11+
12+
// fooActivityName and barActivityName are the names of the activities used in the workflows.
13+
fooActivityName = "FooActivity"
14+
barActivityName = "BarActivity"
15+
16+
// versionedWorkflowName is the name of the versioned workflow.
17+
versionedWorkflowName = "VersionedWorkflow"
18+
)
19+
20+
var activityOptions = workflow.ActivityOptions{
21+
ScheduleToStartTimeout: time.Minute,
22+
StartToCloseTimeout: time.Minute,
23+
HeartbeatTimeout: time.Second * 20,
24+
}
25+
26+
// VersionedWorkflowV1 is the first version of the workflow, and it supports only DefaultVersion.
27+
// It supports workflow executions started by this version VersionedWorkflowV1
28+
// and VersionedWorkflowV2, as all of them will have the change ID set to DefaultVersion.
29+
func VersionedWorkflowV1(ctx workflow.Context, _ string) (string, error) {
30+
ctx = workflow.WithActivityOptions(ctx, activityOptions)
31+
32+
var result string
33+
err := workflow.ExecuteActivity(ctx, fooActivityName, "").Get(ctx, &result)
34+
if err != nil {
35+
return "", err
36+
}
37+
38+
return result, nil
39+
}
40+
41+
// VersionedWorkflowV2 is the second version of the workflow. It supports DefaultVersion and Version 1.
42+
// All workflows started by this version will have the change ID set to DefaultVersion.
43+
// It supports workflow executions started by VersionedWorkflowV1 and VersionedWorkflowV2,
44+
// as all of them will have the change ID set to DefaultVersion.
45+
// It also supports workflow executions started by VersionedWorkflowV3 and VersionedWorkflowV4
46+
// because the code supports execution of Version 1 of the workflow.
47+
func VersionedWorkflowV2(ctx workflow.Context, _ string) (string, error) {
48+
ctx = workflow.WithActivityOptions(ctx, activityOptions)
49+
50+
var result string
51+
var err error
52+
53+
version := workflow.GetVersion(ctx, testChangeID, workflow.DefaultVersion, 1, workflow.ExecuteWithMinVersion())
54+
if version == workflow.DefaultVersion {
55+
err = workflow.ExecuteActivity(ctx, fooActivityName, "").Get(ctx, &result)
56+
} else {
57+
err = workflow.ExecuteActivity(ctx, barActivityName, "").Get(ctx, &result)
58+
}
59+
if err != nil {
60+
return "", err
61+
}
62+
63+
return result, nil
64+
}
65+
66+
// VersionedWorkflowV3 is the third version of the workflow. It supports DefaultVersion and Version 1 as well.
67+
// However, all workflows started by this version will have the change ID set to Version 1.
68+
// It supports workflow executions started by VersionedWorkflowV1 and VersionedWorkflowV2,
69+
// as all of them will have the change ID set to DefaultVersion, and it supports them.
70+
// It also supports workflow executions started by VersionedWorkflowV3 and VersionedWorkflowV4,
71+
// because the code supports execution of Version 1 of the workflow.
72+
func VersionedWorkflowV3(ctx workflow.Context, _ string) (string, error) {
73+
ctx = workflow.WithActivityOptions(ctx, activityOptions)
74+
75+
var result string
76+
var err error
77+
78+
version := workflow.GetVersion(ctx, testChangeID, workflow.DefaultVersion, 1)
79+
if version == workflow.DefaultVersion {
80+
err = workflow.ExecuteActivity(ctx, fooActivityName, "").Get(ctx, &result)
81+
} else {
82+
err = workflow.ExecuteActivity(ctx, barActivityName, "").Get(ctx, &result)
83+
}
84+
if err != nil {
85+
return "", err
86+
}
87+
88+
return result, nil
89+
}
90+
91+
// VersionedWorkflowV4 is the fourth version of the workflow. It supports only Version 1.
92+
// All workflows started by this version will have the change ID set to Version 1.
93+
// It supports workflow executions started by VersionedWorkflowV3 and VersionedWorkflowV4,
94+
// as all of them will have the change ID set to Version 1.
95+
func VersionedWorkflowV4(ctx workflow.Context, _ string) (string, error) {
96+
ctx = workflow.WithActivityOptions(ctx, activityOptions)
97+
98+
var result string
99+
100+
workflow.GetVersion(ctx, testChangeID, 1, 1)
101+
err := workflow.ExecuteActivity(ctx, barActivityName, "").Get(ctx, &result)
102+
if err != nil {
103+
return "", err
104+
}
105+
106+
return result, nil
107+
}
108+
109+
// FooActivity returns "foo" as a result of the activity execution.
110+
func FooActivity(ctx workflow.Context, _ string) (string, error) {
111+
return "foo", nil
112+
}
113+
114+
// BarActivity returns "bar" as a result of the activity execution.
115+
func BarActivity(ctx workflow.Context, _ string) (string, error) {
116+
return "bar", nil
117+
}

0 commit comments

Comments
 (0)