Skip to content

Commit b2c664f

Browse files
committed
add V5 and V6 versions
1 parent f8f7bd9 commit b2c664f

File tree

3 files changed

+159
-6
lines changed

3 files changed

+159
-6
lines changed

test/integration_test.go

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -570,47 +570,95 @@ func (ts *IntegrationTestSuite) beforeVersionedWorkflowTest(testName string, w w
570570
replaytests.SetupWorkerForVersionedWorkflowV3(w)
571571
case "TestVersionedWorkflowV4":
572572
replaytests.SetupWorkerForVersionedWorkflowV4(w)
573+
case "TestVersionedWorkflowV5":
574+
replaytests.SetupWorkerForVersionedWorkflowV5(w)
575+
case "TestVersionedWorkflowV6":
576+
replaytests.SetupWorkerForVersionedWorkflowV6(w)
573577
}
574578
}
575579

576-
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV1 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, but not on VersionedWorkflowV4.
580+
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV1
581+
// can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3,
582+
// but not on VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6.
577583
func (ts *IntegrationTestSuite) TestVersionedWorkflowV1() {
578584
execution, err := ts.executeWorkflow("test-versioned-workflow-v1", replaytests.VersionedWorkflowName, nil, "arg")
579585
ts.NoError(err)
580586

581587
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
582-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer withVersionedWorkflowV3")
588+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3")
583589
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4")
590+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Expected to fail replaying the replayer with VersionedWorkflowV5")
591+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Expected to fail replaying the replayer with VersionedWorkflowV6")
584592
}
585593

586-
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV2 can be replayed on worker with VersionedWorkflowV1 and VersionedWorkflowV3, but not on VersionedWorkflowV4.
594+
// TestVersionedWorkflowV2 tests that a workflow started on the worker with VersionedWorkflowV2
595+
// can be replayed on worker with VersionedWorkflowV1 and VersionedWorkflowV3,
596+
// but not on VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6.
587597
func (ts *IntegrationTestSuite) TestVersionedWorkflowV2() {
588598
execution, err := ts.executeWorkflow("test-versioned-workflow-v2", replaytests.VersionedWorkflowName, nil, "arg")
589599
ts.NoError(err)
590600

591601
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Failed to replay on the replayer with VersionedWorkflowV1")
592-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer withVersionedWorkflowV3")
602+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3")
593603
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4")
604+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Expected to fail replaying the replayer with VersionedWorkflowV5")
605+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Expected to fail replaying the replayer with VersionedWorkflowV6")
594606
}
595607

596-
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV3 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV4, but not on VersionedWorkflowV1.
608+
// TestVersionedWorkflowV3 tests that a workflow started on the worker with VersionedWorkflowV3
609+
// can be replayed on worker with VersionedWorkflowV2, VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6
610+
// but not on VersionedWorkflowV1
597611
func (ts *IntegrationTestSuite) TestVersionedWorkflowV3() {
598612
execution, err := ts.executeWorkflow("test-versioned-workflow-v3", replaytests.VersionedWorkflowName, nil, "arg")
599613
ts.NoError(err)
600614

601615
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
602616
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
603617
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Failed to replay on the replayer with VersionedWorkflowV4")
618+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Failed to replay on the replayer with VersionedWorkflowV5")
619+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Failed to replay on the replayer with VersionedWorkflowV6")
604620
}
605621

606-
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV4 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, but not on VersionedWorkflowV1.
622+
// TestVersionedWorkflowV4 tests that a workflow started on the worker with VersionedWorkflowV4
623+
// can be replayed on worker with VersionedWorkflowV2, VersionedWorkflowV3, VersionedWorkflowV5, VersionedWorkflowV6
624+
// but not on VersionedWorkflowV1
607625
func (ts *IntegrationTestSuite) TestVersionedWorkflowV4() {
608626
execution, err := ts.executeWorkflow("test-versioned-workflow-v4", replaytests.VersionedWorkflowName, nil, "arg")
609627
ts.NoError(err)
610628

611629
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
612630
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
613631
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3")
632+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Failed to replay on the replayer with VersionedWorkflowV5")
633+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Failed to replay on the replayer with VersionedWorkflowV6")
634+
}
635+
636+
// TestVersionedWorkflowV5 tests that a workflow started on the worker with VersionedWorkflowV5
637+
// can be replayed on worker with VersionedWorkflowV2, VersionedWorkflowV3, VersionedWorkflowV4, VersionedWorkflowV6,
638+
// but not on VersionedWorkflowV1.
639+
func (ts *IntegrationTestSuite) TestVersionedWorkflowV5() {
640+
execution, err := ts.executeWorkflow("test-versioned-workflow-v5", replaytests.VersionedWorkflowName, nil, "arg")
641+
ts.NoError(err)
642+
643+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
644+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
645+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3")
646+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Failed to replay on the replayer with VersionedWorkflowV4")
647+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Failed to replay on the replayer with VersionedWorkflowV6")
648+
}
649+
650+
// TestVersionedWorkflowV6 tests that a workflow started on the worker with VersionedWorkflowV6
651+
// can be replayed on worker with VersionedWorkflowV5
652+
// but not on VersionedWorkflowV1, VersionedWorkflowV2, VersionedWorkflowV3, VersionedWorkflowV4.
653+
func (ts *IntegrationTestSuite) TestVersionedWorkflowV6() {
654+
execution, err := ts.executeWorkflow("test-versioned-workflow-v6", replaytests.VersionedWorkflowName, nil, "arg")
655+
ts.NoError(err)
656+
657+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
658+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Expected to fail replaying the replayer with VersionedWorkflowV2")
659+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Expected to fail replaying the replayer with VersionedWorkflowV3")
660+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4")
661+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Failed to replay on the replayer with VersionedWorkflowV5")
614662
}
615663

616664
func (ts *IntegrationTestSuite) replayVersionedWorkflow(setupWorkerFunc func(w worker.Registry), execution *workflow.Execution) error {

test/replaytests/replay_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ func TestContinueAsNew(t *testing.T) {
219219
// * VersionedWorkflowV4 - which supports Version 1, and can execute BarActivity
220220
// - This workflow is able to replay the history only of VersionedWorkflowBar
221221
//
222+
// * VersionedWorkflowV5 - which supports Version 1 and 2, and can execute BarActivity and BazActivity
223+
// - This workflow is able to replay the history only of VersionedWorkflowBar
224+
// - A first execution of this workflow will should execute BarActivity, because of usage workflow.ExecuteWithMinVersion(),
225+
// but the test can't check it due to Replay
226+
//
227+
// * VersionedWorkflowV6 - which supports Version 1 and 2, and can execute BarActivity and BazActivity
228+
// - This workflow is able to replay the history only of VersionedWorkflowBar
229+
// - A first execution of this workflow will should execute BazActivity, but the test can't check it due to Replay
230+
//
222231
// So the test focusing workflows supports forward and backward compatibility of the workflows
223232
func TestVersionedWorkflows(t *testing.T) {
224233
const (
@@ -285,4 +294,34 @@ func TestVersionedWorkflows(t *testing.T) {
285294
require.NoError(t, err, "Failed to replay VersionedWorkflowBar history")
286295
})
287296
})
297+
298+
t.Run("VersionedWorkflowV5", func(t *testing.T) {
299+
replayer := worker.NewWorkflowReplayer()
300+
SetupWorkerForVersionedWorkflowV5(replayer)
301+
302+
t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) {
303+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile)
304+
require.Error(t, err, "Expected to fail replaying VersionedWorkflowFoo history")
305+
})
306+
307+
t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) {
308+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile)
309+
require.NoError(t, err, "Failed to replay VersionedWorkflowBar history")
310+
})
311+
})
312+
313+
t.Run("VersionedWorkflowV6", func(t *testing.T) {
314+
replayer := worker.NewWorkflowReplayer()
315+
SetupWorkerForVersionedWorkflowV6(replayer)
316+
317+
t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) {
318+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile)
319+
require.Error(t, err, "Expected to fail replaying VersionedWorkflowFoo history")
320+
})
321+
322+
t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) {
323+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile)
324+
require.NoError(t, err, "Failed to replay VersionedWorkflowBar history")
325+
})
326+
})
288327
}

test/replaytests/versioned_workflow.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const (
1616
// FooActivityName and BarActivityName are the names of the activities used in the workflows.
1717
FooActivityName = "FooActivity"
1818
BarActivityName = "BarActivity"
19+
BazActivityName = "BazActivity"
1920

2021
// VersionedWorkflowName is the name of the versioned workflow.
2122
VersionedWorkflowName = "VersionedWorkflow"
@@ -110,6 +111,52 @@ func VersionedWorkflowV4(ctx workflow.Context, _ string) (string, error) {
110111
return result, nil
111112
}
112113

114+
// VersionedWorkflowV5 is the fifth version of the workflow. It supports Version 1 and 2.
115+
// All workflows started by this version will have the change ID set to Version 1.
116+
// It supports workflow executions started by VersionedWorkflowV3, VersionedWorkflowV4,
117+
// VersionedWorkflowV5, VersionedWorkflowV6
118+
func VersionedWorkflowV5(ctx workflow.Context, _ string) (string, error) {
119+
ctx = workflow.WithActivityOptions(ctx, activityOptions)
120+
121+
var result string
122+
var err error
123+
124+
version := workflow.GetVersion(ctx, TestChangeID, 1, 2, workflow.ExecuteWithVersion(1))
125+
if version == 1 {
126+
err = workflow.ExecuteActivity(ctx, BarActivityName, "data").Get(ctx, &result)
127+
} else {
128+
err = workflow.ExecuteActivity(ctx, BazActivityName, "data").Get(ctx, &result)
129+
}
130+
if err != nil {
131+
return "", err
132+
}
133+
134+
return result, nil
135+
}
136+
137+
// VersionedWorkflowV6 is the sixth version of the workflow. It supports Version 1 and 2.
138+
// All workflows started by this version will have the change ID set to Version 2.
139+
// It supports workflow executions started by VersionedWorkflowV3, VersionedWorkflowV4,
140+
// VersionedWorkflowV5, VersionedWorkflowV6
141+
func VersionedWorkflowV6(ctx workflow.Context, _ string) (string, error) {
142+
ctx = workflow.WithActivityOptions(ctx, activityOptions)
143+
144+
var result string
145+
var err error
146+
147+
version := workflow.GetVersion(ctx, TestChangeID, 1, 2)
148+
if version == 1 {
149+
err = workflow.ExecuteActivity(ctx, BarActivityName, "data").Get(ctx, &result)
150+
} else {
151+
err = workflow.ExecuteActivity(ctx, BazActivityName, "data").Get(ctx, &result)
152+
}
153+
if err != nil {
154+
return "", err
155+
}
156+
157+
return result, nil
158+
}
159+
113160
// FooActivity returns "foo" as a result of the activity execution.
114161
func FooActivity(ctx context.Context, _ string) (string, error) {
115162
return "foo", nil
@@ -120,6 +167,11 @@ func BarActivity(ctx context.Context, _ string) (string, error) {
120167
return "bar", nil
121168
}
122169

170+
// BazActivity returns "baz" as a result of the activity execution.
171+
func BazActivity(ctx context.Context, _ string) (string, error) {
172+
return "baz", nil
173+
}
174+
123175
// SetupWorkerForVersionedWorkflowV1 registers VersionedWorkflowV1 and FooActivity
124176
func SetupWorkerForVersionedWorkflowV1(w worker.Registry) {
125177
w.RegisterWorkflowWithOptions(VersionedWorkflowV1, workflow.RegisterOptions{Name: VersionedWorkflowName})
@@ -145,3 +197,17 @@ func SetupWorkerForVersionedWorkflowV4(w worker.Registry) {
145197
w.RegisterWorkflowWithOptions(VersionedWorkflowV4, workflow.RegisterOptions{Name: VersionedWorkflowName})
146198
w.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: BarActivityName})
147199
}
200+
201+
// SetupWorkerForVersionedWorkflowV5 registers VersionedWorkflowV6, BarActivity and BazActivity
202+
func SetupWorkerForVersionedWorkflowV5(w worker.Registry) {
203+
w.RegisterWorkflowWithOptions(VersionedWorkflowV5, workflow.RegisterOptions{Name: VersionedWorkflowName})
204+
w.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: BarActivityName})
205+
w.RegisterActivityWithOptions(BazActivity, activity.RegisterOptions{Name: BazActivityName})
206+
}
207+
208+
// SetupWorkerForVersionedWorkflowV6 registers VersionedWorkflowV6, BarActivity and BazActivity
209+
func SetupWorkerForVersionedWorkflowV6(w worker.Registry) {
210+
w.RegisterWorkflowWithOptions(VersionedWorkflowV6, workflow.RegisterOptions{Name: VersionedWorkflowName})
211+
w.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: BarActivityName})
212+
w.RegisterActivityWithOptions(BazActivity, activity.RegisterOptions{Name: BazActivityName})
213+
}

0 commit comments

Comments
 (0)