diff --git a/CHANGELOG.md b/CHANGELOG.md index badce5ccc..60a847107 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] +### Added +- Added ActiveClusterSelectionPolicy to workflow start options (#1438) + ## [v1.3.0] - 2025-07-08 ### Added - Added PollerInitCount and reduced default PollerMaxCount in AutoScalerOptions (#1433) diff --git a/internal/client.go b/internal/client.go index 50f13a6ac..8c6d1128d 100644 --- a/internal/client.go +++ b/internal/client.go @@ -490,6 +490,11 @@ type ( // Note: JitterStart remains used when determining the actual start time of executions. // Optional: defaulted to CronOverlapPolicySkip CronOverlapPolicy s.CronOverlapPolicy + + // ActiveClusterSelectionPolicy - Policy for selecting the active cluster to start the workflow execution on for active-active domains. + // Note: This doesn't apply to local or active-passive domains. + // Optional: defaulted to region sticky strategy with receiver cluster as the active cluster. + ActiveClusterSelectionPolicy *ActiveClusterSelectionPolicy } // RetryPolicy defines the retry policy. @@ -563,6 +568,29 @@ type ( // ParentClosePolicy defines the action on children when parent is closed ParentClosePolicy int + + // ActiveClusterSelectionPolicy defines the policy for selecting the active cluster to start the workflow execution on for active-active domains. + // Active-active domains can be configured to be active in multiple clusters (at most one in a given region). + // Individual workflows can be configured to be active in one of the active clusters of the domain. + // + // There are two supported strategies: + // - Region sticky: The workflow will be active in the active cluster of the region that start workflow request is sent to. + // - External entity: The workflow can be associated with an external entity which has a corresponding region. + // The workflow will be considered active in the active cluster of the region that the external entity is in. + // Cadence server must be aware of the external entity type used. Custom types can be registered following the documentation in + // https://github.com/cadence-workflow/cadence/blob/master/docs/design/active-active/active-active.md + ActiveClusterSelectionPolicy struct { + Strategy ActiveClusterSelectionStrategy + ExternalEntityType string + ExternalEntityKey string + } + + ActiveClusterSelectionStrategy int +) + +const ( + ActiveClusterSelectionStrategyRegionSticky ActiveClusterSelectionStrategy = iota + ActiveClusterSelectionStrategyExternalEntity ) const ( diff --git a/internal/compatibility/api_test.go b/internal/compatibility/api_test.go index fbd014708..fbcb777a1 100644 --- a/internal/compatibility/api_test.go +++ b/internal/compatibility/api_test.go @@ -21,6 +21,7 @@ package compatibility import ( + "fmt" "testing" gogo "github.com/gogo/protobuf/types" @@ -533,8 +534,17 @@ func TestSignalExternalWorkflowExecutionInitiatedEventAttributes(t *testing.T) { } } func TestSignalWithStartWorkflowExecutionRequest(t *testing.T) { - for _, item := range []*apiv1.SignalWithStartWorkflowExecutionRequest{nil, {StartRequest: &apiv1.StartWorkflowExecutionRequest{}}, &testdata.SignalWithStartWorkflowExecutionRequest} { - assert.Equal(t, item, proto.SignalWithStartWorkflowExecutionRequest(thrift.SignalWithStartWorkflowExecutionRequest(item))) + tests := []*apiv1.SignalWithStartWorkflowExecutionRequest{ + nil, + {StartRequest: &apiv1.StartWorkflowExecutionRequest{}}, + &testdata.SignalWithStartWorkflowExecutionRequest, + &testdata.SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1, + &testdata.SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2, + } + for i, item := range tests { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + assert.Equal(t, item, proto.SignalWithStartWorkflowExecutionRequest(thrift.SignalWithStartWorkflowExecutionRequest(item))) + }) } } func TestSignalWithStartWorkflowExecutionResponse(t *testing.T) { @@ -563,8 +573,17 @@ func TestStartTimeFilter(t *testing.T) { } } func TestStartWorkflowExecutionRequest(t *testing.T) { - for _, item := range []*apiv1.StartWorkflowExecutionRequest{nil, {}, &testdata.StartWorkflowExecutionRequest} { - assert.Equal(t, item, proto.StartWorkflowExecutionRequest(thrift.StartWorkflowExecutionRequest(item))) + tests := []*apiv1.StartWorkflowExecutionRequest{ + nil, + {}, + &testdata.StartWorkflowExecutionRequest, + &testdata.StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1, + &testdata.StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2, + } + for i, item := range tests { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + assert.Equal(t, item, proto.StartWorkflowExecutionRequest(thrift.StartWorkflowExecutionRequest(item))) + }) } } func TestStartWorkflowExecutionResponse(t *testing.T) { diff --git a/internal/compatibility/enum_test.go b/internal/compatibility/enum_test.go index 2baa1631b..a3e3f044c 100644 --- a/internal/compatibility/enum_test.go +++ b/internal/compatibility/enum_test.go @@ -346,3 +346,15 @@ func TestCronOverlapPolicy(t *testing.T) { } assert.Equal(t, apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_INVALID, proto.CronOverlapPolicy(thrift.CronOverlapPolicy(999))) } + +func TestActiveClusterSelectionStrategy(t *testing.T) { + for _, v := range []apiv1.ActiveClusterSelectionStrategy{ + apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID, + apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY, + apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY, + } { + assert.Equal(t, v, proto.ActiveClusterSelectionStrategy(thrift.ActiveClusterSelectionStrategy(v))) + } + + assert.Equal(t, apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID, proto.ActiveClusterSelectionStrategy(thrift.ActiveClusterSelectionStrategy(999))) +} diff --git a/internal/compatibility/proto/enum.go b/internal/compatibility/proto/enum.go index 90a11bcdf..5ddf54ce0 100644 --- a/internal/compatibility/proto/enum.go +++ b/internal/compatibility/proto/enum.go @@ -406,3 +406,16 @@ func CronOverlapPolicy(t *shared.CronOverlapPolicy) apiv1.CronOverlapPolicy { } return apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_INVALID } + +func ActiveClusterSelectionStrategy(t *shared.ActiveClusterSelectionStrategy) apiv1.ActiveClusterSelectionStrategy { + if t == nil { + return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID + } + switch *t { + case shared.ActiveClusterSelectionStrategyRegionSticky: + return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY + case shared.ActiveClusterSelectionStrategyExternalEntity: + return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY + } + return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID +} diff --git a/internal/compatibility/proto/request.go b/internal/compatibility/proto/request.go index 574e6990f..3c4521ccc 100644 --- a/internal/compatibility/proto/request.go +++ b/internal/compatibility/proto/request.go @@ -422,6 +422,8 @@ func SignalWithStartWorkflowExecutionRequest(t *shared.SignalWithStartWorkflowEx DelayStart: secondsToDuration(t.DelayStartSeconds), JitterStart: secondsToDuration(t.JitterStartSeconds), FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp), + CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy), + ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy), }, SignalName: t.GetSignalName(), SignalInput: Payload(t.SignalInput), @@ -477,6 +479,8 @@ func StartWorkflowExecutionRequest(t *shared.StartWorkflowExecutionRequest) *api DelayStart: secondsToDuration(t.DelayStartSeconds), JitterStart: secondsToDuration(t.JitterStartSeconds), FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp), + CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy), + ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy), } } @@ -680,3 +684,36 @@ func DeleteDomainRequest(r *shared.DeleteDomainRequest) *apiv1.DeleteDomainReque SecurityToken: r.GetSecurityToken(), } } + +func ActiveClusterSelectionPolicy(t *shared.ActiveClusterSelectionPolicy) *apiv1.ActiveClusterSelectionPolicy { + if t == nil { + return nil + } + plc := &apiv1.ActiveClusterSelectionPolicy{ + Strategy: ActiveClusterSelectionStrategy(t.Strategy), + } + + if plc.Strategy == apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID { + return nil + } + + switch *t.Strategy { + case shared.ActiveClusterSelectionStrategyRegionSticky: + plc.StrategyConfig = &apiv1.ActiveClusterSelectionPolicy_ActiveClusterStickyRegionConfig{ + ActiveClusterStickyRegionConfig: &apiv1.ActiveClusterStickyRegionConfig{ + StickyRegion: *t.StickyRegion, + }, + } + case shared.ActiveClusterSelectionStrategyExternalEntity: + plc.StrategyConfig = &apiv1.ActiveClusterSelectionPolicy_ActiveClusterExternalEntityConfig{ + ActiveClusterExternalEntityConfig: &apiv1.ActiveClusterExternalEntityConfig{ + ExternalEntityType: *t.ExternalEntityType, + ExternalEntityKey: *t.ExternalEntityKey, + }, + } + default: + return nil + } + + return plc +} diff --git a/internal/compatibility/testdata/common.go b/internal/compatibility/testdata/common.go index 3567f32fb..a8dd6d63c 100644 --- a/internal/compatibility/testdata/common.go +++ b/internal/compatibility/testdata/common.go @@ -169,6 +169,25 @@ var ( "IndexedField2": &apiv1.Payload{Data: []byte{232, 0}}, }, } + + ActiveClusterSelectionPolicyRegionSticky = &apiv1.ActiveClusterSelectionPolicy{ + Strategy: apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY, + StrategyConfig: &apiv1.ActiveClusterSelectionPolicy_ActiveClusterStickyRegionConfig{ + ActiveClusterStickyRegionConfig: &apiv1.ActiveClusterStickyRegionConfig{ + StickyRegion: "us-east-1", + }, + }, + } + ActiveClusterSelectionPolicyExternalEntity = &apiv1.ActiveClusterSelectionPolicy{ + Strategy: apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY, + StrategyConfig: &apiv1.ActiveClusterSelectionPolicy_ActiveClusterExternalEntityConfig{ + ActiveClusterExternalEntityConfig: &apiv1.ActiveClusterExternalEntityConfig{ + ExternalEntityType: "external-entity-type", + ExternalEntityKey: "external-entity-key", + }, + }, + } + PayloadMap = map[string]*apiv1.Payload{ "Payload1": &Payload1, "Payload2": &Payload2, diff --git a/internal/compatibility/testdata/service.go b/internal/compatibility/testdata/service.go index a4e38530c..d78a4965a 100644 --- a/internal/compatibility/testdata/service.go +++ b/internal/compatibility/testdata/service.go @@ -334,6 +334,44 @@ var ( SearchAttributes: &SearchAttributes, Header: &Header, } + StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1 = apiv1.StartWorkflowExecutionRequest{ + Domain: DomainName, + WorkflowId: WorkflowID, + WorkflowType: &WorkflowType, + TaskList: &TaskList, + Input: &Payload1, + ExecutionStartToCloseTimeout: Duration1, + TaskStartToCloseTimeout: Duration2, + Identity: Identity, + RequestId: RequestID, + WorkflowIdReusePolicy: WorkflowIDReusePolicy, + RetryPolicy: &RetryPolicy, + CronSchedule: CronSchedule, + Memo: &Memo, + SearchAttributes: &SearchAttributes, + Header: &Header, + CronOverlapPolicy: apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_SKIPPED, + ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicyRegionSticky, + } + StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2 = apiv1.StartWorkflowExecutionRequest{ + Domain: DomainName, + WorkflowId: WorkflowID, + WorkflowType: &WorkflowType, + TaskList: &TaskList, + Input: &Payload1, + ExecutionStartToCloseTimeout: Duration1, + TaskStartToCloseTimeout: Duration2, + Identity: Identity, + RequestId: RequestID, + WorkflowIdReusePolicy: WorkflowIDReusePolicy, + RetryPolicy: &RetryPolicy, + CronSchedule: CronSchedule, + Memo: &Memo, + SearchAttributes: &SearchAttributes, + Header: &Header, + CronOverlapPolicy: apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_BUFFER_ONE, + ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicyExternalEntity, + } StartWorkflowExecutionResponse = apiv1.StartWorkflowExecutionResponse{ RunId: RunID, } @@ -352,6 +390,18 @@ var ( SignalInput: &Payload2, Control: Control, } + SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1 = apiv1.SignalWithStartWorkflowExecutionRequest{ + StartRequest: &StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1, + SignalName: SignalName, + SignalInput: &Payload2, + Control: Control, + } + SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2 = apiv1.SignalWithStartWorkflowExecutionRequest{ + StartRequest: &StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2, + SignalName: SignalName, + SignalInput: &Payload2, + Control: Control, + } SignalWithStartWorkflowExecutionResponse = apiv1.SignalWithStartWorkflowExecutionResponse{ RunId: RunID, } diff --git a/internal/compatibility/thrift/enum.go b/internal/compatibility/thrift/enum.go index 0fcc3e006..0a5682f19 100644 --- a/internal/compatibility/thrift/enum.go +++ b/internal/compatibility/thrift/enum.go @@ -394,3 +394,16 @@ func CronOverlapPolicy(t apiv1.CronOverlapPolicy) *shared.CronOverlapPolicy { // we treat any unknown value as invalid return nil } + +func ActiveClusterSelectionStrategy(t apiv1.ActiveClusterSelectionStrategy) *shared.ActiveClusterSelectionStrategy { + switch t { + case apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID: + return nil + case apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY: + return shared.ActiveClusterSelectionStrategyRegionSticky.Ptr() + case apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY: + return shared.ActiveClusterSelectionStrategyExternalEntity.Ptr() + } + // we treat any unknown value as invalid + return nil +} diff --git a/internal/compatibility/thrift/request.go b/internal/compatibility/thrift/request.go index 1a71d34db..474599423 100644 --- a/internal/compatibility/thrift/request.go +++ b/internal/compatibility/thrift/request.go @@ -22,6 +22,7 @@ package thrift import ( "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/internal/common" apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" ) @@ -432,6 +433,8 @@ func SignalWithStartWorkflowExecutionRequest(t *apiv1.SignalWithStartWorkflowExe request.Memo = Memo(t.StartRequest.Memo) request.SearchAttributes = SearchAttributes(t.StartRequest.SearchAttributes) request.Header = Header(t.StartRequest.Header) + request.CronOverlapPolicy = CronOverlapPolicy(t.StartRequest.CronOverlapPolicy) + request.ActiveClusterSelectionPolicy = ActiveClusterSelectionPolicy(t.StartRequest.ActiveClusterSelectionPolicy) } return request @@ -475,6 +478,8 @@ func StartWorkflowExecutionRequest(t *apiv1.StartWorkflowExecutionRequest) *shar DelayStartSeconds: durationToSeconds(t.DelayStart), JitterStartSeconds: durationToSeconds(t.JitterStart), FirstRunAtTimestamp: timeToUnixNano(t.FirstRunAt), + CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy), + ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy), } } @@ -592,3 +597,26 @@ func ListOpenWorkflowExecutionsRequest(r *apiv1.ListOpenWorkflowExecutionsReques TypeFilter: WorkflowTypeFilter(r.GetTypeFilter()), } } + +func ActiveClusterSelectionPolicy(t *apiv1.ActiveClusterSelectionPolicy) *shared.ActiveClusterSelectionPolicy { + if t == nil { + return nil + } + plc := &shared.ActiveClusterSelectionPolicy{ + Strategy: ActiveClusterSelectionStrategy(t.Strategy), + } + + if plc.Strategy == nil { + return nil + } + + switch *plc.Strategy { + case shared.ActiveClusterSelectionStrategyRegionSticky: + plc.StickyRegion = common.StringPtr(t.GetActiveClusterStickyRegionConfig().GetStickyRegion()) + case shared.ActiveClusterSelectionStrategyExternalEntity: + plc.ExternalEntityType = common.StringPtr(t.GetActiveClusterExternalEntityConfig().GetExternalEntityType()) + plc.ExternalEntityKey = common.StringPtr(t.GetActiveClusterExternalEntityConfig().GetExternalEntityKey()) + } + + return plc +} diff --git a/internal/convert.go b/internal/convert.go new file mode 100644 index 000000000..f90899bd8 --- /dev/null +++ b/internal/convert.go @@ -0,0 +1,74 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "fmt" + + s "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/internal/common" + "go.uber.org/cadence/internal/common/backoff" +) + +func convertRetryPolicy(retryPolicy *RetryPolicy) *s.RetryPolicy { + if retryPolicy == nil { + return nil + } + thriftRetryPolicy := s.RetryPolicy{ + InitialIntervalInSeconds: common.Int32Ptr(common.Int32Ceil(retryPolicy.InitialInterval.Seconds())), + MaximumIntervalInSeconds: common.Int32Ptr(common.Int32Ceil(retryPolicy.MaximumInterval.Seconds())), + BackoffCoefficient: &retryPolicy.BackoffCoefficient, + MaximumAttempts: &retryPolicy.MaximumAttempts, + NonRetriableErrorReasons: retryPolicy.NonRetriableErrorReasons, + ExpirationIntervalInSeconds: common.Int32Ptr(common.Int32Ceil(retryPolicy.ExpirationInterval.Seconds())), + } + if *thriftRetryPolicy.BackoffCoefficient == 0 { + thriftRetryPolicy.BackoffCoefficient = common.Float64Ptr(backoff.DefaultBackoffCoefficient) + } + return &thriftRetryPolicy +} + +func convertActiveClusterSelectionPolicy(policy *ActiveClusterSelectionPolicy) (*s.ActiveClusterSelectionPolicy, error) { + if policy == nil { + return nil, nil + } + + switch policy.Strategy { + case ActiveClusterSelectionStrategyRegionSticky: + return &s.ActiveClusterSelectionPolicy{ + Strategy: s.ActiveClusterSelectionStrategyRegionSticky.Ptr(), + }, nil + case ActiveClusterSelectionStrategyExternalEntity: + if policy.ExternalEntityType == "" { + return nil, fmt.Errorf("external entity type is required for external entity strategy") + } + if policy.ExternalEntityKey == "" { + return nil, fmt.Errorf("external entity key is required for external entity strategy") + } + return &s.ActiveClusterSelectionPolicy{ + Strategy: s.ActiveClusterSelectionStrategyExternalEntity.Ptr(), + ExternalEntityType: common.StringPtr(policy.ExternalEntityType), + ExternalEntityKey: common.StringPtr(policy.ExternalEntityKey), + }, nil + default: + return nil, fmt.Errorf("invalid active cluster selection strategy: %d", policy.Strategy) + } +} diff --git a/internal/convert_test.go b/internal/convert_test.go new file mode 100644 index 000000000..2b8d22aeb --- /dev/null +++ b/internal/convert_test.go @@ -0,0 +1,159 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + s "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/internal/common" + "go.uber.org/cadence/internal/common/backoff" +) + +func TestConvertRetryPolicy(t *testing.T) { + tests := []struct { + name string + retryPolicy *RetryPolicy + thriftPolicy *s.RetryPolicy + }{ + { + name: "nil retry policy - return nil", + retryPolicy: nil, + thriftPolicy: nil, + }, + { + name: "non-zero backoff coefficient - use provided", + retryPolicy: &RetryPolicy{ + InitialInterval: 1 * time.Second, + MaximumInterval: 10 * time.Second, + BackoffCoefficient: 2.0, + MaximumAttempts: 3, + NonRetriableErrorReasons: []string{"error1", "error2"}, + ExpirationInterval: 10 * time.Second, + }, + thriftPolicy: &s.RetryPolicy{ + InitialIntervalInSeconds: common.Int32Ptr(1), + MaximumIntervalInSeconds: common.Int32Ptr(10), + BackoffCoefficient: common.Float64Ptr(2.0), + MaximumAttempts: common.Int32Ptr(3), + NonRetriableErrorReasons: []string{"error1", "error2"}, + ExpirationIntervalInSeconds: common.Int32Ptr(10), + }, + }, + { + name: "zero backoff coefficient - use default", + retryPolicy: &RetryPolicy{ + InitialInterval: 1 * time.Second, + MaximumInterval: 10 * time.Second, + BackoffCoefficient: 0.0, + MaximumAttempts: 3, + NonRetriableErrorReasons: []string{"error1", "error2"}, + ExpirationInterval: 10 * time.Second, + }, + thriftPolicy: &s.RetryPolicy{ + InitialIntervalInSeconds: common.Int32Ptr(1), + MaximumIntervalInSeconds: common.Int32Ptr(10), + BackoffCoefficient: common.Float64Ptr(backoff.DefaultBackoffCoefficient), + MaximumAttempts: common.Int32Ptr(3), + NonRetriableErrorReasons: []string{"error1", "error2"}, + ExpirationIntervalInSeconds: common.Int32Ptr(10), + }, + }, + } + + for _, test := range tests { + thriftPolicy := convertRetryPolicy(test.retryPolicy) + assert.Equal(t, test.thriftPolicy, thriftPolicy) + } +} + +func TestConvertActiveClusterSelectionPolicy(t *testing.T) { + tests := []struct { + name string + policy *ActiveClusterSelectionPolicy + thriftPolicy *s.ActiveClusterSelectionPolicy + wantErr bool + }{ + { + name: "nil policy - return nil", + policy: nil, + thriftPolicy: nil, + }, + { + name: "region sticky policy", + policy: &ActiveClusterSelectionPolicy{ + Strategy: ActiveClusterSelectionStrategyRegionSticky, + }, + thriftPolicy: &s.ActiveClusterSelectionPolicy{ + Strategy: s.ActiveClusterSelectionStrategyRegionSticky.Ptr(), + }, + }, + { + name: "external entity policy - success", + policy: &ActiveClusterSelectionPolicy{ + Strategy: ActiveClusterSelectionStrategyExternalEntity, + ExternalEntityType: "test-type", + ExternalEntityKey: "test-key", + }, + thriftPolicy: &s.ActiveClusterSelectionPolicy{ + Strategy: s.ActiveClusterSelectionStrategyExternalEntity.Ptr(), + ExternalEntityType: common.StringPtr("test-type"), + ExternalEntityKey: common.StringPtr("test-key"), + }, + }, + { + name: "external entity policy - missing type", + policy: &ActiveClusterSelectionPolicy{ + Strategy: ActiveClusterSelectionStrategyExternalEntity, + ExternalEntityKey: "test-key", + }, + wantErr: true, + }, + { + name: "external entity policy - missing key", + policy: &ActiveClusterSelectionPolicy{ + Strategy: ActiveClusterSelectionStrategyExternalEntity, + ExternalEntityType: "test-type", + }, + wantErr: true, + }, + { + name: "invalid strategy", + policy: &ActiveClusterSelectionPolicy{ + Strategy: ActiveClusterSelectionStrategy(-1), + }, + wantErr: true, + }, + } + + for _, test := range tests { + thriftPolicy, err := convertActiveClusterSelectionPolicy(test.policy) + if test.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.thriftPolicy, thriftPolicy) + } + } +} diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 4b7f00428..b90962dac 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -1059,6 +1059,11 @@ func (wc *workflowClient) getWorkflowStartRequest( return nil, errors.New("Invalid FirstRunAt option") } + activeClusterSelectionPolicy, err := convertActiveClusterSelectionPolicy(options.ActiveClusterSelectionPolicy) + if err != nil { + return nil, err + } + // create a workflow start span and attach it to the context object. // N.B. we need to finish this immediately as jaeger does not give us a way // to recreate a span given a span context - which means we will run into @@ -1093,6 +1098,7 @@ func (wc *workflowClient) getWorkflowStartRequest( JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), FirstRunAtTimestamp: common.Int64Ptr(firstRunAtTimestamp), CronOverlapPolicy: common.CronOverlapPolicyPtr(options.CronOverlapPolicy), + ActiveClusterSelectionPolicy: activeClusterSelectionPolicy, } return startRequest, nil @@ -1167,6 +1173,11 @@ func (wc *workflowClient) getSignalWithStartRequest( return nil, errors.New("Invalid FirstRunAt option") } + activeClusterSelectionPolicy, err := convertActiveClusterSelectionPolicy(options.ActiveClusterSelectionPolicy) + if err != nil { + return nil, err + } + // create a workflow start span and attach it to the context object. finish it immediately ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("%s-%s", tracePrefix, workflowType.Name), workflowID) span.Finish() @@ -1196,6 +1207,7 @@ func (wc *workflowClient) getSignalWithStartRequest( JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), FirstRunAtTimestamp: common.Int64Ptr(firstRunAtTimestamp), CronOverlapPolicy: common.CronOverlapPolicyPtr(options.CronOverlapPolicy), + ActiveClusterSelectionPolicy: activeClusterSelectionPolicy, } return signalWithStartRequest, nil diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 45911ff98..93bd43117 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -2540,6 +2540,7 @@ func TestGetWorkflowStartRequest(t *testing.T) { DelayStart: 0 * time.Second, JitterStart: 0 * time.Second, CronOverlapPolicy: shared.CronOverlapPolicyBufferone, + ActiveClusterSelectionPolicy: &ActiveClusterSelectionPolicy{Strategy: ActiveClusterSelectionStrategyRegionSticky}, }, workflowFunc: func(ctx Context) {}, wantRequest: &shared.StartWorkflowExecutionRequest{ @@ -2560,6 +2561,7 @@ func TestGetWorkflowStartRequest(t *testing.T) { Header: &shared.Header{Fields: map[string][]byte{}}, WorkflowIdReusePolicy: shared.WorkflowIdReusePolicyAllowDuplicateFailedOnly.Ptr(), CronOverlapPolicy: shared.CronOverlapPolicyBufferone.Ptr(), + ActiveClusterSelectionPolicy: &shared.ActiveClusterSelectionPolicy{Strategy: shared.ActiveClusterSelectionStrategyRegionSticky.Ptr()}, }, }, { @@ -2654,6 +2656,23 @@ func TestGetWorkflowStartRequest(t *testing.T) { args: []interface{}{}, wantErr: "expected 2 args for function", }, + { + name: "missing external entity type in active cluster selection policy", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + CronOverlapPolicy: shared.CronOverlapPolicyBufferone, + ActiveClusterSelectionPolicy: &ActiveClusterSelectionPolicy{ + Strategy: ActiveClusterSelectionStrategyExternalEntity, + }, + }, + workflowFunc: func(ctx Context) {}, + wantErr: "external entity type is required for external entity strategy", + }, } for _, tc := range tests { diff --git a/internal/workflow.go b/internal/workflow.go index ae2e491b3..51313a885 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -33,7 +33,6 @@ import ( s "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/internal/common" - "go.uber.org/cadence/internal/common/backoff" ) var ( @@ -1891,21 +1890,3 @@ func WithRetryPolicy(ctx Context, retryPolicy RetryPolicy) Context { getActivityOptions(ctx1).RetryPolicy = convertRetryPolicy(&retryPolicy) return ctx1 } - -func convertRetryPolicy(retryPolicy *RetryPolicy) *s.RetryPolicy { - if retryPolicy == nil { - return nil - } - thriftRetryPolicy := s.RetryPolicy{ - InitialIntervalInSeconds: common.Int32Ptr(common.Int32Ceil(retryPolicy.InitialInterval.Seconds())), - MaximumIntervalInSeconds: common.Int32Ptr(common.Int32Ceil(retryPolicy.MaximumInterval.Seconds())), - BackoffCoefficient: &retryPolicy.BackoffCoefficient, - MaximumAttempts: &retryPolicy.MaximumAttempts, - NonRetriableErrorReasons: retryPolicy.NonRetriableErrorReasons, - ExpirationIntervalInSeconds: common.Int32Ptr(common.Int32Ceil(retryPolicy.ExpirationInterval.Seconds())), - } - if *thriftRetryPolicy.BackoffCoefficient == 0 { - thriftRetryPolicy.BackoffCoefficient = common.Float64Ptr(backoff.DefaultBackoffCoefficient) - } - return &thriftRetryPolicy -}