Skip to content

Commit 40074ec

Browse files
Add active cluster selection policy to workflow start options (#1438)
1 parent e9e55a9 commit 40074ec

File tree

15 files changed

+491
-23
lines changed

15 files changed

+491
-23
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## [Unreleased]
8+
### Added
9+
- Added ActiveClusterSelectionPolicy to workflow start options (#1438)
10+
711
## [v1.3.0] - 2025-07-08
812
### Added
913
- Added PollerInitCount and reduced default PollerMaxCount in AutoScalerOptions (#1433)

internal/client.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,11 @@ type (
490490
// Note: JitterStart remains used when determining the actual start time of executions.
491491
// Optional: defaulted to CronOverlapPolicySkip
492492
CronOverlapPolicy s.CronOverlapPolicy
493+
494+
// ActiveClusterSelectionPolicy - Policy for selecting the active cluster to start the workflow execution on for active-active domains.
495+
// Note: This doesn't apply to local or active-passive domains.
496+
// Optional: defaulted to region sticky strategy with receiver cluster as the active cluster.
497+
ActiveClusterSelectionPolicy *ActiveClusterSelectionPolicy
493498
}
494499

495500
// RetryPolicy defines the retry policy.
@@ -563,6 +568,29 @@ type (
563568

564569
// ParentClosePolicy defines the action on children when parent is closed
565570
ParentClosePolicy int
571+
572+
// ActiveClusterSelectionPolicy defines the policy for selecting the active cluster to start the workflow execution on for active-active domains.
573+
// Active-active domains can be configured to be active in multiple clusters (at most one in a given region).
574+
// Individual workflows can be configured to be active in one of the active clusters of the domain.
575+
//
576+
// There are two supported strategies:
577+
// - Region sticky: The workflow will be active in the active cluster of the region that start workflow request is sent to.
578+
// - External entity: The workflow can be associated with an external entity which has a corresponding region.
579+
// The workflow will be considered active in the active cluster of the region that the external entity is in.
580+
// Cadence server must be aware of the external entity type used. Custom types can be registered following the documentation in
581+
// https://github.com/cadence-workflow/cadence/blob/master/docs/design/active-active/active-active.md
582+
ActiveClusterSelectionPolicy struct {
583+
Strategy ActiveClusterSelectionStrategy
584+
ExternalEntityType string
585+
ExternalEntityKey string
586+
}
587+
588+
ActiveClusterSelectionStrategy int
589+
)
590+
591+
const (
592+
ActiveClusterSelectionStrategyRegionSticky ActiveClusterSelectionStrategy = iota
593+
ActiveClusterSelectionStrategyExternalEntity
566594
)
567595

568596
const (

internal/compatibility/api_test.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package compatibility
2222

2323
import (
24+
"fmt"
2425
"testing"
2526

2627
gogo "github.com/gogo/protobuf/types"
@@ -533,8 +534,17 @@ func TestSignalExternalWorkflowExecutionInitiatedEventAttributes(t *testing.T) {
533534
}
534535
}
535536
func TestSignalWithStartWorkflowExecutionRequest(t *testing.T) {
536-
for _, item := range []*apiv1.SignalWithStartWorkflowExecutionRequest{nil, {StartRequest: &apiv1.StartWorkflowExecutionRequest{}}, &testdata.SignalWithStartWorkflowExecutionRequest} {
537-
assert.Equal(t, item, proto.SignalWithStartWorkflowExecutionRequest(thrift.SignalWithStartWorkflowExecutionRequest(item)))
537+
tests := []*apiv1.SignalWithStartWorkflowExecutionRequest{
538+
nil,
539+
{StartRequest: &apiv1.StartWorkflowExecutionRequest{}},
540+
&testdata.SignalWithStartWorkflowExecutionRequest,
541+
&testdata.SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1,
542+
&testdata.SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2,
543+
}
544+
for i, item := range tests {
545+
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
546+
assert.Equal(t, item, proto.SignalWithStartWorkflowExecutionRequest(thrift.SignalWithStartWorkflowExecutionRequest(item)))
547+
})
538548
}
539549
}
540550
func TestSignalWithStartWorkflowExecutionResponse(t *testing.T) {
@@ -563,8 +573,17 @@ func TestStartTimeFilter(t *testing.T) {
563573
}
564574
}
565575
func TestStartWorkflowExecutionRequest(t *testing.T) {
566-
for _, item := range []*apiv1.StartWorkflowExecutionRequest{nil, {}, &testdata.StartWorkflowExecutionRequest} {
567-
assert.Equal(t, item, proto.StartWorkflowExecutionRequest(thrift.StartWorkflowExecutionRequest(item)))
576+
tests := []*apiv1.StartWorkflowExecutionRequest{
577+
nil,
578+
{},
579+
&testdata.StartWorkflowExecutionRequest,
580+
&testdata.StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1,
581+
&testdata.StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2,
582+
}
583+
for i, item := range tests {
584+
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
585+
assert.Equal(t, item, proto.StartWorkflowExecutionRequest(thrift.StartWorkflowExecutionRequest(item)))
586+
})
568587
}
569588
}
570589
func TestStartWorkflowExecutionResponse(t *testing.T) {

internal/compatibility/enum_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,3 +346,15 @@ func TestCronOverlapPolicy(t *testing.T) {
346346
}
347347
assert.Equal(t, apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_INVALID, proto.CronOverlapPolicy(thrift.CronOverlapPolicy(999)))
348348
}
349+
350+
func TestActiveClusterSelectionStrategy(t *testing.T) {
351+
for _, v := range []apiv1.ActiveClusterSelectionStrategy{
352+
apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID,
353+
apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY,
354+
apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY,
355+
} {
356+
assert.Equal(t, v, proto.ActiveClusterSelectionStrategy(thrift.ActiveClusterSelectionStrategy(v)))
357+
}
358+
359+
assert.Equal(t, apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID, proto.ActiveClusterSelectionStrategy(thrift.ActiveClusterSelectionStrategy(999)))
360+
}

internal/compatibility/proto/enum.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,3 +406,16 @@ func CronOverlapPolicy(t *shared.CronOverlapPolicy) apiv1.CronOverlapPolicy {
406406
}
407407
return apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_INVALID
408408
}
409+
410+
func ActiveClusterSelectionStrategy(t *shared.ActiveClusterSelectionStrategy) apiv1.ActiveClusterSelectionStrategy {
411+
if t == nil {
412+
return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID
413+
}
414+
switch *t {
415+
case shared.ActiveClusterSelectionStrategyRegionSticky:
416+
return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY
417+
case shared.ActiveClusterSelectionStrategyExternalEntity:
418+
return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY
419+
}
420+
return apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID
421+
}

internal/compatibility/proto/request.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,8 @@ func SignalWithStartWorkflowExecutionRequest(t *shared.SignalWithStartWorkflowEx
422422
DelayStart: secondsToDuration(t.DelayStartSeconds),
423423
JitterStart: secondsToDuration(t.JitterStartSeconds),
424424
FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp),
425+
CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy),
426+
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy),
425427
},
426428
SignalName: t.GetSignalName(),
427429
SignalInput: Payload(t.SignalInput),
@@ -477,6 +479,8 @@ func StartWorkflowExecutionRequest(t *shared.StartWorkflowExecutionRequest) *api
477479
DelayStart: secondsToDuration(t.DelayStartSeconds),
478480
JitterStart: secondsToDuration(t.JitterStartSeconds),
479481
FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp),
482+
CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy),
483+
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy),
480484
}
481485
}
482486

@@ -680,3 +684,36 @@ func DeleteDomainRequest(r *shared.DeleteDomainRequest) *apiv1.DeleteDomainReque
680684
SecurityToken: r.GetSecurityToken(),
681685
}
682686
}
687+
688+
func ActiveClusterSelectionPolicy(t *shared.ActiveClusterSelectionPolicy) *apiv1.ActiveClusterSelectionPolicy {
689+
if t == nil {
690+
return nil
691+
}
692+
plc := &apiv1.ActiveClusterSelectionPolicy{
693+
Strategy: ActiveClusterSelectionStrategy(t.Strategy),
694+
}
695+
696+
if plc.Strategy == apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID {
697+
return nil
698+
}
699+
700+
switch *t.Strategy {
701+
case shared.ActiveClusterSelectionStrategyRegionSticky:
702+
plc.StrategyConfig = &apiv1.ActiveClusterSelectionPolicy_ActiveClusterStickyRegionConfig{
703+
ActiveClusterStickyRegionConfig: &apiv1.ActiveClusterStickyRegionConfig{
704+
StickyRegion: *t.StickyRegion,
705+
},
706+
}
707+
case shared.ActiveClusterSelectionStrategyExternalEntity:
708+
plc.StrategyConfig = &apiv1.ActiveClusterSelectionPolicy_ActiveClusterExternalEntityConfig{
709+
ActiveClusterExternalEntityConfig: &apiv1.ActiveClusterExternalEntityConfig{
710+
ExternalEntityType: *t.ExternalEntityType,
711+
ExternalEntityKey: *t.ExternalEntityKey,
712+
},
713+
}
714+
default:
715+
return nil
716+
}
717+
718+
return plc
719+
}

internal/compatibility/testdata/common.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,25 @@ var (
169169
"IndexedField2": &apiv1.Payload{Data: []byte{232, 0}},
170170
},
171171
}
172+
173+
ActiveClusterSelectionPolicyRegionSticky = &apiv1.ActiveClusterSelectionPolicy{
174+
Strategy: apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY,
175+
StrategyConfig: &apiv1.ActiveClusterSelectionPolicy_ActiveClusterStickyRegionConfig{
176+
ActiveClusterStickyRegionConfig: &apiv1.ActiveClusterStickyRegionConfig{
177+
StickyRegion: "us-east-1",
178+
},
179+
},
180+
}
181+
ActiveClusterSelectionPolicyExternalEntity = &apiv1.ActiveClusterSelectionPolicy{
182+
Strategy: apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY,
183+
StrategyConfig: &apiv1.ActiveClusterSelectionPolicy_ActiveClusterExternalEntityConfig{
184+
ActiveClusterExternalEntityConfig: &apiv1.ActiveClusterExternalEntityConfig{
185+
ExternalEntityType: "external-entity-type",
186+
ExternalEntityKey: "external-entity-key",
187+
},
188+
},
189+
}
190+
172191
PayloadMap = map[string]*apiv1.Payload{
173192
"Payload1": &Payload1,
174193
"Payload2": &Payload2,

internal/compatibility/testdata/service.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,44 @@ var (
334334
SearchAttributes: &SearchAttributes,
335335
Header: &Header,
336336
}
337+
StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1 = apiv1.StartWorkflowExecutionRequest{
338+
Domain: DomainName,
339+
WorkflowId: WorkflowID,
340+
WorkflowType: &WorkflowType,
341+
TaskList: &TaskList,
342+
Input: &Payload1,
343+
ExecutionStartToCloseTimeout: Duration1,
344+
TaskStartToCloseTimeout: Duration2,
345+
Identity: Identity,
346+
RequestId: RequestID,
347+
WorkflowIdReusePolicy: WorkflowIDReusePolicy,
348+
RetryPolicy: &RetryPolicy,
349+
CronSchedule: CronSchedule,
350+
Memo: &Memo,
351+
SearchAttributes: &SearchAttributes,
352+
Header: &Header,
353+
CronOverlapPolicy: apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_SKIPPED,
354+
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicyRegionSticky,
355+
}
356+
StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2 = apiv1.StartWorkflowExecutionRequest{
357+
Domain: DomainName,
358+
WorkflowId: WorkflowID,
359+
WorkflowType: &WorkflowType,
360+
TaskList: &TaskList,
361+
Input: &Payload1,
362+
ExecutionStartToCloseTimeout: Duration1,
363+
TaskStartToCloseTimeout: Duration2,
364+
Identity: Identity,
365+
RequestId: RequestID,
366+
WorkflowIdReusePolicy: WorkflowIDReusePolicy,
367+
RetryPolicy: &RetryPolicy,
368+
CronSchedule: CronSchedule,
369+
Memo: &Memo,
370+
SearchAttributes: &SearchAttributes,
371+
Header: &Header,
372+
CronOverlapPolicy: apiv1.CronOverlapPolicy_CRON_OVERLAP_POLICY_BUFFER_ONE,
373+
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicyExternalEntity,
374+
}
337375
StartWorkflowExecutionResponse = apiv1.StartWorkflowExecutionResponse{
338376
RunId: RunID,
339377
}
@@ -352,6 +390,18 @@ var (
352390
SignalInput: &Payload2,
353391
Control: Control,
354392
}
393+
SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1 = apiv1.SignalWithStartWorkflowExecutionRequest{
394+
StartRequest: &StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy1,
395+
SignalName: SignalName,
396+
SignalInput: &Payload2,
397+
Control: Control,
398+
}
399+
SignalWithStartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2 = apiv1.SignalWithStartWorkflowExecutionRequest{
400+
StartRequest: &StartWorkflowExecutionRequestWithCronAndActiveClusterSelectionPolicy2,
401+
SignalName: SignalName,
402+
SignalInput: &Payload2,
403+
Control: Control,
404+
}
355405
SignalWithStartWorkflowExecutionResponse = apiv1.SignalWithStartWorkflowExecutionResponse{
356406
RunId: RunID,
357407
}

internal/compatibility/thrift/enum.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,3 +394,16 @@ func CronOverlapPolicy(t apiv1.CronOverlapPolicy) *shared.CronOverlapPolicy {
394394
// we treat any unknown value as invalid
395395
return nil
396396
}
397+
398+
func ActiveClusterSelectionStrategy(t apiv1.ActiveClusterSelectionStrategy) *shared.ActiveClusterSelectionStrategy {
399+
switch t {
400+
case apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_INVALID:
401+
return nil
402+
case apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_REGION_STICKY:
403+
return shared.ActiveClusterSelectionStrategyRegionSticky.Ptr()
404+
case apiv1.ActiveClusterSelectionStrategy_ACTIVE_CLUSTER_SELECTION_STRATEGY_EXTERNAL_ENTITY:
405+
return shared.ActiveClusterSelectionStrategyExternalEntity.Ptr()
406+
}
407+
// we treat any unknown value as invalid
408+
return nil
409+
}

internal/compatibility/thrift/request.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package thrift
2222

2323
import (
2424
"go.uber.org/cadence/.gen/go/shared"
25+
"go.uber.org/cadence/internal/common"
2526

2627
apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
2728
)
@@ -432,6 +433,8 @@ func SignalWithStartWorkflowExecutionRequest(t *apiv1.SignalWithStartWorkflowExe
432433
request.Memo = Memo(t.StartRequest.Memo)
433434
request.SearchAttributes = SearchAttributes(t.StartRequest.SearchAttributes)
434435
request.Header = Header(t.StartRequest.Header)
436+
request.CronOverlapPolicy = CronOverlapPolicy(t.StartRequest.CronOverlapPolicy)
437+
request.ActiveClusterSelectionPolicy = ActiveClusterSelectionPolicy(t.StartRequest.ActiveClusterSelectionPolicy)
435438
}
436439

437440
return request
@@ -475,6 +478,8 @@ func StartWorkflowExecutionRequest(t *apiv1.StartWorkflowExecutionRequest) *shar
475478
DelayStartSeconds: durationToSeconds(t.DelayStart),
476479
JitterStartSeconds: durationToSeconds(t.JitterStart),
477480
FirstRunAtTimestamp: timeToUnixNano(t.FirstRunAt),
481+
CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy),
482+
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy),
478483
}
479484
}
480485

@@ -592,3 +597,26 @@ func ListOpenWorkflowExecutionsRequest(r *apiv1.ListOpenWorkflowExecutionsReques
592597
TypeFilter: WorkflowTypeFilter(r.GetTypeFilter()),
593598
}
594599
}
600+
601+
func ActiveClusterSelectionPolicy(t *apiv1.ActiveClusterSelectionPolicy) *shared.ActiveClusterSelectionPolicy {
602+
if t == nil {
603+
return nil
604+
}
605+
plc := &shared.ActiveClusterSelectionPolicy{
606+
Strategy: ActiveClusterSelectionStrategy(t.Strategy),
607+
}
608+
609+
if plc.Strategy == nil {
610+
return nil
611+
}
612+
613+
switch *plc.Strategy {
614+
case shared.ActiveClusterSelectionStrategyRegionSticky:
615+
plc.StickyRegion = common.StringPtr(t.GetActiveClusterStickyRegionConfig().GetStickyRegion())
616+
case shared.ActiveClusterSelectionStrategyExternalEntity:
617+
plc.ExternalEntityType = common.StringPtr(t.GetActiveClusterExternalEntityConfig().GetExternalEntityType())
618+
plc.ExternalEntityKey = common.StringPtr(t.GetActiveClusterExternalEntityConfig().GetExternalEntityKey())
619+
}
620+
621+
return plc
622+
}

0 commit comments

Comments
 (0)