Skip to content

Commit f6f7cd5

Browse files
committed
unified tests
1 parent f613939 commit f6f7cd5

File tree

3 files changed

+151
-76
lines changed

3 files changed

+151
-76
lines changed

test/integration_test.go

Lines changed: 106 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) {
175175

176176
ts.worker = worker.New(ts.rpcClient.Interface, domainName, ts.taskListName, options)
177177
ts.registerWorkflowsAndActivities(ts.worker)
178-
ts.beforeVersionedWorkflowTest(testName, ts.worker)
179178
ts.Nil(ts.worker.Start())
180179
}
181180

@@ -559,111 +558,151 @@ func (ts *IntegrationTestSuite) TestOverrideSpanContext() {
559558
ts.Equal("some-value", result["mockpfx-baggage-some-key"])
560559
}
561560

562-
// beforeVersionedWorkflowTest registers appropriate versioned workflow and activity to emulate the versioned workflow test.
563-
func (ts *IntegrationTestSuite) beforeVersionedWorkflowTest(testName string, w worker.Worker) {
564-
switch testName {
565-
case "TestVersionedWorkflowV1":
566-
replaytests.SetupWorkerForVersionedWorkflowV1(w)
567-
case "TestVersionedWorkflowV2":
568-
replaytests.SetupWorkerForVersionedWorkflowV2(w)
569-
case "TestVersionedWorkflowV3":
570-
replaytests.SetupWorkerForVersionedWorkflowV3(w)
571-
case "TestVersionedWorkflowV4":
572-
replaytests.SetupWorkerForVersionedWorkflowV4(w)
573-
case "TestVersionedWorkflowV5":
574-
replaytests.SetupWorkerForVersionedWorkflowV5(w)
575-
case "TestVersionedWorkflowV6":
576-
replaytests.SetupWorkerForVersionedWorkflowV6(w)
577-
}
578-
}
579-
580561
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV1
581562
// can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3,
582563
// but not on VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6.
583564
func (ts *IntegrationTestSuite) TestVersionedWorkflowV1() {
584-
execution, err := ts.executeWorkflow("test-versioned-workflow-v1", replaytests.VersionedWorkflowName, nil, "arg")
585-
ts.NoError(err)
586-
587-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
588-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3")
589-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4")
590-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Expected to fail replaying the replayer with VersionedWorkflowV5")
591-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Expected to fail replaying the replayer with VersionedWorkflowV6")
565+
ts.testVersionedWorkflow(testVersionedWorkflowTestCase{
566+
version: replaytests.VersionWorkflowVersionV1,
567+
compatibleVersions: []replaytests.VersionWorkflowVersion{
568+
replaytests.VersionWorkflowVersionV2,
569+
replaytests.VersionWorkflowVersionV3,
570+
},
571+
inCompatibleVersions: []replaytests.VersionWorkflowVersion{
572+
replaytests.VersionWorkflowVersionV4,
573+
replaytests.VersionWorkflowVersionV5,
574+
replaytests.VersionWorkflowVersionV6,
575+
},
576+
})
592577
}
593578

594579
// TestVersionedWorkflowV2 tests that a workflow started on the worker with VersionedWorkflowV2
595580
// can be replayed on worker with VersionedWorkflowV1 and VersionedWorkflowV3,
596581
// but not on VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6.
597582
func (ts *IntegrationTestSuite) TestVersionedWorkflowV2() {
598-
execution, err := ts.executeWorkflow("test-versioned-workflow-v2", replaytests.VersionedWorkflowName, nil, "arg")
599-
ts.NoError(err)
600-
601-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Failed to replay on the replayer with VersionedWorkflowV1")
602-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3")
603-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4")
604-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Expected to fail replaying the replayer with VersionedWorkflowV5")
605-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Expected to fail replaying the replayer with VersionedWorkflowV6")
583+
ts.testVersionedWorkflow(testVersionedWorkflowTestCase{
584+
version: replaytests.VersionWorkflowVersionV2,
585+
compatibleVersions: []replaytests.VersionWorkflowVersion{
586+
replaytests.VersionWorkflowVersionV1,
587+
replaytests.VersionWorkflowVersionV3,
588+
},
589+
inCompatibleVersions: []replaytests.VersionWorkflowVersion{
590+
replaytests.VersionWorkflowVersionV4,
591+
replaytests.VersionWorkflowVersionV5,
592+
replaytests.VersionWorkflowVersionV6,
593+
},
594+
})
606595
}
607596

608597
// TestVersionedWorkflowV3 tests that a workflow started on the worker with VersionedWorkflowV3
609598
// can be replayed on worker with VersionedWorkflowV2, VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6
610599
// but not on VersionedWorkflowV1
611600
func (ts *IntegrationTestSuite) TestVersionedWorkflowV3() {
612-
execution, err := ts.executeWorkflow("test-versioned-workflow-v3", replaytests.VersionedWorkflowName, nil, "arg")
613-
ts.NoError(err)
614-
615-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
616-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
617-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Failed to replay on the replayer with VersionedWorkflowV4")
618-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Failed to replay on the replayer with VersionedWorkflowV5")
619-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Failed to replay on the replayer with VersionedWorkflowV6")
601+
ts.testVersionedWorkflow(testVersionedWorkflowTestCase{
602+
version: replaytests.VersionWorkflowVersionV3,
603+
compatibleVersions: []replaytests.VersionWorkflowVersion{
604+
replaytests.VersionWorkflowVersionV2,
605+
replaytests.VersionWorkflowVersionV4,
606+
replaytests.VersionWorkflowVersionV5,
607+
replaytests.VersionWorkflowVersionV6,
608+
},
609+
inCompatibleVersions: []replaytests.VersionWorkflowVersion{
610+
replaytests.VersionWorkflowVersionV1,
611+
},
612+
})
620613
}
621614

622615
// TestVersionedWorkflowV4 tests that a workflow started on the worker with VersionedWorkflowV4
623616
// can be replayed on worker with VersionedWorkflowV2, VersionedWorkflowV3, VersionedWorkflowV5, VersionedWorkflowV6
624617
// but not on VersionedWorkflowV1
625618
func (ts *IntegrationTestSuite) TestVersionedWorkflowV4() {
626-
execution, err := ts.executeWorkflow("test-versioned-workflow-v4", replaytests.VersionedWorkflowName, nil, "arg")
627-
ts.NoError(err)
628-
629-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
630-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
631-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3")
632-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Failed to replay on the replayer with VersionedWorkflowV5")
633-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Failed to replay on the replayer with VersionedWorkflowV6")
619+
ts.testVersionedWorkflow(testVersionedWorkflowTestCase{
620+
version: replaytests.VersionWorkflowVersionV4,
621+
compatibleVersions: []replaytests.VersionWorkflowVersion{
622+
replaytests.VersionWorkflowVersionV2,
623+
replaytests.VersionWorkflowVersionV3,
624+
replaytests.VersionWorkflowVersionV5,
625+
replaytests.VersionWorkflowVersionV6,
626+
},
627+
inCompatibleVersions: []replaytests.VersionWorkflowVersion{
628+
replaytests.VersionWorkflowVersionV1,
629+
},
630+
})
634631
}
635632

636633
// TestVersionedWorkflowV5 tests that a workflow started on the worker with VersionedWorkflowV5
637634
// can be replayed on worker with VersionedWorkflowV2, VersionedWorkflowV3, VersionedWorkflowV4, VersionedWorkflowV6,
638635
// but not on VersionedWorkflowV1.
639636
func (ts *IntegrationTestSuite) TestVersionedWorkflowV5() {
640-
execution, err := ts.executeWorkflow("test-versioned-workflow-v5", replaytests.VersionedWorkflowName, nil, "arg")
641-
ts.NoError(err)
642-
643-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
644-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
645-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3")
646-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Failed to replay on the replayer with VersionedWorkflowV4")
647-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Failed to replay on the replayer with VersionedWorkflowV6")
637+
ts.testVersionedWorkflow(testVersionedWorkflowTestCase{
638+
version: replaytests.VersionWorkflowVersionV5,
639+
compatibleVersions: []replaytests.VersionWorkflowVersion{
640+
replaytests.VersionWorkflowVersionV2,
641+
replaytests.VersionWorkflowVersionV3,
642+
replaytests.VersionWorkflowVersionV4,
643+
replaytests.VersionWorkflowVersionV6,
644+
},
645+
inCompatibleVersions: []replaytests.VersionWorkflowVersion{
646+
replaytests.VersionWorkflowVersionV1,
647+
},
648+
})
648649
}
649650

650651
// TestVersionedWorkflowV6 tests that a workflow started on the worker with VersionedWorkflowV6
651652
// can be replayed on worker with VersionedWorkflowV5
652653
// but not on VersionedWorkflowV1, VersionedWorkflowV2, VersionedWorkflowV3, VersionedWorkflowV4.
653654
func (ts *IntegrationTestSuite) TestVersionedWorkflowV6() {
654-
execution, err := ts.executeWorkflow("test-versioned-workflow-v6", replaytests.VersionedWorkflowName, nil, "arg")
655+
ts.testVersionedWorkflow(testVersionedWorkflowTestCase{
656+
version: replaytests.VersionWorkflowVersionV6,
657+
compatibleVersions: []replaytests.VersionWorkflowVersion{
658+
replaytests.VersionWorkflowVersionV5,
659+
},
660+
inCompatibleVersions: []replaytests.VersionWorkflowVersion{
661+
replaytests.VersionWorkflowVersionV1,
662+
replaytests.VersionWorkflowVersionV2,
663+
replaytests.VersionWorkflowVersionV3,
664+
replaytests.VersionWorkflowVersionV4,
665+
},
666+
})
667+
}
668+
669+
type testVersionedWorkflowTestCase struct {
670+
version replaytests.VersionWorkflowVersion
671+
compatibleVersions []replaytests.VersionWorkflowVersion
672+
inCompatibleVersions []replaytests.VersionWorkflowVersion
673+
}
674+
675+
// testVersionedWorkflow tests that a workflow started on the worker with version
676+
// can be replayed on worker with compatibleVersions
677+
// but not on worker with inCompatibleVersions
678+
func (ts *IntegrationTestSuite) testVersionedWorkflow(c testVersionedWorkflowTestCase) {
679+
replaytests.SetupWorkerForVersionedWorkflow(c.version, ts.worker)
680+
wfId := fmt.Sprintf("test-versioned-workflow-v%d", c.version)
681+
execution, err := ts.executeWorkflow(wfId, replaytests.VersionedWorkflowName, nil, "arg")
655682
ts.NoError(err)
656683

657-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
658-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Expected to fail replaying the replayer with VersionedWorkflowV2")
659-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Expected to fail replaying the replayer with VersionedWorkflowV3")
660-
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4")
661-
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Failed to replay on the replayer with VersionedWorkflowV5")
684+
c.compatibleVersions = append(c.compatibleVersions, c.version)
685+
686+
ts.Require().Equalf(len(c.compatibleVersions)+len(c.inCompatibleVersions), int(replaytests.MaxVersionWorkflowVersion),
687+
"Test case should cover all versions, but got %d compatible (one of them the testing version itself) and %d incompatible versions, that not equal to %d",
688+
len(c.compatibleVersions),
689+
len(c.inCompatibleVersions),
690+
replaytests.MaxVersionWorkflowVersion)
691+
692+
for _, replayedVersion := range c.compatibleVersions {
693+
err := ts.replayVersionedWorkflow(replayedVersion, execution)
694+
ts.NoErrorf(err, "Failed to replay on the replayer with VersionedWorkflowV%d", replayedVersion)
695+
}
696+
697+
for _, replayedVersion := range c.inCompatibleVersions {
698+
err := ts.replayVersionedWorkflow(replayedVersion, execution)
699+
ts.Errorf(err, "Expected to fail replaying the replayer with VersionedWorkflowV%d", replayedVersion)
700+
}
662701
}
663702

664-
func (ts *IntegrationTestSuite) replayVersionedWorkflow(setupWorkerFunc func(w worker.Registry), execution *workflow.Execution) error {
703+
func (ts *IntegrationTestSuite) replayVersionedWorkflow(version replaytests.VersionWorkflowVersion, execution *workflow.Execution) error {
665704
replayer := worker.NewWorkflowReplayer()
666-
setupWorkerFunc(replayer)
705+
replaytests.SetupWorkerForVersionedWorkflow(version, replayer)
667706
return replayer.ReplayWorkflowExecution(context.Background(), ts.rpcClient, zaptest.NewLogger(ts.T()), domainName, *execution)
668707
}
669708

test/replaytests/replay_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,12 @@ import (
2424
"strings"
2525
"testing"
2626

27-
"go.uber.org/cadence/activity"
28-
2927
"github.com/stretchr/testify/assert"
30-
3128
"github.com/stretchr/testify/require"
32-
"go.uber.org/zap/zaptest"
33-
29+
"go.uber.org/cadence/activity"
3430
"go.uber.org/cadence/worker"
3531
"go.uber.org/cadence/workflow"
32+
"go.uber.org/zap/zaptest"
3633
)
3734

3835
func TestReplayWorkflowHistoryFromFile(t *testing.T) {
@@ -202,7 +199,7 @@ func TestContinueAsNew(t *testing.T) {
202199
// * VersionedWorkflowBar - which is the second version of the workflow which version of change id is 1
203200
// - This workflow is supposed to execute BarActivity
204201
//
205-
// There are 4 versions of the workflow:
202+
// There are 6 versions of the workflow:
206203
//
207204
// * VersionedWorkflowV1 - which supports only DefaultVersion and executes FooActivity
208205
// - This workflow is able to replay the history of only of VersionedWorkflowFoo
@@ -247,6 +244,7 @@ func TestVersionedWorkflows(t *testing.T) {
247244
t.Run("fail to replay with VersionedWorkflowBar", func(t *testing.T) {
248245
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile)
249246
require.Error(t, err, "Expected to fail replaying VersionedWorkflowBar history")
247+
assert.ErrorContains(t, err, "nondeterministic workflow")
250248
})
251249
})
252250

@@ -287,6 +285,7 @@ func TestVersionedWorkflows(t *testing.T) {
287285
t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) {
288286
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile)
289287
require.Error(t, err, "Expected to fail replaying VersionedWorkflowFoo history")
288+
assert.ErrorContains(t, err, "WORKFLOW_WORKER_UNHANDLED_FAILURE")
290289
})
291290

292291
t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) {
@@ -302,6 +301,7 @@ func TestVersionedWorkflows(t *testing.T) {
302301
t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) {
303302
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile)
304303
require.Error(t, err, "Expected to fail replaying VersionedWorkflowFoo history")
304+
assert.ErrorContains(t, err, "WORKFLOW_WORKER_UNHANDLED_FAILURE")
305305
})
306306

307307
t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) {
@@ -317,6 +317,7 @@ func TestVersionedWorkflows(t *testing.T) {
317317
t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) {
318318
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile)
319319
require.Error(t, err, "Expected to fail replaying VersionedWorkflowFoo history")
320+
assert.ErrorContains(t, err, "WORKFLOW_WORKER_UNHANDLED_FAILURE")
320321
})
321322

322323
t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) {

test/replaytests/versioned_workflow.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,22 @@ var activityOptions = workflow.ActivityOptions{
2828
HeartbeatTimeout: time.Second * 20,
2929
}
3030

31+
// VersionWorkflowVersion is an enum representing the version of the VersionedWorkflow
32+
type VersionWorkflowVersion int
33+
34+
const (
35+
VersionWorkflowVersionV1 VersionWorkflowVersion = iota + 1
36+
VersionWorkflowVersionV2
37+
VersionWorkflowVersionV3
38+
VersionWorkflowVersionV4
39+
VersionWorkflowVersionV5
40+
VersionWorkflowVersionV6
41+
)
42+
43+
// MaxVersionWorkflowVersion is the maximum version of the VersionedWorkflow.
44+
// Update this constant when adding new versions to the workflow.
45+
const MaxVersionWorkflowVersion = VersionWorkflowVersionV6
46+
3147
// VersionedWorkflowV1 is the first version of the workflow, and it supports only DefaultVersion.
3248
// It supports workflow executions started by this version VersionedWorkflowV1
3349
// and VersionedWorkflowV2, as all of them will have the change ID set to DefaultVersion.
@@ -158,20 +174,39 @@ func VersionedWorkflowV6(ctx workflow.Context, _ string) (string, error) {
158174
}
159175

160176
// FooActivity returns "foo" as a result of the activity execution.
161-
func FooActivity(ctx context.Context, _ string) (string, error) {
177+
func FooActivity(_ context.Context, _ string) (string, error) {
162178
return "foo", nil
163179
}
164180

165181
// BarActivity returns "bar" as a result of the activity execution.
166-
func BarActivity(ctx context.Context, _ string) (string, error) {
182+
func BarActivity(_ context.Context, _ string) (string, error) {
167183
return "bar", nil
168184
}
169185

170186
// BazActivity returns "baz" as a result of the activity execution.
171-
func BazActivity(ctx context.Context, _ string) (string, error) {
187+
func BazActivity(_ context.Context, _ string) (string, error) {
172188
return "baz", nil
173189
}
174190

191+
func SetupWorkerForVersionedWorkflow(version VersionWorkflowVersion, w worker.Registry) {
192+
switch version {
193+
case VersionWorkflowVersionV1:
194+
SetupWorkerForVersionedWorkflowV1(w)
195+
case VersionWorkflowVersionV2:
196+
SetupWorkerForVersionedWorkflowV2(w)
197+
case VersionWorkflowVersionV3:
198+
SetupWorkerForVersionedWorkflowV3(w)
199+
case VersionWorkflowVersionV4:
200+
SetupWorkerForVersionedWorkflowV4(w)
201+
case VersionWorkflowVersionV5:
202+
SetupWorkerForVersionedWorkflowV5(w)
203+
case VersionWorkflowVersionV6:
204+
SetupWorkerForVersionedWorkflowV6(w)
205+
default:
206+
panic("unsupported version for versioned workflow")
207+
}
208+
}
209+
175210
// SetupWorkerForVersionedWorkflowV1 registers VersionedWorkflowV1 and FooActivity
176211
func SetupWorkerForVersionedWorkflowV1(w worker.Registry) {
177212
w.RegisterWorkflowWithOptions(VersionedWorkflowV1, workflow.RegisterOptions{Name: VersionedWorkflowName})

0 commit comments

Comments
 (0)