Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Added ActiveClusterSelectionPolicy to workflow start options (#1438)

## [v1.3.0] - 2025-07-08
### Added
- Added PollerInitCount and reduced default PollerMaxCount in AutoScalerOptions (#1433)
Expand Down
28 changes: 28 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,11 @@ type (
// Note: JitterStart remains used when determining the actual start time of executions.
// Optional: defaulted to CronOverlapPolicySkip
CronOverlapPolicy s.CronOverlapPolicy

// ActiveClusterSelectionPolicy - Policy for selecting the active cluster to start the workflow execution on for active-active domains.
// Note: This doesn't apply to local or active-passive domains.
// Optional: defaulted to region sticky strategy with receiver cluster as the active cluster.
ActiveClusterSelectionPolicy *ActiveClusterSelectionPolicy
}

// RetryPolicy defines the retry policy.
Expand Down Expand Up @@ -563,6 +568,29 @@ type (

// ParentClosePolicy defines the action on children when parent is closed
ParentClosePolicy int

// ActiveClusterSelectionPolicy defines the policy for selecting the active cluster to start the workflow execution on for active-active domains.
// Active-active domains can be configured to be active in multiple clusters (at most one in a given region).
// Individual workflows can be configured to be active in one of the active clusters of the domain.
//
// There are two supported strategies:
// - Region sticky: The workflow will be active in the active cluster of the region that start workflow request is sent to.
// - External entity: The workflow can be associated with an external entity which has a corresponding region.
// The workflow will be considered active in the active cluster of the region that the external entity is in.
// Cadence server must be aware of the external entity type used. Custom types can be registered following the documentation in
// https://github.com/cadence-workflow/cadence/blob/master/docs/design/active-active/active-active.md
ActiveClusterSelectionPolicy struct {
Strategy ActiveClusterSelectionStrategy
ExternalEntityType string
ExternalEntityKey string
}

ActiveClusterSelectionStrategy int
)

const (
ActiveClusterSelectionStrategyRegionSticky ActiveClusterSelectionStrategy = iota
ActiveClusterSelectionStrategyExternalEntity
)

const (
Expand Down
27 changes: 23 additions & 4 deletions internal/compatibility/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package compatibility

import (
"fmt"
"testing"

gogo "github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -533,8 +534,17 @@ func TestSignalExternalWorkflowExecutionInitiatedEventAttributes(t *testing.T) {
}
}
func TestSignalWithStartWorkflowExecutionRequest(t *testing.T) {
for _, item := range []*apiv1.SignalWithStartWorkflowExecutionRequest{nil, {StartRequest: &apiv1.StartWorkflowExecutionRequest{}}, &testdata.SignalWithStartWorkflowExecutionRequest} {
assert.Equal(t, item, proto.SignalWithStartWorkflowExecutionRequest(thrift.SignalWithStartWorkflowExecutionRequest(item)))
tests := []*apiv1.SignalWithStartWorkflowExecutionRequest{
nil,
{StartRequest: &apiv1.StartWorkflowExecutionRequest{}},
&testdata.SignalWithStartWorkflowExecutionRequest,
&testdata.SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1,
&testdata.SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2,
}
for i, item := range tests {
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
assert.Equal(t, item, proto.SignalWithStartWorkflowExecutionRequest(thrift.SignalWithStartWorkflowExecutionRequest(item)))
})
}
}
func TestSignalWithStartWorkflowExecutionResponse(t *testing.T) {
Expand Down Expand Up @@ -563,8 +573,17 @@ func TestStartTimeFilter(t *testing.T) {
}
}
func TestStartWorkflowExecutionRequest(t *testing.T) {
for _, item := range []*apiv1.StartWorkflowExecutionRequest{nil, {}, &testdata.StartWorkflowExecutionRequest} {
assert.Equal(t, item, proto.StartWorkflowExecutionRequest(thrift.StartWorkflowExecutionRequest(item)))
tests := []*apiv1.StartWorkflowExecutionRequest{
nil,
{},
&testdata.StartWorkflowExecutionRequest,
&testdata.StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1,
&testdata.StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2,
}
for i, item := range tests {
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
assert.Equal(t, item, proto.StartWorkflowExecutionRequest(thrift.StartWorkflowExecutionRequest(item)))
})
}
}
func TestStartWorkflowExecutionResponse(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions internal/compatibility/enum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,15 @@ func TestCronOverlapPolicy(t *testing.T) {
}
assert.Equal(t, apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_INVALID, proto.CronOverlapPolicy(thrift.CronOverlapPolicy(999)))
}

func TestActiveClusterSelectionStrategy(t *testing.T) {
for _, v := range []apiv1.ActiveClusterSelectionStrategy{
apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID,
apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY,
apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY,
} {
assert.Equal(t, v, proto.ActiveClusterSelectionStrategy(thrift.ActiveClusterSelectionStrategy(v)))
}

assert.Equal(t, apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID, proto.ActiveClusterSelectionStrategy(thrift.ActiveClusterSelectionStrategy(999)))
}
13 changes: 13 additions & 0 deletions internal/compatibility/proto/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,16 @@ func CronOverlapPolicy(t *shared.CronOverlapPolicy) apiv1.CronOverlapPolicy {
}
return apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_INVALID
}

func ActiveClusterSelectionStrategy(t *shared.ActiveClusterSelectionStrategy) apiv1.ActiveClusterSelectionStrategy {
if t == nil {
return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID
}
switch *t {
case shared.ActiveClusterSelectionStrategyRegionSticky:
return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY
case shared.ActiveClusterSelectionStrategyExternalEntity:
return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY
}
return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID
}
37 changes: 37 additions & 0 deletions internal/compatibility/proto/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ func SignalWithStartWorkflowExecutionRequest(t *shared.SignalWithStartWorkflowEx
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp),
CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy),
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy),
},
SignalName: t.GetSignalName(),
SignalInput: Payload(t.SignalInput),
Expand Down Expand Up @@ -477,6 +479,8 @@ func StartWorkflowExecutionRequest(t *shared.StartWorkflowExecutionRequest) *api
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp),
CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy),
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy),
}
}

Expand Down Expand Up @@ -680,3 +684,36 @@ func DeleteDomainRequest(r *shared.DeleteDomainRequest) *apiv1.DeleteDomainReque
SecurityToken: r.GetSecurityToken(),
}
}

func ActiveClusterSelectionPolicy(t *shared.ActiveClusterSelectionPolicy) *apiv1.ActiveClusterSelectionPolicy {
if t == nil {
return nil
}
plc := &apiv1.ActiveClusterSelectionPolicy{
Strategy: ActiveClusterSelectionStrategy(t.Strategy),
}

if plc.Strategy == apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID {
return nil
}

switch *t.Strategy {
case shared.ActiveClusterSelectionStrategyRegionSticky:
plc.StrategyConfig = &apiv1.ActiveClusterSelectionPolicy_ActiveClusterStickyRegionConfig{
ActiveClusterStickyRegionConfig: &apiv1.ActiveClusterStickyRegionConfig{
StickyRegion: *t.StickyRegion,
},
}
case shared.ActiveClusterSelectionStrategyExternalEntity:
plc.StrategyConfig = &apiv1.ActiveClusterSelectionPolicy_ActiveClusterExternalEntityConfig{
ActiveClusterExternalEntityConfig: &apiv1.ActiveClusterExternalEntityConfig{
ExternalEntityType: *t.ExternalEntityType,
ExternalEntityKey: *t.ExternalEntityKey,
},
}
default:
return nil
}

return plc
}
19 changes: 19 additions & 0 deletions internal/compatibility/testdata/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ var (
"IndexedField2": &apiv1.Payload{Data: []byte{232, 0}},
},
}

ActiveClusterSelectionPolicyRegionSticky = &apiv1.ActiveClusterSelectionPolicy{
Strategy: apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY,
StrategyConfig: &apiv1.ActiveClusterSelectionPolicy_ActiveClusterStickyRegionConfig{
ActiveClusterStickyRegionConfig: &apiv1.ActiveClusterStickyRegionConfig{
StickyRegion: "us-east-1",
},
},
}
ActiveClusterSelectionPolicyExternalEntity = &apiv1.ActiveClusterSelectionPolicy{
Strategy: apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY,
StrategyConfig: &apiv1.ActiveClusterSelectionPolicy_ActiveClusterExternalEntityConfig{
ActiveClusterExternalEntityConfig: &apiv1.ActiveClusterExternalEntityConfig{
ExternalEntityType: "external-entity-type",
ExternalEntityKey: "external-entity-key",
},
},
}

PayloadMap = map[string]*apiv1.Payload{
"Payload1": &Payload1,
"Payload2": &Payload2,
Expand Down
50 changes: 50 additions & 0 deletions internal/compatibility/testdata/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,44 @@ var (
SearchAttributes: &SearchAttributes,
Header: &Header,
}
StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1 = apiv1.StartWorkflowExecutionRequest{
Domain: DomainName,
WorkflowId: WorkflowID,
WorkflowType: &WorkflowType,
TaskList: &TaskList,
Input: &Payload1,
ExecutionStartToCloseTimeout: Duration1,
TaskStartToCloseTimeout: Duration2,
Identity: Identity,
RequestId: RequestID,
WorkflowIdReusePolicy: WorkflowIDReusePolicy,
RetryPolicy: &RetryPolicy,
CronSchedule: CronSchedule,
Memo: &Memo,
SearchAttributes: &SearchAttributes,
Header: &Header,
CronOverlapPolicy: apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_SKIPPED,
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicyRegionSticky,
}
StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2 = apiv1.StartWorkflowExecutionRequest{
Domain: DomainName,
WorkflowId: WorkflowID,
WorkflowType: &WorkflowType,
TaskList: &TaskList,
Input: &Payload1,
ExecutionStartToCloseTimeout: Duration1,
TaskStartToCloseTimeout: Duration2,
Identity: Identity,
RequestId: RequestID,
WorkflowIdReusePolicy: WorkflowIDReusePolicy,
RetryPolicy: &RetryPolicy,
CronSchedule: CronSchedule,
Memo: &Memo,
SearchAttributes: &SearchAttributes,
Header: &Header,
CronOverlapPolicy: apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_BUFFER_ONE,
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicyExternalEntity,
}
StartWorkflowExecutionResponse = apiv1.StartWorkflowExecutionResponse{
RunId: RunID,
}
Expand All @@ -352,6 +390,18 @@ var (
SignalInput: &Payload2,
Control: Control,
}
SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1 = apiv1.SignalWithStartWorkflowExecutionRequest{
StartRequest: &StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1,
SignalName: SignalName,
SignalInput: &Payload2,
Control: Control,
}
SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2 = apiv1.SignalWithStartWorkflowExecutionRequest{
StartRequest: &StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2,
SignalName: SignalName,
SignalInput: &Payload2,
Control: Control,
}
SignalWithStartWorkflowExecutionResponse = apiv1.SignalWithStartWorkflowExecutionResponse{
RunId: RunID,
}
Expand Down
13 changes: 13 additions & 0 deletions internal/compatibility/thrift/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,16 @@ func CronOverlapPolicy(t apiv1.CronOverlapPolicy) *shared.CronOverlapPolicy {
// we treat any unknown value as invalid
return nil
}

func ActiveClusterSelectionStrategy(t apiv1.ActiveClusterSelectionStrategy) *shared.ActiveClusterSelectionStrategy {
switch t {
case apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID:
return nil
case apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY:
return shared.ActiveClusterSelectionStrategyRegionSticky.Ptr()
case apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY:
return shared.ActiveClusterSelectionStrategyExternalEntity.Ptr()
}
// we treat any unknown value as invalid
return nil
}
28 changes: 28 additions & 0 deletions internal/compatibility/thrift/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package thrift

import (
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal/common"

apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
)
Expand Down Expand Up @@ -432,6 +433,8 @@ func SignalWithStartWorkflowExecutionRequest(t *apiv1.SignalWithStartWorkflowExe
request.Memo = Memo(t.StartRequest.Memo)
request.SearchAttributes = SearchAttributes(t.StartRequest.SearchAttributes)
request.Header = Header(t.StartRequest.Header)
request.CronOverlapPolicy = CronOverlapPolicy(t.StartRequest.CronOverlapPolicy)
request.ActiveClusterSelectionPolicy = ActiveClusterSelectionPolicy(t.StartRequest.ActiveClusterSelectionPolicy)
}

return request
Expand Down Expand Up @@ -475,6 +478,8 @@ func StartWorkflowExecutionRequest(t *apiv1.StartWorkflowExecutionRequest) *shar
DelayStartSeconds: durationToSeconds(t.DelayStart),
JitterStartSeconds: durationToSeconds(t.JitterStart),
FirstRunAtTimestamp: timeToUnixNano(t.FirstRunAt),
CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy),
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy),
}
}

Expand Down Expand Up @@ -592,3 +597,26 @@ func ListOpenWorkflowExecutionsRequest(r *apiv1.ListOpenWorkflowExecutionsReques
TypeFilter: WorkflowTypeFilter(r.GetTypeFilter()),
}
}

func ActiveClusterSelectionPolicy(t *apiv1.ActiveClusterSelectionPolicy) *shared.ActiveClusterSelectionPolicy {
if t == nil {
return nil
}
plc := &shared.ActiveClusterSelectionPolicy{
Strategy: ActiveClusterSelectionStrategy(t.Strategy),
}

if plc.Strategy == nil {
return nil
}

switch *plc.Strategy {
case shared.ActiveClusterSelectionStrategyRegionSticky:
plc.StickyRegion = common.StringPtr(t.GetActiveClusterStickyRegionConfig().GetStickyRegion())
case shared.ActiveClusterSelectionStrategyExternalEntity:
plc.ExternalEntityType = common.StringPtr(t.GetActiveClusterExternalEntityConfig().GetExternalEntityType())
plc.ExternalEntityKey = common.StringPtr(t.GetActiveClusterExternalEntityConfig().GetExternalEntityKey())
}

return plc
}
Loading