Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,6 @@ func (entry *DomainCacheEntry) duplicate() *DomainCacheEntry {
result.failoverEndTime = entry.failoverEndTime
result.notificationVersion = entry.notificationVersion
result.initialized = entry.initialized
result.activeClusters = entry.activeClusters
return result
}

Expand Down Expand Up @@ -904,9 +903,11 @@ func (entry *DomainCacheEntry) IsActiveIn(currentCluster string) bool {
}

if entry.GetReplicationConfig().IsActiveActive() {
for _, cl := range entry.GetReplicationConfig().ActiveClusters.ActiveClustersByRegion {
if cl.ActiveClusterName == currentCluster {
return true
for _, scope := range entry.GetReplicationConfig().ActiveClusters.AttributeScopes {
for _, cl := range scope.ClusterAttributes {
if cl.ActiveClusterName == currentCluster {
return true
}
}
}

Expand Down Expand Up @@ -1064,9 +1065,11 @@ func getActiveClusters(replicationConfig *persistence.DomainReplicationConfig) [
if !replicationConfig.IsActiveActive() {
return nil
}
activeClusters := make([]string, 0, len(replicationConfig.ActiveClusters.ActiveClustersByRegion))
for _, cl := range replicationConfig.ActiveClusters.ActiveClustersByRegion {
activeClusters = append(activeClusters, cl.ActiveClusterName)
activeClusters := make([]string, 0, len(replicationConfig.ActiveClusters.AttributeScopes))
for _, scope := range replicationConfig.ActiveClusters.AttributeScopes {
for _, cl := range scope.ClusterAttributes {
activeClusters = append(activeClusters, cl.ActiveClusterName)
}
}
sort.Strings(activeClusters)
return activeClusters
Expand Down
52 changes: 32 additions & 20 deletions common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,13 @@ func Test_IsActiveIn(t *testing.T) {
isGlobalDomain: true,
currentCluster: "A",
activeClusters: &types.ActiveClusters{
ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
"region0": {ActiveClusterName: "A"},
"region1": {ActiveClusterName: "B"},
AttributeScopes: map[string]types.ClusterAttributeScope{
"region": {
ClusterAttributes: map[string]types.ActiveClusterInfo{
"region0": {ActiveClusterName: "A"},
"region1": {ActiveClusterName: "B"},
},
},
},
},
expectIsActive: true,
Expand All @@ -342,9 +346,13 @@ func Test_IsActiveIn(t *testing.T) {
isGlobalDomain: true,
currentCluster: "C",
activeClusters: &types.ActiveClusters{
ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
"region0": {ActiveClusterName: "A"},
"region1": {ActiveClusterName: "B"},
AttributeScopes: map[string]types.ClusterAttributeScope{
"region": {
ClusterAttributes: map[string]types.ActiveClusterInfo{
"region0": {ActiveClusterName: "A"},
"region1": {ActiveClusterName: "B"},
},
},
},
},
expectIsActive: false,
Expand Down Expand Up @@ -1223,14 +1231,16 @@ func Test_NewDomainNotActiveError(t *testing.T) {
nil,
true,
&persistence.DomainReplicationConfig{
ActiveClusters: &types.ActiveClusters{ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
"region1": {
ActiveClusterName: "cluster1",
},
"region2": {
ActiveClusterName: "cluster2",
ActiveClusters: &types.ActiveClusters{
AttributeScopes: map[string]types.ClusterAttributeScope{
"region": {
ClusterAttributes: map[string]types.ActiveClusterInfo{
"region1": {ActiveClusterName: "cluster1"},
"region2": {ActiveClusterName: "cluster2"},
},
},
},
}},
},
},
0,
nil,
Expand Down Expand Up @@ -1273,14 +1283,16 @@ func Test_getActiveClusters(t *testing.T) {
msg: "active-active domain",
replicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: "active",
ActiveClusters: &types.ActiveClusters{ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
"region1": {
ActiveClusterName: "cluster1",
},
"region2": {
ActiveClusterName: "cluster2",
ActiveClusters: &types.ActiveClusters{
AttributeScopes: map[string]types.ClusterAttributeScope{
"region": {
ClusterAttributes: map[string]types.ActiveClusterInfo{
"region1": {ActiveClusterName: "cluster1"},
"region2": {ActiveClusterName: "cluster2"},
},
},
},
}},
},
},
expectedActiveClusters: []string{"cluster1", "cluster2"},
},
Expand Down
12 changes: 12 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -2600,8 +2600,13 @@ func (v *ActiveClusters) DeepCopy() *ActiveClusters {
for region, activeClusterInfo := range v.ActiveClustersByRegion {
activeClustersByRegion[region] = activeClusterInfo
}
attributeScopes := make(map[string]ClusterAttributeScope)
for scopeType, scope := range v.AttributeScopes {
attributeScopes[scopeType] = scope
}
return &ActiveClusters{
ActiveClustersByRegion: activeClustersByRegion,
AttributeScopes: attributeScopes,
}
}

Expand Down Expand Up @@ -2639,6 +2644,13 @@ type ActiveClusterSelectionPolicy struct {
ClusterAttribute *ClusterAttribute `json:"clusterAttribute,omitempty" yaml:"clusterAttribute,omitempty"`
}

func (p *ActiveClusterSelectionPolicy) GetClusterAttribute() *ClusterAttribute {
if p == nil {
return nil
}
return p.ClusterAttribute
}

func (p *ActiveClusterSelectionPolicy) Equals(other *ActiveClusterSelectionPolicy) bool {
if p == nil && other == nil {
return true
Expand Down
8 changes: 8 additions & 0 deletions common/types/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func TestActiveClustersConfigDeepCopy(t *testing.T) {
FailoverVersion: 2,
},
},
AttributeScopes: map[string]ClusterAttributeScope{
"region": {
ClusterAttributes: map[string]ActiveClusterInfo{
"us-east-1": {ActiveClusterName: "us-east-1-cluster", FailoverVersion: 1},
},
},
},
}

tests := []struct {
Expand All @@ -101,6 +108,7 @@ func TestActiveClustersConfigDeepCopy(t *testing.T) {
input: &ActiveClusters{},
expect: &ActiveClusters{
ActiveClustersByRegion: map[string]ActiveClusterInfo{},
AttributeScopes: map[string]ClusterAttributeScope{},
},
},
{
Expand Down
8 changes: 0 additions & 8 deletions service/frontend/templates/clusterredirection.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,6 @@ func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
var actClSelPolicyForNewWF *types.ActiveClusterSelectionPolicy
var workflowExecution *types.WorkflowExecution
{{- if has $method.Name $startWFAPIs}}
if {{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy == nil {
{{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy = &types.ActiveClusterSelectionPolicy{
ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
StickyRegion: handler.GetActiveClusterManager().CurrentRegion(),
}
} else if {{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy.GetStrategy() == types.ActiveClusterSelectionStrategyRegionSticky {
{{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy.StickyRegion = handler.GetActiveClusterManager().CurrentRegion()
}
actClSelPolicyForNewWF = {{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy
{{- else if has $method.Name $nonstartWFAPIs}}
workflowExecution = {{(index $method.Params 1).Name}}.GetWorkflowExecution()
Expand Down
32 changes: 0 additions & 32 deletions service/frontend/wrappers/clusterredirection/api_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions service/frontend/wrappers/clusterredirection/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,25 +436,25 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) activeClusterForActi
policy.logger.Debug("Determining active cluster for active-active domain request", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.Dynamic("execution", workflowExecution), tag.OperationName(apiName))
if actClSelPolicyForNewWF != nil {
policy.logger.Debug("Active cluster selection policy for new workflow", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.OperationName(apiName), tag.Dynamic("policy", actClSelPolicyForNewWF))
lookupRes, err := policy.activeClusterManager.LookupNewWorkflow(ctx, domainEntry.GetInfo().ID, actClSelPolicyForNewWF)
activeClusterInfo, err := policy.activeClusterManager.GetActiveClusterInfoByClusterAttribute(ctx, domainEntry.GetInfo().ID, actClSelPolicyForNewWF.GetClusterAttribute())
if err != nil {
policy.logger.Error("Failed to lookup active cluster of new workflow, using current cluster", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.OperationName(apiName), tag.Error(err))
return policy.currentClusterName
}
return lookupRes.ClusterName
return activeClusterInfo.ActiveClusterName
}

if workflowExecution == nil || workflowExecution.WorkflowID == "" || workflowExecution.RunID == "" {
policy.logger.Debug("Workflow execution is nil or workflow id or run id is empty, using current cluster", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.OperationName(apiName))
return policy.currentClusterName
}

lookupRes, err := policy.activeClusterManager.LookupWorkflow(ctx, domainEntry.GetInfo().ID, workflowExecution.WorkflowID, workflowExecution.RunID)
activeClusterInfo, err := policy.activeClusterManager.GetActiveClusterInfoByWorkflow(ctx, domainEntry.GetInfo().ID, workflowExecution.WorkflowID, workflowExecution.RunID)
if err != nil {
policy.logger.Error("Failed to lookup active cluster of workflow, using current cluster", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.WorkflowID(workflowExecution.WorkflowID), tag.WorkflowRunID(workflowExecution.RunID), tag.OperationName(apiName), tag.Error(err))
return policy.currentClusterName
}

policy.logger.Debug("Lookup workflow result for active-active domain request", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.WorkflowID(workflowExecution.WorkflowID), tag.WorkflowRunID(workflowExecution.RunID), tag.OperationName(apiName), tag.ActiveClusterName(lookupRes.ClusterName))
return lookupRes.ClusterName
policy.logger.Debug("Lookup workflow result for active-active domain request", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.WorkflowID(workflowExecution.WorkflowID), tag.WorkflowRunID(workflowExecution.RunID), tag.OperationName(apiName), tag.ActiveClusterName(activeClusterInfo.ActiveClusterName))
return activeClusterInfo.ActiveClusterName
}
30 changes: 16 additions & 14 deletions service/frontend/wrappers/clusterredirection/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,17 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
domainEntry := s.setupActiveActiveDomainWithTwoReplicationCluster(true)

usEastStickyPlcy := &types.ActiveClusterSelectionPolicy{
ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
StickyRegion: "us-east",
ClusterAttribute: &types.ClusterAttribute{
Scope: "region",
Name: "us-east",
},
}

usWestStickyPlcy := &types.ActiveClusterSelectionPolicy{
ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
StickyRegion: "us-west",
ClusterAttribute: &types.ClusterAttribute{
Scope: "region",
Name: "us-west",
},
}

tests := []struct {
Expand All @@ -620,10 +624,9 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
domainEntry: domainEntry,
actClSelPolicyForNewWF: usWestStickyPlcy,
mockFn: func(activeClusterManager *activecluster.MockManager) {
activeClusterManager.EXPECT().LookupNewWorkflow(gomock.Any(), domainEntry.GetInfo().ID, usWestStickyPlcy).Return(&activecluster.LookupResult{
Region: "us-west",
ClusterName: s.alternativeClusterName,
FailoverVersion: 2,
activeClusterManager.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), domainEntry.GetInfo().ID, usWestStickyPlcy.GetClusterAttribute()).Return(&types.ActiveClusterInfo{
ActiveClusterName: s.alternativeClusterName,
FailoverVersion: 2,
}, nil)
},
want: s.alternativeClusterName,
Expand All @@ -633,7 +636,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
domainEntry: domainEntry,
actClSelPolicyForNewWF: usEastStickyPlcy,
mockFn: func(activeClusterManager *activecluster.MockManager) {
activeClusterManager.EXPECT().LookupNewWorkflow(gomock.Any(), domainEntry.GetInfo().ID, usEastStickyPlcy).Return(nil, errors.New("lookup failed"))
activeClusterManager.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), domainEntry.GetInfo().ID, usEastStickyPlcy.GetClusterAttribute()).Return(nil, errors.New("lookup failed"))
},
want: s.currentClusterName,
},
Expand Down Expand Up @@ -672,7 +675,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
RunID: "run1",
},
mockFn: func(activeClusterManager *activecluster.MockManager) {
activeClusterManager.EXPECT().LookupWorkflow(gomock.Any(), domainEntry.GetInfo().ID, "wf1", "run1").Return(nil, errors.New("lookup failed"))
activeClusterManager.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), domainEntry.GetInfo().ID, "wf1", "run1").Return(nil, errors.New("lookup failed"))
},
want: s.currentClusterName,
},
Expand All @@ -684,10 +687,9 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
RunID: "run1",
},
mockFn: func(activeClusterManager *activecluster.MockManager) {
activeClusterManager.EXPECT().LookupWorkflow(gomock.Any(), domainEntry.GetInfo().ID, "wf1", "run1").Return(&activecluster.LookupResult{
Region: "us-west",
ClusterName: s.alternativeClusterName,
FailoverVersion: 2,
activeClusterManager.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), domainEntry.GetInfo().ID, "wf1", "run1").Return(&types.ActiveClusterInfo{
ActiveClusterName: s.alternativeClusterName,
FailoverVersion: 2,
}, nil)
},
want: s.alternativeClusterName,
Expand Down
Loading
Loading