
Commit 2050121

Test active-active behavior with concurrent SignalWithStart requests (#7270)
**What changed?**
Tests conflict resolution when the same workflow ID is used to create workflows in two different clusters in a domain with an active-active setup.

**Why?**
Cadence does not support two workflows existing with the same workflow ID. When a domain is configured active-active, a user can make two simultaneous requests to two different clusters with the same workflow ID. When this happens, both workflows run temporarily until the clusters have replicated their data to each other, at which point the conflict has to be resolved. This test is a follow-up to #7265, covering the scenario where SignalWithStart is used (rather than StartWorkflow).

**How did you test it?**
Locally + against dev canary domains.

**Potential risks**
N/A

**Release notes**
N/A

**Documentation Changes**
N/A
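
The race under test, distilled: two near-simultaneous SignalWithStart calls for one workflow ID, one per cluster. A minimal Go sketch of that pattern, assuming a hypothetical per-cluster `signaler` abstraction (the real test goes through `simCfg.MustGetFrontendClient`; every name below is illustrative, not from this PR):

```go
package sketch

import (
	"context"
	"fmt"
	"sync"
)

// signaler abstracts the per-cluster SignalWithStart call; the real test
// goes through simCfg.MustGetFrontendClient(t, cluster). All names in this
// sketch are illustrative, not from the PR.
type signaler interface {
	SignalWithStart(ctx context.Context, cluster, domain, workflowID, signalName string, signalInput []byte) (runID string, err error)
}

// raceSignalWithStart fires SignalWithStart for the same workflow ID at two
// clusters concurrently. Until replication catches up, each cluster owns its
// own run, so two distinct RunIDs come back for one workflow ID.
func raceSignalWithStart(ctx context.Context, cl signaler, domain, workflowID string) (map[string]string, error) {
	clusters := []string{"cluster0", "cluster1"}
	runIDs := make(map[string]string, len(clusters))
	errs := make([]error, len(clusters))
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i, cluster := range clusters {
		wg.Add(1)
		go func(i int, cluster string) {
			defer wg.Done()
			runID, err := cl.SignalWithStart(ctx, cluster, domain, workflowID, "custom-signal", []byte(cluster+"-signal-data"))
			if err != nil {
				errs[i] = fmt.Errorf("%s: %w", cluster, err)
				return
			}
			mu.Lock()
			runIDs[cluster] = runID // distinct RunID per cluster for one workflow ID
			mu.Unlock()
		}(i, cluster)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return runIDs, nil
}
```

Each cluster accepts the request and mints its own run, so the map ends up holding two RunIDs for the same workflow ID; the scenario added below then watches conflict resolution terminate one run and let the other complete.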
1 parent b47a630 · commit 2050121

7 files changed, +324 -33 lines changed

.github/workflows/replication-simulation.yml

Lines changed: 2 additions & 0 deletions

@@ -15,6 +15,8 @@ jobs:
         scenario:
           - activeactive
           - activeactive_same_wfid
+          - activeactive_same_wfid_signalwithstart
+          - activeactive_same_wfid_signalwithstart_delayed
           - activeactive_cron
           - activeactive_regional_failover
           - activeactive_regional_failover_start_same_wfid
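
The matrix entries map one-to-one to scenario names. A hedged sketch of how such an entry is plausibly consumed by the job (the actual step is not shown on this page; only the `run.sh` path is confirmed, by the scenario file's header comment further down):

```yaml
# Hypothetical step shape, not the workflow's verbatim step.
steps:
  - name: Run replication simulation scenario
    run: ./simulation/replication/run.sh ${{ matrix.scenario }}
```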
config/dynamicconfig/replication_simulation_activeactive_same_wfid_signalwithstart.yml

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+# This file is used as dynamicconfig override for "activeactive_same_wfid_signalwithstart" replication simulation scenario
+# configured via simulation/replication/testdata/replication_simulation_activeactive_same_wfid_signalwithstart.yaml
+system.writeVisibilityStoreName:
+- value: "db"
+system.readVisibilityStoreName:
+- value: "db"
+history.replicatorTaskBatchSize:
+- value: 25
+  constraints: {}
+frontend.failoverCoolDown:
+- value: 5s
+history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages.
+- value: 10ms
+history.standbyTaskMissingEventsResendDelay:
+- value: 5s
+history.standbyTaskMissingEventsDiscardDelay:
+- value: 10s
+history.standbyClusterDelay:
+- value: 10s
+history.enableTransferQueueV2:
+- value: true
+history.enableTimerQueueV2:
+- value: true
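
For context on the format: each key holds a list of `{value, constraints}` candidates, and the empty `constraints: {}` seen above makes an override unconditional. A hypothetical example of a domain-scoped override, assuming the `domainName` constraint key from Cadence's file-based dynamic config (not part of this PR):

```yaml
# Illustrative only - not in this PR.
history.replicatorTaskBatchSize:
- value: 25
  constraints:
    domainName: "test-domain-aa-conflict"  # applies only to this domain
- value: 100
  constraints: {}                          # unconditional fallback
```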
config/dynamicconfig/replication_simulation_activeactive_same_wfid_signalwithstart_delayed.yml

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+# This file is used as dynamicconfig override for "activeactive_same_wfid_signalwithstart_delayed" replication simulation scenario
+# configured via simulation/replication/testdata/replication_simulation_activeactive_same_wfid_signalwithstart_delayed.yaml
+system.writeVisibilityStoreName:
+- value: "db"
+system.readVisibilityStoreName:
+- value: "db"
+history.replicatorTaskBatchSize:
+- value: 25
+  constraints: {}
+frontend.failoverCoolDown:
+- value: 5s
+history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages.
+- value: 10ms
+history.standbyTaskMissingEventsResendDelay:
+- value: 5s
+history.standbyTaskMissingEventsDiscardDelay:
+- value: 10s
+history.standbyClusterDelay:
+- value: 10s
+history.enableTransferQueueV2:
+- value: true
+history.enableTimerQueueV2:
+- value: true

simulation/replication/replication_simulation_test.go

Lines changed: 74 additions & 33 deletions
@@ -87,7 +87,6 @@ func TestReplicationSimulation(t *testing.T) {
 	startTime := time.Now().UTC()
 	simTypes.Logf(t, "Simulation start time: %v", startTime)
 	for i, op := range simCfg.Operations {
-		op := op
 		waitForOpTime(t, op, startTime)
 		var err error
 		switch op.Type {
@@ -100,9 +99,9 @@
 		case simTypes.ReplicationSimulationOperationValidate:
 			err = validate(t, op, simCfg, sim)
 		case simTypes.ReplicationSimulationOperationQueryWorkflow:
-			err = queryWorkflow(t, op, simCfg)
+			err = queryWorkflow(t, op, simCfg, sim)
 		case simTypes.ReplicationSimulationOperationSignalWithStartWorkflow:
-			err = signalWithStartWorkflow(t, op, simCfg)
+			err = signalWithStartWorkflow(t, op, simCfg, sim)
 		case simTypes.ReplicationSimulationOperationMigrateDomainToActiveActive:
 			err = migrateDomainToActiveActive(t, op, simCfg)
 		case simTypes.ReplicationSimulationOperationValidateWorkflowReplication:
@@ -330,7 +329,7 @@ func migrateDomainToActiveActive(t *testing.T, op *simTypes.Operation, simCfg *s
 	return nil
 }

-func queryWorkflow(t *testing.T, op *simTypes.Operation, simCfg *simTypes.ReplicationSimulationConfig) error {
+func queryWorkflow(t *testing.T, op *simTypes.Operation, simCfg *simTypes.ReplicationSimulationConfig, sim *simTypes.ReplicationSimulation) error {
 	t.Helper()

 	simTypes.Logf(t, "Querying workflow: %s on domain %s on cluster: %s", op.WorkflowID, op.Domain, op.Cluster)
@@ -344,11 +343,22 @@ func queryWorkflow(t *testing.T, op *simTypes.Operation, simCfg *simTypes.Replic
 		consistencyLevel = types.QueryConsistencyLevelStrong.Ptr()
 	}

+	// Prepare workflow execution - use specific RunID if provided via runIDKey
+	executionRequest := &types.WorkflowExecution{
+		WorkflowID: op.WorkflowID,
+	}
+	if op.RunIDKey != "" {
+		if runID, err := sim.GetRunID(op.RunIDKey); err == nil && runID != "" {
+			executionRequest.RunID = runID
+			simTypes.Logf(t, "Using stored RunID %s for query (key: %s)", runID, op.RunIDKey)
+		} else {
+			return fmt.Errorf("runIDKey %s specified but no RunID found in registry", op.RunIDKey)
+		}
+	}
+
 	queryResp, err := frontendCl.QueryWorkflow(ctx, &types.QueryWorkflowRequest{
-		Domain: op.Domain,
-		Execution: &types.WorkflowExecution{
-			WorkflowID: op.WorkflowID,
-		},
+		Domain:    op.Domain,
+		Execution: executionRequest,
 		Query: &types.WorkflowQuery{
 			QueryType: op.Query,
 		},
@@ -373,6 +383,7 @@ func signalWithStartWorkflow(
 	t *testing.T,
 	op *simTypes.Operation,
 	simCfg *simTypes.ReplicationSimulationConfig,
+	sim *simTypes.ReplicationSimulation,
 ) error {
 	t.Helper()
 	simTypes.Logf(t, "SignalWithStart workflow: %s on domain %s on cluster: %s", op.WorkflowID, op.Domain, op.Cluster)
@@ -398,7 +409,15 @@
 		return err
 	}

-	simTypes.Logf(t, "SignalWithStart workflow: %s on domain %s on cluster: %s. RunID: %s", op.WorkflowID, op.Domain, op.Cluster, signalResp.GetRunID())
+	runID := signalResp.GetRunID()
+	simTypes.Logf(t, "SignalWithStart workflow: %s on domain %s on cluster: %s. RunID: %s", op.WorkflowID, op.Domain, op.Cluster, runID)
+
+	// Store RunID if runIDKey is specified
+	if op.RunIDKey != "" {
+		sim.StoreRunID(op.RunIDKey, runID)
+		simTypes.Logf(t, "Stored RunID %s with key: %s", runID, op.RunIDKey)
+	}
+
 	return nil
 }
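
`sim.StoreRunID` and `sim.GetRunID` are added elsewhere in this PR and their implementation is not shown in this diff. A minimal sketch of the registry they imply, assuming a mutex-guarded map (all names besides the two methods are illustrative):

```go
package sketch

import (
	"fmt"
	"sync"
)

// runIDRegistry maps a scenario-chosen key (e.g. "cluster0-run") to the
// RunID returned by SignalWithStart, so later query/validate operations
// can target a specific run of the shared workflow ID.
type runIDRegistry struct {
	mu     sync.RWMutex
	runIDs map[string]string
}

// StoreRunID records the RunID under the scenario's key.
func (r *runIDRegistry) StoreRunID(key, runID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.runIDs == nil {
		r.runIDs = make(map[string]string)
	}
	r.runIDs[key] = runID
}

// GetRunID returns the stored RunID, or an error if the key was never stored.
func (r *runIDRegistry) GetRunID(key string) (string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	runID, ok := r.runIDs[key]
	if !ok {
		return "", fmt.Errorf("no RunID stored for key %s", key)
	}
	return runID, nil
}
```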

@@ -415,73 +434,89 @@ func validate(
 	simTypes.Logf(t, "Validating workflow: %s on cluster: %s", op.WorkflowID, op.Cluster)

+	consistencyLevel := types.QueryConsistencyLevelEventual.Ptr()
+	if op.ConsistencyLevel == "strong" {
+		consistencyLevel = types.QueryConsistencyLevelStrong.Ptr()
+	}
+
 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 	defer cancel()

 	// Prepare workflow execution - use specific RunID if provided via runIDKey
-	execution := &types.WorkflowExecution{
+	executionRequest := &types.WorkflowExecution{
 		WorkflowID: op.WorkflowID,
 	}
 	if op.RunIDKey != "" {
 		if runID, err := sim.GetRunID(op.RunIDKey); err == nil && runID != "" {
-			execution.RunID = runID
+			executionRequest.RunID = runID
 			simTypes.Logf(t, "Using stored RunID %s for validation (key: %s)", runID, op.RunIDKey)
 		} else {
 			return fmt.Errorf("runIDKey %s specified but no RunID found in registry", op.RunIDKey)
 		}
 	}
+
 	resp, err := simCfg.MustGetFrontendClient(t, op.Cluster).DescribeWorkflowExecution(ctx,
 		&types.DescribeWorkflowExecutionRequest{
-			Domain:    op.Domain,
-			Execution: execution,
+			Domain:                op.Domain,
+			Execution:             executionRequest,
+			QueryConsistencyLevel: consistencyLevel,
 		})
 	if err != nil {
 		return err
 	}

+	workflowStatus := resp.GetWorkflowExecutionInfo().GetCloseStatus()
+	workflowCloseTime := resp.GetWorkflowExecutionInfo().GetCloseTime()
 	switch op.Want.Status {
 	case "completed":
 		// Validate workflow completed
-		if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusCompleted || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
-			return fmt.Errorf("workflow %s not completed. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
+		if workflowStatus != types.WorkflowExecutionCloseStatusCompleted || workflowCloseTime == 0 {
+			return fmt.Errorf("workflow %s not completed. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
 		}
 	case "failed":
 		// Validate workflow failed
-		if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusFailed || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
-			return fmt.Errorf("workflow %s not failed. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
+		if workflowStatus != types.WorkflowExecutionCloseStatusFailed || workflowCloseTime == 0 {
+			return fmt.Errorf("workflow %s not failed. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
 		}
 	case "canceled":
 		// Validate workflow canceled
-		if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusCanceled || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
-			return fmt.Errorf("workflow %s not canceled. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
+		if workflowStatus != types.WorkflowExecutionCloseStatusCanceled || workflowCloseTime == 0 {
+			return fmt.Errorf("workflow %s not canceled. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
 		}
 	case "terminated":
 		// Validate workflow terminated
-		if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusTerminated || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
-			return fmt.Errorf("workflow %s not terminated. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
+		if workflowStatus != types.WorkflowExecutionCloseStatusTerminated || workflowCloseTime == 0 {
+			return fmt.Errorf("workflow %s not terminated. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
 		}
 	case "continued-as-new":
 		// Validate workflow continued as new
-		if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusContinuedAsNew || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
-			return fmt.Errorf("workflow %s not continued as new. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
+		if workflowStatus != types.WorkflowExecutionCloseStatusContinuedAsNew || workflowCloseTime == 0 {
+			return fmt.Errorf("workflow %s not continued as new. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
 		}
 	case "timed-out":
 		// Validate workflow timed out
-		if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusTimedOut || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
-			return fmt.Errorf("workflow %s not timed out. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
+		if workflowStatus != types.WorkflowExecutionCloseStatusTimedOut || workflowCloseTime == 0 {
+			return fmt.Errorf("workflow %s not timed out. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
 		}
 	default:
 		// Validate workflow is running
-		if resp.GetWorkflowExecutionInfo().GetCloseTime() != 0 {
-			return fmt.Errorf("workflow %s not running. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
+		if workflowCloseTime != 0 {
+			return fmt.Errorf("workflow %s not running. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
 		}
 	}

 	simTypes.Logf(t, "Validated workflow: %s on cluster: %s. Status: %s, CloseTime: %v", op.WorkflowID, op.Cluster, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))

 	// Get history to validate the worker identity that started and completed the workflow
 	// Some workflows start in cluster0 and complete in cluster1. This is to validate that
-	history, err := getAllHistory(t, simCfg, op.Cluster, op.Domain, op.WorkflowID)
+	var runID string
+	if op.RunIDKey != "" {
+		runID, err = sim.GetRunID(op.RunIDKey)
+		if err != nil {
+			return err
+		}
+	}
+	history, err := getAllHistory(t, simCfg, op.Cluster, op.Domain, op.WorkflowID, runID)
 	if err != nil {
 		return err
 	}
@@ -588,17 +623,23 @@ func waitForOpTime(t *testing.T, op *simTypes.Operation, startTime time.Time) {
 	simTypes.Logf(t, "Operation time (t + %ds) reached: %v", int(op.At.Seconds()), startTime.Add(op.At))
 }

-func getAllHistory(t *testing.T, simCfg *simTypes.ReplicationSimulationConfig, clusterName, domainName, wfID string) ([]types.HistoryEvent, error) {
+func getAllHistory(t *testing.T, simCfg *simTypes.ReplicationSimulationConfig, clusterName, domainName, wfID, runID string) ([]types.HistoryEvent, error) {
 	frontendCl := simCfg.MustGetFrontendClient(t, clusterName)
 	var nextPageToken []byte
 	var history []types.HistoryEvent
+
+	executionRequest := &types.WorkflowExecution{
+		WorkflowID: wfID,
+	}
+	if runID != "" {
+		executionRequest.RunID = runID
+	}
+
 	for {
 		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 		response, err := frontendCl.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
-			Domain: domainName,
-			Execution: &types.WorkflowExecution{
-				WorkflowID: wfID,
-			},
+			Domain:          domainName,
+			Execution:       executionRequest,
 			MaximumPageSize: 1000,
 			NextPageToken:   nextPageToken,
 			WaitForNewEvent: false,
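
The hunk ends inside the request literal; the remainder of the loop is unchanged by this PR. For readers without the full file, a sketch of the standard pagination tail it presumably follows (assumed, not verbatim):

```go
		// Sketch of the rest of getAllHistory's for-loop: drain pages
		// until the token is exhausted, then return the collected events.
		cancel()
		if err != nil {
			return nil, err
		}
		for _, event := range response.GetHistory().GetEvents() {
			history = append(history, *event)
		}
		nextPageToken = response.NextPageToken
		if len(nextPageToken) == 0 {
			break // no more pages
		}
```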
simulation/replication/testdata/replication_simulation_activeactive_same_wfid_signalwithstart.yaml

Lines changed: 103 additions & 0 deletions

@@ -0,0 +1,103 @@
+# This file is a replication simulation scenario spec.
+# It is parsed into ReplicationSimulationConfig struct.
+# Replication simulation for this file can be run via ./simulation/replication/run.sh activeactive_same_wfid_signalwithstart
+# Dynamic config overrides can be set via config/dynamicconfig/replication_simulation_activeactive_same_wfid_signalwithstart.yml
+# When a domain is configured as active-active
+# And the same workflow ID is used to SignalWithStart in multiple clusters
+# Then the 'earlier' workflow should be terminated
+# And the 'later' workflow should complete
+
+clusters:
+  cluster0:
+    grpcEndpoint: "cadence-cluster0:7833"
+  cluster1:
+    grpcEndpoint: "cadence-cluster1:7833"
+
+# primaryCluster is where domain data is written to and replicates to others. e.g. domain registration
+primaryCluster: "cluster0"
+
+domains:
+  test-domain-aa-conflict:
+    activeClustersByRegion:
+      region0: cluster0
+      region1: cluster1
+
+operations:
+  - op: signal_with_start_workflow
+    at: 0s
+    workflowID: conflict-wf
+    workflowType: timer-activity-loop-workflow
+    cluster: cluster0
+    domain: test-domain-aa-conflict
+    signalName: custom-signal
+    signalInput: "cluster0-signal-data"
+    workflowExecutionStartToCloseTimeout: 65s
+    workflowDuration: 35s
+    runIDKey: cluster0-run
+
+  - op: signal_with_start_workflow
+    at: 2s
+    workflowID: conflict-wf
+    workflowType: timer-activity-loop-workflow
+    cluster: cluster1
+    domain: test-domain-aa-conflict
+    signalName: custom-signal
+    signalInput: "cluster1-signal-data"
+    workflowExecutionStartToCloseTimeout: 65s
+    workflowDuration: 35s
+    runIDKey: cluster1-run
+
+  # Query the cluster0 run to validate it was started with the correct signal
+  # Note that if this query is delayed it will eventually see the signal from the 'later' workflow
+  - op: query_workflow
+    at: 3s
+    workflowID: conflict-wf
+    cluster: cluster0
+    domain: test-domain-aa-conflict
+    query: latest-signal-content
+    runIDKey: cluster0-run
+    want:
+      queryResult: ["cluster0-signal-data"]
+
+  # Query the cluster1 run to validate it was started with the correct signal
+  - op: query_workflow
+    at: 3s
+    workflowID: conflict-wf
+    cluster: cluster1
+    domain: test-domain-aa-conflict
+    query: latest-signal-content
+    runIDKey: cluster1-run
+    want:
+      queryResult: ["cluster1-signal-data"]
+
+  - op: validate
+    at: 30s
+    workflowID: conflict-wf
+    runIDKey: cluster0-run
+    cluster: cluster0
+    domain: test-domain-aa-conflict
+    want:
+      status: terminated
+      startedByWorkersInCluster: cluster0
+      completedByWorkersInCluster: cluster0
+
+  - op: validate
+    at: 30s
+    workflowID: conflict-wf
+    runIDKey: cluster1-run
+    cluster: cluster0
+    domain: test-domain-aa-conflict
+    want:
+      status: running
+      startedByWorkersInCluster: cluster1
+
+  - op: validate
+    at: 70s
+    workflowID: conflict-wf
+    runIDKey: cluster1-run
+    cluster: cluster0
+    domain: test-domain-aa-conflict
+    want:
+      status: completed
+      startedByWorkersInCluster: cluster1
+      completedByWorkersInCluster: cluster1
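
Per its header comment, this spec is parsed into `ReplicationSimulationConfig`. A hedged sketch of the `Operation` fields the scenario exercises, inferred from the YAML keys and the `op.*` accesses in the test diff (types and yaml tags are assumptions, not the repo's verbatim definition):

```go
package sketch

import "time"

// Operation: fields inferred from the scenario YAML and the test's
// op.WorkflowID / op.RunIDKey / op.Want.Status accesses. Illustrative only.
type Operation struct {
	Type             string        `yaml:"op"`
	At               time.Duration `yaml:"at"`
	WorkflowID       string        `yaml:"workflowID"`
	WorkflowType     string        `yaml:"workflowType"`
	Cluster          string        `yaml:"cluster"`
	Domain           string        `yaml:"domain"`
	SignalName       string        `yaml:"signalName"`
	SignalInput      string        `yaml:"signalInput"`
	RunIDKey         string        `yaml:"runIDKey"`
	Query            string        `yaml:"query"`
	ConsistencyLevel string        `yaml:"consistencyLevel"`
	Want             Validation    `yaml:"want"`
}

// Validation mirrors the scenario's `want` block.
type Validation struct {
	Status                      string   `yaml:"status"`
	QueryResult                 []string `yaml:"queryResult"`
	StartedByWorkersInCluster   string   `yaml:"startedByWorkersInCluster"`
	CompletedByWorkersInCluster string   `yaml:"completedByWorkersInCluster"`
}
```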
