diff --git a/common/types/mapper/thrift/shared.go b/common/types/mapper/thrift/shared.go index a229dae9034..e29c8ce3738 100644 --- a/common/types/mapper/thrift/shared.go +++ b/common/types/mapper/thrift/shared.go @@ -5831,6 +5831,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) * if t == nil { return nil } + thriftPolicy := FromActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy) return &shared.StartWorkflowExecutionRequest{ Domain: &t.Domain, WorkflowId: &t.WorkflowID, @@ -5851,7 +5852,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) * JitterStartSeconds: t.JitterStartSeconds, FirstRunAtTimestamp: t.FirstRunAtTimeStamp, CronOverlapPolicy: FromCronOverlapPolicy(t.CronOverlapPolicy), - ActiveClusterSelectionPolicy: FromActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy), + ActiveClusterSelectionPolicy: thriftPolicy, } } diff --git a/config/dynamicconfig/development.yaml b/config/dynamicconfig/development.yaml index 125829da1de..064c29b1ec0 100644 --- a/config/dynamicconfig/development.yaml +++ b/config/dynamicconfig/development.yaml @@ -75,6 +75,8 @@ system.domainAuditLogTTL: - value: "15m" matching.enableClientAutoConfig: - value: true +frontend.enableActiveClusterSelectionPolicyInStartWorkflow: +- value: true shardDistributor.migrationMode: - value: "onboarded" - value: "local_pass" diff --git a/tools/cli/flags.go b/tools/cli/flags.go index 25bdee5c83a..eb0853c9562 100644 --- a/tools/cli/flags.go +++ b/tools/cli/flags.go @@ -226,6 +226,8 @@ const ( FlagNumReadPartitions = "num_read_partitions" FlagNumWritePartitions = "num_write_partitions" FlagCronOverlapPolicy = "cron_overlap_policy" + FlagClusterAttributeScope = "cluster_attribute_scope" + FlagClusterAttributeName = "cluster_attribute_name" FlagClustersUsage = "Clusters (example: --clusters clusterA,clusterB or --cl clusterA --cl clusterB)" ) @@ -445,6 +447,16 @@ func getFlagsForStart() []cli.Flag { Name: FirstRunAtTime, Usage: "Optional workflow's first run start time in RFC3339 format, like \"1970-01-01T00:00:00Z\". If set, first run of the workflow will start at the specified time.", }, + &cli.StringFlag{ + Name: FlagClusterAttributeScope, + Usage: "Optional cluster attribute to specify how to select the active cluster. Examples might be 'region' or 'location'", + Aliases: []string{"cascope"}, + }, + &cli.StringFlag{ + Name: FlagClusterAttributeName, + Usage: "Optional cluster attribute to be set for the workflow, used to determine, in active-active domains. This specifies which attribute to tie the workflow to, for example, if the scope is 'region' and the name is 'Lisbon' or 'San Francisco'", + Aliases: []string{"caname"}, + }, } } diff --git a/tools/cli/workflow_commands.go b/tools/cli/workflow_commands.go index bac2a101948..bcb6aeba2aa 100644 --- a/tools/cli/workflow_commands.go +++ b/tools/cli/workflow_commands.go @@ -416,6 +416,10 @@ func constructStartWorkflowRequest(c *cli.Context) (*types.StartWorkflowExecutio if err != nil { return nil, commoncli.Problem("Error in starting wf request: ", err) } + activeClusterSelectionPolicy, err := parseClusterAttributes(c.String(FlagClusterAttributeScope), c.String(FlagClusterAttributeName)) + if err != nil { + return nil, commoncli.Problem("Error parsing cluster attributes: ", err) + } startRequest := &types.StartWorkflowExecutionRequest{ RequestID: uuid.New(), Domain: domain, @@ -431,6 +435,7 @@ func constructStartWorkflowRequest(c *cli.Context) (*types.StartWorkflowExecutio TaskStartToCloseTimeoutSeconds: common.Int32Ptr(int32(dt)), Identity: getCliIdentity(), WorkflowIDReusePolicy: reusePolicy, + ActiveClusterSelectionPolicy: activeClusterSelectionPolicy, } if c.IsSet(FlagCronSchedule) { startRequest.CronSchedule = c.String(FlagCronSchedule) @@ -856,6 +861,7 @@ func constructSignalWithStartWorkflowRequest(c *cli.Context) (*types.SignalWithS DelayStartSeconds: startRequest.DelayStartSeconds, JitterStartSeconds: startRequest.JitterStartSeconds, FirstRunAtTimestamp: startRequest.FirstRunAtTimeStamp, + ActiveClusterSelectionPolicy: startRequest.ActiveClusterSelectionPolicy, }, nil } @@ -2692,3 +2698,20 @@ func mapQueryRejectConditionFromFlag(flag string) (types.QueryRejectCondition, e return rejectCondition, nil } + +func parseClusterAttributes(clusterAttributeScope string, clusterAttributeName string) (*types.ActiveClusterSelectionPolicy, error) { + if clusterAttributeScope == "" && clusterAttributeName == "" { + // default case, these values are optional so most workflows will not use them + return nil, nil + } + if clusterAttributeScope == "" || clusterAttributeName == "" { + return nil, fmt.Errorf("invalid cluster attribute, scope or name is empty, either use both or none to start workflows. got %q.%q", clusterAttributeScope, clusterAttributeName) + } + policy := &types.ActiveClusterSelectionPolicy{ + ClusterAttribute: &types.ClusterAttribute{ + Scope: clusterAttributeScope, + Name: clusterAttributeName, + }, + } + return policy, nil +} diff --git a/tools/cli/workflow_commands_test.go b/tools/cli/workflow_commands_test.go index 2bead8c2862..d0ccbdf1fbe 100644 --- a/tools/cli/workflow_commands_test.go +++ b/tools/cli/workflow_commands_test.go @@ -3134,3 +3134,65 @@ func TestMapQueryRejectConditionFromFlag(t *testing.T) { }) } } + +func TestParseClusterAttributes(t *testing.T) { + testCases := []struct { + name string + clusterAttributeScope string + clusterAttributeName string + expectedPolicy *types.ActiveClusterSelectionPolicy + expectError bool + expectedErrorSubstring string + }{ + { + name: "both empty - should return nil", + clusterAttributeScope: "", + clusterAttributeName: "", + expectedPolicy: nil, + expectError: false, + }, + { + name: "both provided - should return valid policy", + clusterAttributeScope: "test-scope", + clusterAttributeName: "test-name", + expectedPolicy: &types.ActiveClusterSelectionPolicy{ + ClusterAttribute: &types.ClusterAttribute{ + Scope: "test-scope", + Name: "test-name", + }, + }, + expectError: false, + }, + { + name: "empty scope with name provided - should error", + clusterAttributeScope: "", + clusterAttributeName: "test-name", + expectedPolicy: nil, + expectError: true, + expectedErrorSubstring: "invalid cluster attribute", + }, + { + name: "scope provided with empty name - should error", + clusterAttributeScope: "test-scope", + clusterAttributeName: "", + expectedPolicy: nil, + expectError: true, + expectedErrorSubstring: "invalid cluster attribute", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result, err := parseClusterAttributes(tc.clusterAttributeScope, tc.clusterAttributeName) + + if tc.expectError { + assert.Error(t, err) + assert.Contains(t, err.Error(), tc.expectedErrorSubstring) + assert.Nil(t, result) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectedPolicy, result) + } + }) + } +}