Skip to content

Commit b636a7b

Browse files
committed
add integration test for versioned workflow
1 parent 8eb8258 commit b636a7b

File tree

3 files changed

+145
-62
lines changed

3 files changed

+145
-62
lines changed

test/integration_test.go

Lines changed: 94 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"context"
2626
"errors"
2727
"fmt"
28+
"go.uber.org/cadence/test/replaytests"
2829
"net"
2930
"strings"
3031
"sync"
@@ -173,6 +174,7 @@ func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) {
173174

174175
ts.worker = worker.New(ts.rpcClient.Interface, domainName, ts.taskListName, options)
175176
ts.registerWorkflowsAndActivities(ts.worker)
177+
ts.beforeVersionedWorkflowTest(testName, ts.worker)
176178
ts.Nil(ts.worker.Start())
177179
}
178180

@@ -182,7 +184,7 @@ func (ts *IntegrationTestSuite) TearDownTest() {
182184

183185
func (ts *IntegrationTestSuite) TestBasic() {
184186
var expected []string
185-
err := ts.executeWorkflow("test-basic", ts.workflows.Basic, &expected)
187+
_, err := ts.executeWorkflow("test-basic", ts.workflows.Basic, &expected)
186188
ts.NoError(err)
187189
ts.EqualValues(expected, ts.activities.invoked())
188190
ts.Equal([]string{"ExecuteWorkflow begin", "ExecuteActivity", "ExecuteActivity", "ExecuteWorkflow end"},
@@ -191,27 +193,27 @@ func (ts *IntegrationTestSuite) TestBasic() {
191193

192194
func (ts *IntegrationTestSuite) TestActivityRetryOnError() {
193195
var expected []string
194-
err := ts.executeWorkflow("test-activity-retry-on-error", ts.workflows.ActivityRetryOnError, &expected)
196+
_, err := ts.executeWorkflow("test-activity-retry-on-error", ts.workflows.ActivityRetryOnError, &expected)
195197
ts.NoError(err)
196198
ts.EqualValues(expected, ts.activities.invoked())
197199
}
198200

199201
func (ts *IntegrationTestSuite) TestActivityRetryOnTimeoutStableError() {
200202
var expected []string
201-
err := ts.executeWorkflow("test-activity-retry-on-timeout-stable-error", ts.workflows.RetryTimeoutStableErrorWorkflow, &expected)
203+
_, err := ts.executeWorkflow("test-activity-retry-on-timeout-stable-error", ts.workflows.RetryTimeoutStableErrorWorkflow, &expected)
202204
ts.Nil(err)
203205
}
204206

205207
func (ts *IntegrationTestSuite) TestActivityRetryOptionsChange() {
206208
var expected []string
207-
err := ts.executeWorkflow("test-activity-retry-options-change", ts.workflows.ActivityRetryOptionsChange, &expected)
209+
_, err := ts.executeWorkflow("test-activity-retry-options-change", ts.workflows.ActivityRetryOptionsChange, &expected)
208210
ts.NoError(err)
209211
ts.EqualValues(expected, ts.activities.invoked())
210212
}
211213

212214
func (ts *IntegrationTestSuite) TestActivityRetryOnStartToCloseTimeout() {
213215
var expected []string
214-
err := ts.executeWorkflow(
216+
_, err := ts.executeWorkflow(
215217
"test-activity-retry-on-start2close-timeout",
216218
ts.workflows.ActivityRetryOnTimeout,
217219
&expected,
@@ -223,21 +225,21 @@ func (ts *IntegrationTestSuite) TestActivityRetryOnStartToCloseTimeout() {
223225

224226
func (ts *IntegrationTestSuite) TestActivityRetryOnHBTimeout() {
225227
var expected []string
226-
err := ts.executeWorkflow("test-activity-retry-on-hbtimeout", ts.workflows.ActivityRetryOnHBTimeout, &expected)
228+
_, err := ts.executeWorkflow("test-activity-retry-on-hbtimeout", ts.workflows.ActivityRetryOnHBTimeout, &expected)
227229
ts.NoError(err)
228230
ts.EqualValues(expected, ts.activities.invoked())
229231
}
230232

231233
func (ts *IntegrationTestSuite) TestActivityAutoHeartbeat() {
232234
var expected []string
233-
err := ts.executeWorkflow("test-activity-auto-heartbeat", ts.workflows.ActivityAutoHeartbeat, &expected)
235+
_, err := ts.executeWorkflow("test-activity-auto-heartbeat", ts.workflows.ActivityAutoHeartbeat, &expected)
234236
ts.NoError(err)
235237
ts.EqualValues(expected, ts.activities.invoked())
236238
}
237239

238240
func (ts *IntegrationTestSuite) TestContinueAsNew() {
239241
var result int
240-
err := ts.executeWorkflow("test-continueasnew", ts.workflows.ContinueAsNew, &result, 4, ts.taskListName)
242+
_, err := ts.executeWorkflow("test-continueasnew", ts.workflows.ContinueAsNew, &result, 4, ts.taskListName)
241243
ts.NoError(err)
242244
ts.Equal(999, result)
243245
}
@@ -251,7 +253,7 @@ func (ts *IntegrationTestSuite) TestContinueAsNewCarryOver() {
251253
startOptions.SearchAttributes = map[string]interface{}{
252254
"CustomKeywordField": "searchAttr",
253255
}
254-
err := ts.executeWorkflowWithOption(startOptions, ts.workflows.ContinueAsNewWithOptions, &result, 4, ts.taskListName)
256+
_, err := ts.executeWorkflowWithOption(startOptions, ts.workflows.ContinueAsNewWithOptions, &result, 4, ts.taskListName)
255257
ts.NoError(err)
256258
ts.Equal("memoVal,searchAttr", result)
257259
}
@@ -320,7 +322,7 @@ func (ts *IntegrationTestSuite) TestConsistentQuery() {
320322

321323
func (ts *IntegrationTestSuite) TestWorkflowIDReuseRejectDuplicate() {
322324
var result string
323-
err := ts.executeWorkflow(
325+
_, err := ts.executeWorkflow(
324326
"test-workflowidreuse-reject-duplicate",
325327
ts.workflows.IDReusePolicy,
326328
&result,
@@ -338,7 +340,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseRejectDuplicate() {
338340

339341
func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly1() {
340342
var result string
341-
err := ts.executeWorkflow(
343+
_, err := ts.executeWorkflow(
342344
"test-workflowidreuse-reject-duplicate-failed-only1",
343345
ts.workflows.IDReusePolicy,
344346
&result,
@@ -356,7 +358,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly1() {
356358

357359
func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly2() {
358360
var result string
359-
err := ts.executeWorkflow(
361+
_, err := ts.executeWorkflow(
360362
"test-workflowidreuse-reject-duplicate-failed-only2",
361363
ts.workflows.IDReusePolicy,
362364
&result,
@@ -371,7 +373,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly2() {
371373

372374
func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicate() {
373375
var result string
374-
err := ts.executeWorkflow(
376+
_, err := ts.executeWorkflow(
375377
"test-workflowidreuse-allow-duplicate",
376378
ts.workflows.IDReusePolicy,
377379
&result,
@@ -387,7 +389,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicate() {
387389
func (ts *IntegrationTestSuite) TestWorkflowIDReuseErrorViaStartWorkflow() {
388390
duplicatedWID := "test-workflowidreuse-duplicate-start-error"
389391
// setup: run any workflow once to consume the ID
390-
err := ts.executeWorkflow(
392+
_, err := ts.executeWorkflow(
391393
duplicatedWID,
392394
ts.workflows.SimplestWorkflow,
393395
nil,
@@ -407,22 +409,22 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseErrorViaStartWorkflow() {
407409
}
408410

409411
func (ts *IntegrationTestSuite) TestChildWFRetryOnError() {
410-
err := ts.executeWorkflow("test-childwf-retry-on-error", ts.workflows.ChildWorkflowRetryOnError, nil)
412+
_, err := ts.executeWorkflow("test-childwf-retry-on-error", ts.workflows.ChildWorkflowRetryOnError, nil)
411413
ts.Error(err)
412414
ts.Truef(client.IsWorkflowError(err), "child error should be a workflow error: %#v", err)
413415
ts.EqualValues([]string{"toUpper", "toUpper", "toUpper"}, ts.activities.invoked())
414416
}
415417

416418
func (ts *IntegrationTestSuite) TestChildWFRetryOnTimeout() {
417-
err := ts.executeWorkflow("test-childwf-retry-on-timeout", ts.workflows.ChildWorkflowRetryOnTimeout, nil)
419+
_, err := ts.executeWorkflow("test-childwf-retry-on-timeout", ts.workflows.ChildWorkflowRetryOnTimeout, nil)
418420
ts.Error(err)
419421
ts.Truef(client.IsWorkflowError(err), "child-timeout error should be a workflow error: %#v", err)
420422
ts.EqualValues([]string{"sleep", "sleep", "sleep"}, ts.activities.invoked())
421423
}
422424

423425
func (ts *IntegrationTestSuite) TestChildWFWithMemoAndSearchAttributes() {
424426
var result string
425-
err := ts.executeWorkflow("test-childwf-success-memo-searchAttr", ts.workflows.ChildWorkflowSuccess, &result)
427+
_, err := ts.executeWorkflow("test-childwf-success-memo-searchAttr", ts.workflows.ChildWorkflowSuccess, &result)
426428
ts.NoError(err)
427429
ts.EqualValues([]string{"getMemoAndSearchAttr"}, ts.activities.invoked())
428430
ts.Equal("memoVal, searchAttrVal", result)
@@ -432,7 +434,7 @@ func (ts *IntegrationTestSuite) TestChildWFWithMemoAndSearchAttributes() {
432434

433435
func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyTerminate() {
434436
var childWorkflowID string
435-
err := ts.executeWorkflow("test-childwf-parent-close-policy", ts.workflows.ChildWorkflowSuccessWithParentClosePolicyTerminate, &childWorkflowID)
437+
_, err := ts.executeWorkflow("test-childwf-parent-close-policy", ts.workflows.ChildWorkflowSuccessWithParentClosePolicyTerminate, &childWorkflowID)
436438
ts.NoError(err)
437439
// Need to wait for child workflow to finish as well otherwise test becomes flaky
438440
ts.waitForWorkflowFinish(childWorkflowID, "")
@@ -443,7 +445,7 @@ func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyTerminate() {
443445

444446
func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyAbandon() {
445447
var childWorkflowID string
446-
err := ts.executeWorkflow("test-childwf-parent-close-policy", ts.workflows.ChildWorkflowSuccessWithParentClosePolicyAbandon, &childWorkflowID)
448+
_, err := ts.executeWorkflow("test-childwf-parent-close-policy", ts.workflows.ChildWorkflowSuccessWithParentClosePolicyAbandon, &childWorkflowID)
447449
ts.NoError(err)
448450
resp, err := ts.libClient.DescribeWorkflowExecution(context.Background(), childWorkflowID, "")
449451
ts.NoError(err)
@@ -452,7 +454,7 @@ func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyAbandon() {
452454

453455
func (ts *IntegrationTestSuite) TestChildWFCancel() {
454456
var childWorkflowID string
455-
err := ts.executeWorkflow("test-childwf-cancel", ts.workflows.ChildWorkflowCancel, &childWorkflowID)
457+
_, err := ts.executeWorkflow("test-childwf-cancel", ts.workflows.ChildWorkflowCancel, &childWorkflowID)
456458
ts.NoError(err)
457459
resp, err := ts.libClient.DescribeWorkflowExecution(context.Background(), childWorkflowID, "")
458460
ts.NoError(err)
@@ -468,14 +470,14 @@ func (ts *IntegrationTestSuite) TestActivityCancelUsingReplay() {
468470

469471
func (ts *IntegrationTestSuite) TestActivityCancelRepro() {
470472
var expected []string
471-
err := ts.executeWorkflow("test-activity-cancel-sm", ts.workflows.ActivityCancelRepro, &expected)
473+
_, err := ts.executeWorkflow("test-activity-cancel-sm", ts.workflows.ActivityCancelRepro, &expected)
472474
ts.NoError(err)
473475
ts.EqualValues(expected, ts.activities.invoked())
474476
}
475477

476478
func (ts *IntegrationTestSuite) TestWorkflowWithLocalActivityCtxPropagation() {
477479
var expected string
478-
err := ts.executeWorkflow("test-wf-local-activity-ctx-prop", ts.workflows.WorkflowWithLocalActivityCtxPropagation, &expected)
480+
_, err := ts.executeWorkflow("test-wf-local-activity-ctx-prop", ts.workflows.WorkflowWithLocalActivityCtxPropagation, &expected)
479481
ts.NoError(err)
480482
ts.EqualValues(expected, "test-data-in-contexttest-data-in-context")
481483
}
@@ -497,12 +499,12 @@ func (ts *IntegrationTestSuite) TestLargeQueryResultError() {
497499
}
498500

499501
func (ts *IntegrationTestSuite) TestInspectActivityInfo() {
500-
err := ts.executeWorkflow("test-activity-info", ts.workflows.InspectActivityInfo, nil)
502+
_, err := ts.executeWorkflow("test-activity-info", ts.workflows.InspectActivityInfo, nil)
501503
ts.Nil(err)
502504
}
503505

504506
func (ts *IntegrationTestSuite) TestInspectLocalActivityInfo() {
505-
err := ts.executeWorkflow("test-local-activity-info", ts.workflows.InspectLocalActivityInfo, nil)
507+
_, err := ts.executeWorkflow("test-local-activity-info", ts.workflows.InspectLocalActivityInfo, nil)
506508
ts.Nil(err)
507509
}
508510

@@ -523,7 +525,7 @@ func (ts *IntegrationTestSuite) TestDomainUpdate() {
523525
}
524526

525527
func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowFailPolicy() {
526-
err := ts.executeWorkflow("test-nondeterminism-failpolicy", ts.workflows.NonDeterminismSimulatorWorkflow, nil)
528+
_, err := ts.executeWorkflow("test-nondeterminism-failpolicy", ts.workflows.NonDeterminismSimulatorWorkflow, nil)
527529
var customErr *internal.CustomError
528530
ok := errors.As(err, &customErr)
529531
ts.Truef(ok, "expected CustomError but got %T", err)
@@ -551,11 +553,71 @@ func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowQuery() {
551553

552554
func (ts *IntegrationTestSuite) TestOverrideSpanContext() {
553555
var result map[string]string
554-
err := ts.executeWorkflow("test-override-span-context", ts.workflows.OverrideSpanContext, &result)
556+
_, err := ts.executeWorkflow("test-override-span-context", ts.workflows.OverrideSpanContext, &result)
555557
ts.NoError(err)
556558
ts.Equal("some-value", result["mockpfx-baggage-some-key"])
557559
}
558560

561+
// beforeVersionedWorkflowTest registers appropriate versioned workflow and activity to emulate the versioned workflow test.
562+
func (ts *IntegrationTestSuite) beforeVersionedWorkflowTest(testName string, w worker.Worker) {
563+
switch testName {
564+
case "TestVersionedWorkflowV1":
565+
replaytests.SetupWorkerForVersionedWorkflowV1(w)
566+
case "TestVersionedWorkflowV2":
567+
replaytests.SetupWorkerForVersionedWorkflowV2(w)
568+
case "TestVersionedWorkflowV3":
569+
replaytests.SetupWorkerForVersionedWorkflowV3(w)
570+
case "TestVersionedWorkflowV4":
571+
replaytests.SetupWorkerForVersionedWorkflowV4(w)
572+
}
573+
}
574+
575+
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV1 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, but not on VersionedWorkflowV4.
576+
func (ts *IntegrationTestSuite) TestVersionedWorkflowV1() {
577+
execution, err := ts.executeWorkflow("test-versioned-workflow-v1", replaytests.VersionedWorkflowName, nil, "arg")
578+
ts.NoError(err)
579+
580+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
581+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer withVersionedWorkflowV3")
582+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4")
583+
}
584+
585+
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV2 can be replayed on worker with VersionedWorkflowV1 and VersionedWorkflowV3, but not on VersionedWorkflowV4.
586+
func (ts *IntegrationTestSuite) TestVersionedWorkflowV2() {
587+
execution, err := ts.executeWorkflow("test-versioned-workflow-v2", replaytests.VersionedWorkflowName, nil, "arg")
588+
ts.NoError(err)
589+
590+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Failed to replay on the replayer with VersionedWorkflowV1")
591+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer withVersionedWorkflowV3")
592+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4")
593+
}
594+
595+
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV3 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV4, but not on VersionedWorkflowV1.
596+
func (ts *IntegrationTestSuite) TestVersionedWorkflowV3() {
597+
execution, err := ts.executeWorkflow("test-versioned-workflow-v3", replaytests.VersionedWorkflowName, nil, "arg")
598+
ts.NoError(err)
599+
600+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
601+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
602+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Failed to replay on the replayer with VersionedWorkflowV4")
603+
}
604+
605+
// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV4 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, but not on VersionedWorkflowV1.
606+
func (ts *IntegrationTestSuite) TestVersionedWorkflowV4() {
607+
execution, err := ts.executeWorkflow("test-versioned-workflow-v4", replaytests.VersionedWorkflowName, nil, "arg")
608+
ts.NoError(err)
609+
610+
ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1")
611+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2")
612+
ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3")
613+
}
614+
615+
func (ts *IntegrationTestSuite) replayVersionedWorkflow(setupWorkerFunc func(w worker.Registry), execution *workflow.Execution) error {
616+
replayer := worker.NewWorkflowReplayer()
617+
setupWorkerFunc(replayer)
618+
return replayer.ReplayWorkflowExecution(context.Background(), ts.rpcClient, zaptest.NewLogger(ts.T()), domainName, *execution)
619+
}
620+
559621
func (ts *IntegrationTestSuite) registerDomain() {
560622
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
561623
defer cancel()
@@ -574,12 +636,12 @@ func (ts *IntegrationTestSuite) registerDomain() {
574636
time.Sleep(domainCacheRefreshInterval) // wait for domain cache refresh on cadence-server
575637
// bellow is used to guarantee domain is ready
576638
var dummyReturn string
577-
err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn)
639+
_, err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn)
578640
numOfRetry := 20
579641
for err != nil && numOfRetry >= 0 {
580642
if _, ok := err.(*shared.EntityNotExistsError); ok {
581643
time.Sleep(domainCacheRefreshInterval)
582-
err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn)
644+
_, err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn)
583645
} else {
584646
break
585647
}
@@ -588,21 +650,19 @@ func (ts *IntegrationTestSuite) registerDomain() {
588650
}
589651

590652
// executeWorkflow executes a given workflow and waits for the result
591-
func (ts *IntegrationTestSuite) executeWorkflow(
592-
wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error {
653+
func (ts *IntegrationTestSuite) executeWorkflow(wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{}) (*workflow.Execution, error) {
593654
options := ts.startWorkflowOptions(wfID)
594655
return ts.executeWorkflowWithOption(options, wfFunc, retValPtr, args...)
595656
}
596657

597-
func (ts *IntegrationTestSuite) executeWorkflowWithOption(
598-
options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error {
658+
func (ts *IntegrationTestSuite) executeWorkflowWithOption(options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) (*workflow.Execution, error) {
599659
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
600660
defer cancel()
601661
span := ts.workflows.tracer.StartSpan("test-workflow")
602662
defer span.Finish()
603663
execution, err := ts.libClient.StartWorkflow(ctx, options, wfFunc, args...)
604664
if err != nil {
605-
return err
665+
return nil, err
606666
}
607667
run := ts.libClient.GetWorkflow(ctx, execution.ID, execution.RunID)
608668
err = run.Get(ctx, retValPtr)
@@ -617,7 +677,7 @@ func (ts *IntegrationTestSuite) executeWorkflowWithOption(
617677
logger.Info(event.String())
618678
}
619679
}
620-
return err
680+
return execution, err
621681
}
622682

623683
func (ts *IntegrationTestSuite) startWorkflowOptions(wfID string) client.StartWorkflowOptions {

0 commit comments

Comments
 (0)