diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go
index 294a4cd116b..21029e914b5 100644
--- a/common/cache/domainCache.go
+++ b/common/cache/domainCache.go
@@ -762,7 +762,6 @@ func (entry *DomainCacheEntry) duplicate() *DomainCacheEntry {
 		result.replicationConfig.Clusters = append(result.replicationConfig.Clusters, &c)
 	}
 	result.replicationConfig.ActiveClusters = entry.replicationConfig.ActiveClusters.DeepCopy()
-	result.activeClusters = entry.activeClusters
 	result.configVersion = entry.configVersion
 	result.failoverVersion = entry.failoverVersion
 	result.isGlobalDomain = entry.isGlobalDomain
@@ -904,9 +903,11 @@ func (entry *DomainCacheEntry) IsActiveIn(currentCluster string) bool {
 	}
 
 	if entry.GetReplicationConfig().IsActiveActive() {
-		for _, cl := range entry.GetReplicationConfig().ActiveClusters.ActiveClustersByRegion {
-			if cl.ActiveClusterName == currentCluster {
-				return true
+		for _, scope := range entry.GetReplicationConfig().ActiveClusters.AttributeScopes {
+			for _, cl := range scope.ClusterAttributes {
+				if cl.ActiveClusterName == currentCluster {
+					return true
+				}
 			}
 		}
 
@@ -1064,9 +1065,11 @@ func getActiveClusters(replicationConfig *persistence.DomainReplicationConfig) [
 	if !replicationConfig.IsActiveActive() {
 		return nil
 	}
-	activeClusters := make([]string, 0, len(replicationConfig.ActiveClusters.ActiveClustersByRegion))
-	for _, cl := range replicationConfig.ActiveClusters.ActiveClustersByRegion {
-		activeClusters = append(activeClusters, cl.ActiveClusterName)
+	activeClusters := make([]string, 0, len(replicationConfig.ActiveClusters.AttributeScopes))
+	for _, scope := range replicationConfig.ActiveClusters.AttributeScopes {
+		for _, cl := range scope.ClusterAttributes {
+			activeClusters = append(activeClusters, cl.ActiveClusterName)
+		}
 	}
 	sort.Strings(activeClusters)
 	return activeClusters
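Note: the change above is the core shape of this diff — the flat region→cluster map becomes a two-level scope→attribute→cluster map, so every consumer gains one more loop. A minimal sketch of the traversal, with types abridged to the fields this diff touches (the helper name isActiveAnywhere is illustrative, not part of the change):

    package main

    import "fmt"

    type ActiveClusterInfo struct {
        ActiveClusterName string
        FailoverVersion   int64
    }

    // ClusterAttributeScope groups attributes of one scope type, e.g. "region".
    type ClusterAttributeScope struct {
        ClusterAttributes map[string]ActiveClusterInfo // attribute name -> active cluster
    }

    type ActiveClusters struct {
        AttributeScopes map[string]ClusterAttributeScope // scope type -> attributes
    }

    // isActiveAnywhere mirrors the nested loops now used by IsActiveIn and getActiveClusters.
    func isActiveAnywhere(ac ActiveClusters, cluster string) bool {
        for _, scope := range ac.AttributeScopes {
            for _, cl := range scope.ClusterAttributes {
                if cl.ActiveClusterName == cluster {
                    return true
                }
            }
        }
        return false
    }

    func main() {
        ac := ActiveClusters{AttributeScopes: map[string]ClusterAttributeScope{
            "region": {ClusterAttributes: map[string]ActiveClusterInfo{
                "region0": {ActiveClusterName: "A"},
                "region1": {ActiveClusterName: "B"},
            }},
        }}
        fmt.Println(isActiveAnywhere(ac, "A"), isActiveAnywhere(ac, "C")) // true false
    }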
diff --git a/common/cache/domainCache_test.go b/common/cache/domainCache_test.go
index 7789daac33d..d15dd7c5a70 100644
--- a/common/cache/domainCache_test.go
+++ b/common/cache/domainCache_test.go
@@ -330,9 +330,13 @@ func Test_IsActiveIn(t *testing.T) {
 			isGlobalDomain: true,
 			currentCluster: "A",
 			activeClusters: &types.ActiveClusters{
-				ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
-					"region0": {ActiveClusterName: "A"},
-					"region1": {ActiveClusterName: "B"},
+				AttributeScopes: map[string]types.ClusterAttributeScope{
+					"region": {
+						ClusterAttributes: map[string]types.ActiveClusterInfo{
+							"region0": {ActiveClusterName: "A"},
+							"region1": {ActiveClusterName: "B"},
+						},
+					},
 				},
 			},
 			expectIsActive: true,
@@ -342,9 +346,13 @@ func Test_IsActiveIn(t *testing.T) {
 			isGlobalDomain: true,
 			currentCluster: "C",
 			activeClusters: &types.ActiveClusters{
-				ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
-					"region0": {ActiveClusterName: "A"},
-					"region1": {ActiveClusterName: "B"},
+				AttributeScopes: map[string]types.ClusterAttributeScope{
+					"region": {
+						ClusterAttributes: map[string]types.ActiveClusterInfo{
+							"region0": {ActiveClusterName: "A"},
+							"region1": {ActiveClusterName: "B"},
+						},
+					},
 				},
 			},
 			expectIsActive: false,
@@ -1223,14 +1231,16 @@ func Test_NewDomainNotActiveError(t *testing.T) {
 			nil,
 			true,
 			&persistence.DomainReplicationConfig{
-				ActiveClusters: &types.ActiveClusters{ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
-					"region1": {
-						ActiveClusterName: "cluster1",
-					},
-					"region2": {
-						ActiveClusterName: "cluster2",
+				ActiveClusters: &types.ActiveClusters{
+					AttributeScopes: map[string]types.ClusterAttributeScope{
+						"region": {
+							ClusterAttributes: map[string]types.ActiveClusterInfo{
+								"region1": {ActiveClusterName: "cluster1"},
+								"region2": {ActiveClusterName: "cluster2"},
+							},
+						},
 					},
-				}},
+				},
 			},
 			0,
 			nil,
@@ -1273,14 +1283,16 @@ func Test_getActiveClusters(t *testing.T) {
 			msg: "active-active domain",
 			replicationConfig: &persistence.DomainReplicationConfig{
 				ActiveClusterName: "active",
-				ActiveClusters: &types.ActiveClusters{ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
-					"region1": {
-						ActiveClusterName: "cluster1",
-					},
-					"region2": {
-						ActiveClusterName: "cluster2",
+				ActiveClusters: &types.ActiveClusters{
+					AttributeScopes: map[string]types.ClusterAttributeScope{
+						"region": {
+							ClusterAttributes: map[string]types.ActiveClusterInfo{
+								"region1": {ActiveClusterName: "cluster1"},
+								"region2": {ActiveClusterName: "cluster2"},
+							},
+						},
 					},
-				}},
+				},
 			},
 			expectedActiveClusters: []string{"cluster1", "cluster2"},
 		},
diff --git a/common/types/shared.go b/common/types/shared.go
index b29e127567a..bc59944e1ba 100644
--- a/common/types/shared.go
+++ b/common/types/shared.go
@@ -2604,8 +2604,13 @@ func (v *ActiveClusters) DeepCopy() *ActiveClusters {
 	for region, activeClusterInfo := range v.ActiveClustersByRegion {
 		activeClustersByRegion[region] = activeClusterInfo
 	}
+	attributeScopes := make(map[string]ClusterAttributeScope)
+	for scopeType, scope := range v.AttributeScopes {
+		attributeScopes[scopeType] = scope
+	}
 	return &ActiveClusters{
 		ActiveClustersByRegion: activeClustersByRegion,
+		AttributeScopes:        attributeScopes,
 	}
 }
 
@@ -2643,6 +2648,13 @@ type ActiveClusterSelectionPolicy struct {
 	ClusterAttribute *ClusterAttribute `json:"clusterAttribute,omitempty" yaml:"clusterAttribute,omitempty"`
 }
 
+func (p *ActiveClusterSelectionPolicy) GetClusterAttribute() *ClusterAttribute {
+	if p == nil {
+		return nil
+	}
+	return p.ClusterAttribute
+}
+
 func (p *ActiveClusterSelectionPolicy) Equals(other *ActiveClusterSelectionPolicy) bool {
 	if p == nil && other == nil {
 		return true
diff --git a/common/types/shared_test.go b/common/types/shared_test.go
index 865859bcb3c..77426553df2 100644
--- a/common/types/shared_test.go
+++ b/common/types/shared_test.go
@@ -84,6 +84,13 @@ func TestActiveClustersConfigDeepCopy(t *testing.T) {
 				FailoverVersion:   2,
 			},
 		},
+		AttributeScopes: map[string]ClusterAttributeScope{
+			"region": {
+				ClusterAttributes: map[string]ActiveClusterInfo{
+					"us-east-1": {ActiveClusterName: "us-east-1-cluster", FailoverVersion: 1},
+				},
+			},
+		},
 	}
 
 	tests := []struct {
@@ -101,6 +108,7 @@ func TestActiveClustersConfigDeepCopy(t *testing.T) {
 			input: &ActiveClusters{},
 			expect: &ActiveClusters{
 				ActiveClustersByRegion: map[string]ActiveClusterInfo{},
+				AttributeScopes:        map[string]ClusterAttributeScope{},
 			},
 		},
 		{
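Note: two observations on the types change above. GetClusterAttribute follows the codebase's nil-receiver getter convention, so callers may chain policy.GetClusterAttribute() without a nil check. Separately, the new DeepCopy loop copies each ClusterAttributeScope by value, and because that struct wraps a map, the inner ClusterAttributes map is still shared between the original and the copy. If a fully independent copy is ever needed, a sketch under the type shapes used in this diff:

    func deepCopyAttributeScopes(in map[string]ClusterAttributeScope) map[string]ClusterAttributeScope {
        out := make(map[string]ClusterAttributeScope, len(in))
        for scopeType, scope := range in {
            // rebuild the inner map so mutations on the copy cannot leak back
            attrs := make(map[string]ActiveClusterInfo, len(scope.ClusterAttributes))
            for name, info := range scope.ClusterAttributes {
                attrs[name] = info
            }
            out[scopeType] = ClusterAttributeScope{ClusterAttributes: attrs}
        }
        return out
    }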
diff --git a/service/frontend/templates/clusterredirection.tmpl b/service/frontend/templates/clusterredirection.tmpl
index 1424bc537ce..4b3cba423c8 100644
--- a/service/frontend/templates/clusterredirection.tmpl
+++ b/service/frontend/templates/clusterredirection.tmpl
@@ -111,14 +111,6 @@ func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
 	var actClSelPolicyForNewWF *types.ActiveClusterSelectionPolicy
 	var workflowExecution *types.WorkflowExecution
 	{{- if has $method.Name $startWFAPIs}}
-	if {{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy == nil {
-		{{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy = &types.ActiveClusterSelectionPolicy{
-			ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
-			StickyRegion:                   handler.GetActiveClusterManager().CurrentRegion(),
-		}
-	} else if {{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy.GetStrategy() == types.ActiveClusterSelectionStrategyRegionSticky {
-		{{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy.StickyRegion = handler.GetActiveClusterManager().CurrentRegion()
-	}
 	actClSelPolicyForNewWF = {{(index $method.Params 1).Name}}.ActiveClusterSelectionPolicy
 	{{- else if has $method.Name $nonstartWFAPIs}}
 	workflowExecution = {{(index $method.Params 1).Name}}.GetWorkflowExecution()
diff --git a/service/frontend/wrappers/clusterredirection/api_generated.go b/service/frontend/wrappers/clusterredirection/api_generated.go
index bff3c3239e8..7bda97ea918 100644
--- a/service/frontend/wrappers/clusterredirection/api_generated.go
+++ b/service/frontend/wrappers/clusterredirection/api_generated.go
@@ -1347,14 +1347,6 @@ func (handler *clusterRedirectionHandler) SignalWithStartWorkflowExecution(ctx c
 	var actClSelPolicyForNewWF *types.ActiveClusterSelectionPolicy
 	var workflowExecution *types.WorkflowExecution
 
-	if sp1.ActiveClusterSelectionPolicy == nil {
-		sp1.ActiveClusterSelectionPolicy = &types.ActiveClusterSelectionPolicy{
-			ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
-			StickyRegion:                   handler.GetActiveClusterManager().CurrentRegion(),
-		}
-	} else if sp1.ActiveClusterSelectionPolicy.GetStrategy() == types.ActiveClusterSelectionStrategyRegionSticky {
-		sp1.ActiveClusterSelectionPolicy.StickyRegion = handler.GetActiveClusterManager().CurrentRegion()
-	}
 	actClSelPolicyForNewWF = sp1.ActiveClusterSelectionPolicy
 
 	err = handler.redirectionPolicy.Redirect(ctx, domainEntry, workflowExecution, actClSelPolicyForNewWF, apiName, requestedConsistencyLevel, func(targetDC string) error {
@@ -1395,14 +1387,6 @@ func (handler *clusterRedirectionHandler) SignalWithStartWorkflowExecutionAsync(
 	var actClSelPolicyForNewWF *types.ActiveClusterSelectionPolicy
 	var workflowExecution *types.WorkflowExecution
 
-	if sp1.ActiveClusterSelectionPolicy == nil {
-		sp1.ActiveClusterSelectionPolicy = &types.ActiveClusterSelectionPolicy{
-			ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
-			StickyRegion:                   handler.GetActiveClusterManager().CurrentRegion(),
-		}
-	} else if sp1.ActiveClusterSelectionPolicy.GetStrategy() == types.ActiveClusterSelectionStrategyRegionSticky {
-		sp1.ActiveClusterSelectionPolicy.StickyRegion = handler.GetActiveClusterManager().CurrentRegion()
-	}
 	actClSelPolicyForNewWF = sp1.ActiveClusterSelectionPolicy
 
 	err = handler.redirectionPolicy.Redirect(ctx, domainEntry, workflowExecution, actClSelPolicyForNewWF, apiName, requestedConsistencyLevel, func(targetDC string) error {
@@ -1483,14 +1467,6 @@ func (handler *clusterRedirectionHandler) StartWorkflowExecution(ctx context.Con
 	var actClSelPolicyForNewWF *types.ActiveClusterSelectionPolicy
 	var workflowExecution *types.WorkflowExecution
 
-	if sp1.ActiveClusterSelectionPolicy == nil {
-		sp1.ActiveClusterSelectionPolicy = &types.ActiveClusterSelectionPolicy{
-			ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
-			StickyRegion:                   handler.GetActiveClusterManager().CurrentRegion(),
-		}
-	} else if sp1.ActiveClusterSelectionPolicy.GetStrategy() == types.ActiveClusterSelectionStrategyRegionSticky {
-		sp1.ActiveClusterSelectionPolicy.StickyRegion = handler.GetActiveClusterManager().CurrentRegion()
-	}
 	actClSelPolicyForNewWF = sp1.ActiveClusterSelectionPolicy
 
 	err = handler.redirectionPolicy.Redirect(ctx, domainEntry, workflowExecution, actClSelPolicyForNewWF, apiName, requestedConsistencyLevel, func(targetDC string) error {
@@ -1531,14 +1507,6 @@ func (handler *clusterRedirectionHandler) StartWorkflowExecutionAsync(ctx contex
 	var actClSelPolicyForNewWF *types.ActiveClusterSelectionPolicy
 	var workflowExecution *types.WorkflowExecution
 
-	if sp1.ActiveClusterSelectionPolicy == nil {
-		sp1.ActiveClusterSelectionPolicy = &types.ActiveClusterSelectionPolicy{
-			ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
-			StickyRegion:                   handler.GetActiveClusterManager().CurrentRegion(),
-		}
-	} else if sp1.ActiveClusterSelectionPolicy.GetStrategy() == types.ActiveClusterSelectionStrategyRegionSticky {
-		sp1.ActiveClusterSelectionPolicy.StickyRegion = handler.GetActiveClusterManager().CurrentRegion()
-	}
 	actClSelPolicyForNewWF = sp1.ActiveClusterSelectionPolicy
 
 	err = handler.redirectionPolicy.Redirect(ctx, domainEntry, workflowExecution, actClSelPolicyForNewWF, apiName, requestedConsistencyLevel, func(targetDC string) error {
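Note: with the defaulting block gone from the template and all four generated start-path handlers, the frontend no longer fabricates a region-sticky policy: the request's ActiveClusterSelectionPolicy, including nil, flows through to the redirection policy unchanged. The nil-safe getter added in shared.go is what makes that safe; an illustrative fragment (the helper name policyAttribute is hypothetical):

    // policyAttribute extracts the (possibly nil) attribute from a start request.
    // A nil policy is fine: GetClusterAttribute on a nil receiver returns nil,
    // which downstream lookups treat as "use the domain-level default cluster".
    func policyAttribute(req *types.StartWorkflowExecutionRequest) *types.ClusterAttribute {
        return req.ActiveClusterSelectionPolicy.GetClusterAttribute()
    }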
diff --git a/service/frontend/wrappers/clusterredirection/api_test.go b/service/frontend/wrappers/clusterredirection/api_test.go
index 1fb3aab43f8..eb32656bbed 100644
--- a/service/frontend/wrappers/clusterredirection/api_test.go
+++ b/service/frontend/wrappers/clusterredirection/api_test.go
@@ -910,12 +910,13 @@ func (s *clusterRedirectionHandlerSuite) TestSignalWithStartWorkflowExecution()
 	req := &types.SignalWithStartWorkflowExecutionRequest{
 		Domain: s.domainName,
 		ActiveClusterSelectionPolicy: &types.ActiveClusterSelectionPolicy{
-			ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
-			StickyRegion:                   "region-a",
+			ClusterAttribute: &types.ClusterAttribute{
+				Scope: "region",
+				Name:  "region-a",
+			},
 		},
 	}
 
-	s.mockResource.ActiveClusterMgr.EXPECT().CurrentRegion().Return("region-a").Times(1)
 	s.mockClusterRedirectionPolicy.EXPECT().Redirect(ctx, s.domainCacheEntry, nil, req.ActiveClusterSelectionPolicy, apiName, types.QueryConsistencyLevelEventual, gomock.Any()).
 		DoAndReturn(func(ctx context.Context, domainCacheEntry *cache.DomainCacheEntry, wfExec *types.WorkflowExecution, selPlcy *types.ActiveClusterSelectionPolicy, apiName string, consistencyLevel types.QueryConsistencyLevel, callFn func(targetDC string) error) error {
 			// validate callFn logic
@@ -970,12 +971,13 @@ func (s *clusterRedirectionHandlerSuite) TestStartWorkflowExecution() {
 	req := &types.StartWorkflowExecutionRequest{
 		Domain: s.domainName,
 		ActiveClusterSelectionPolicy: &types.ActiveClusterSelectionPolicy{
-			ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
-			StickyRegion:                   "region-b",
+			ClusterAttribute: &types.ClusterAttribute{
+				Scope: "region",
+				Name:  "region-b",
+			},
 		},
 	}
 
-	s.mockResource.ActiveClusterMgr.EXPECT().CurrentRegion().Return("region-a").Times(1)
 	s.mockClusterRedirectionPolicy.EXPECT().Redirect(ctx, s.domainCacheEntry, nil, req.ActiveClusterSelectionPolicy, apiName, types.QueryConsistencyLevelEventual, gomock.Any()).
 		DoAndReturn(func(ctx context.Context, domainCacheEntry *cache.DomainCacheEntry, wfExec *types.WorkflowExecution, selPlcy *types.ActiveClusterSelectionPolicy, apiName string, consistencyLevel types.QueryConsistencyLevel, callFn func(targetDC string) error) error {
 			// validate callFn logic
diff --git a/service/frontend/wrappers/clusterredirection/policy.go b/service/frontend/wrappers/clusterredirection/policy.go
index 02f0e987d80..523379bcfb1 100644
--- a/service/frontend/wrappers/clusterredirection/policy.go
+++ b/service/frontend/wrappers/clusterredirection/policy.go
@@ -440,12 +440,12 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) activeClusterForActi
 	policy.logger.Debug("Determining active cluster for active-active domain request", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.Dynamic("execution", workflowExecution), tag.OperationName(apiName))
 	if actClSelPolicyForNewWF != nil {
 		policy.logger.Debug("Active cluster selection policy for new workflow", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.OperationName(apiName), tag.Dynamic("policy", actClSelPolicyForNewWF))
-		lookupRes, err := policy.activeClusterManager.LookupNewWorkflow(ctx, domainEntry.GetInfo().ID, actClSelPolicyForNewWF)
+		activeClusterInfo, err := policy.activeClusterManager.GetActiveClusterInfoByClusterAttribute(ctx, domainEntry.GetInfo().ID, actClSelPolicyForNewWF.GetClusterAttribute())
 		if err != nil {
 			policy.logger.Error("Failed to lookup active cluster of new workflow, using current cluster", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.OperationName(apiName), tag.Error(err))
 			return policy.currentClusterName
 		}
-		return lookupRes.ClusterName
+		return activeClusterInfo.ActiveClusterName
 	}
 
 	if workflowExecution == nil || workflowExecution.WorkflowID == "" || workflowExecution.RunID == "" {
@@ -453,12 +453,12 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) activeClusterForActi
 		return policy.currentClusterName
 	}
 
-	lookupRes, err := policy.activeClusterManager.LookupWorkflow(ctx, domainEntry.GetInfo().ID, workflowExecution.WorkflowID, workflowExecution.RunID)
+	activeClusterInfo, err := policy.activeClusterManager.GetActiveClusterInfoByWorkflow(ctx, domainEntry.GetInfo().ID, workflowExecution.WorkflowID, workflowExecution.RunID)
 	if err != nil {
 		policy.logger.Error("Failed to lookup active cluster of workflow, using current cluster", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.WorkflowID(workflowExecution.WorkflowID), tag.WorkflowRunID(workflowExecution.RunID), tag.OperationName(apiName), tag.Error(err))
 		return policy.currentClusterName
 	}
-	policy.logger.Debug("Lookup workflow result for active-active domain request", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.WorkflowID(workflowExecution.WorkflowID), tag.WorkflowRunID(workflowExecution.RunID), tag.OperationName(apiName), tag.ActiveClusterName(lookupRes.ClusterName))
-	return lookupRes.ClusterName
+	policy.logger.Debug("Lookup workflow result for active-active domain request", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.WorkflowID(workflowExecution.WorkflowID), tag.WorkflowRunID(workflowExecution.RunID), tag.OperationName(apiName), tag.ActiveClusterName(activeClusterInfo.ActiveClusterName))
+	return activeClusterInfo.ActiveClusterName
 }
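Note: the redirection policy now resolves clusters through the two manager lookups instead of LookupNewWorkflow/LookupWorkflow, and in both branches degrades to the current cluster on lookup failure. A condensation of that logic (the helper name resolveOrCurrent is hypothetical; it assumes the activecluster.Manager interface as used above):

    func resolveOrCurrent(
        ctx context.Context,
        mgr activecluster.Manager,
        domainID string,
        newWFPolicy *types.ActiveClusterSelectionPolicy,
        exec *types.WorkflowExecution,
        current string,
    ) string {
        if newWFPolicy != nil {
            // new workflow: resolve by the (possibly nil) cluster attribute
            info, err := mgr.GetActiveClusterInfoByClusterAttribute(ctx, domainID, newWFPolicy.GetClusterAttribute())
            if err != nil {
                return current // degrade to the local cluster, as the policy does
            }
            return info.ActiveClusterName
        }
        if exec == nil || exec.WorkflowID == "" || exec.RunID == "" {
            return current
        }
        // existing workflow: resolve by the run itself
        info, err := mgr.GetActiveClusterInfoByWorkflow(ctx, domainID, exec.WorkflowID, exec.RunID)
        if err != nil {
            return current
        }
        return info.ActiveClusterName
    }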
diff --git a/service/frontend/wrappers/clusterredirection/policy_test.go b/service/frontend/wrappers/clusterredirection/policy_test.go
index f3fb4416ddd..f84a9b0a251 100644
--- a/service/frontend/wrappers/clusterredirection/policy_test.go
+++ b/service/frontend/wrappers/clusterredirection/policy_test.go
@@ -598,13 +598,17 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
 	domainEntry := s.setupActiveActiveDomainWithTwoReplicationCluster(true)
 
 	usEastStickyPlcy := &types.ActiveClusterSelectionPolicy{
-		ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
-		StickyRegion:                   "us-east",
+		ClusterAttribute: &types.ClusterAttribute{
+			Scope: "region",
+			Name:  "us-east",
+		},
 	}
 
 	usWestStickyPlcy := &types.ActiveClusterSelectionPolicy{
-		ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
-		StickyRegion:                   "us-west",
+		ClusterAttribute: &types.ClusterAttribute{
+			Scope: "region",
+			Name:  "us-west",
+		},
 	}
 
 	tests := []struct {
@@ -620,10 +624,9 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
 			domainEntry:            domainEntry,
 			actClSelPolicyForNewWF: usWestStickyPlcy,
 			mockFn: func(activeClusterManager *activecluster.MockManager) {
-				activeClusterManager.EXPECT().LookupNewWorkflow(gomock.Any(), domainEntry.GetInfo().ID, usWestStickyPlcy).Return(&activecluster.LookupResult{
-					Region:          "us-west",
-					ClusterName:     s.alternativeClusterName,
-					FailoverVersion: 2,
+				activeClusterManager.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), domainEntry.GetInfo().ID, usWestStickyPlcy.GetClusterAttribute()).Return(&types.ActiveClusterInfo{
+					ActiveClusterName: s.alternativeClusterName,
+					FailoverVersion:   2,
 				}, nil)
 			},
 			want: s.alternativeClusterName,
@@ -633,7 +636,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
 			domainEntry:            domainEntry,
 			actClSelPolicyForNewWF: usEastStickyPlcy,
 			mockFn: func(activeClusterManager *activecluster.MockManager) {
-				activeClusterManager.EXPECT().LookupNewWorkflow(gomock.Any(), domainEntry.GetInfo().ID, usEastStickyPlcy).Return(nil, errors.New("lookup failed"))
+				activeClusterManager.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), domainEntry.GetInfo().ID, usEastStickyPlcy.GetClusterAttribute()).Return(nil, errors.New("lookup failed"))
 			},
 			want: s.currentClusterName,
 		},
@@ -672,7 +675,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
 				RunID:      "run1",
 			},
 			mockFn: func(activeClusterManager *activecluster.MockManager) {
-				activeClusterManager.EXPECT().LookupWorkflow(gomock.Any(), domainEntry.GetInfo().ID, "wf1", "run1").Return(nil, errors.New("lookup failed"))
+				activeClusterManager.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), domainEntry.GetInfo().ID, "wf1", "run1").Return(nil, errors.New("lookup failed"))
 			},
 			want: s.currentClusterName,
 		},
@@ -684,10 +687,9 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestActiveClusterForActiv
 				RunID:      "run1",
 			},
 			mockFn: func(activeClusterManager *activecluster.MockManager) {
-				activeClusterManager.EXPECT().LookupWorkflow(gomock.Any(), domainEntry.GetInfo().ID, "wf1", "run1").Return(&activecluster.LookupResult{
-					Region:          "us-west",
-					ClusterName:     s.alternativeClusterName,
-					FailoverVersion: 2,
+				activeClusterManager.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), domainEntry.GetInfo().ID, "wf1", "run1").Return(&types.ActiveClusterInfo{
+					ActiveClusterName: s.alternativeClusterName,
+					FailoverVersion:   2,
 				}, nil)
 			},
 			want: s.alternativeClusterName,
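Note: as the updated tests show, a selection policy is now a ClusterAttribute (scope plus name) rather than a strategy enum with a sticky region; "region" is just one scope type, so the same shape can carry other attribute scopes a domain might define. An illustrative fragment (the "city" scope is hypothetical):

    regionPinned := &types.ActiveClusterSelectionPolicy{
        ClusterAttribute: &types.ClusterAttribute{Scope: "region", Name: "us-west"},
    }
    cityPinned := &types.ActiveClusterSelectionPolicy{
        ClusterAttribute: &types.ClusterAttribute{Scope: "city", Name: "sf"},
    }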
diff --git a/service/history/decision/handler_test.go b/service/history/decision/handler_test.go
index 23d9357628f..72c77d71088 100644
--- a/service/history/decision/handler_test.go
+++ b/service/history/decision/handler_test.go
@@ -240,14 +240,15 @@ func TestHandleDecisionTaskScheduled(t *testing.T) {
 				IsFirstDecision: test.isfirstDecision,
 			}
 			decisionHandler := &handlerImpl{
-				config:          config.NewForTest(),
-				shard:           shard.NewMockContext(ctrl),
-				timeSource:      clock.NewRealTimeSource(),
-				metricsClient:   metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}),
-				logger:          testlogger.New(t),
-				versionChecker:  client.NewVersionChecker(),
-				tokenSerializer: common.NewMockTaskTokenSerializer(ctrl),
-				domainCache:     cache.NewMockDomainCache(ctrl),
+				config:               config.NewForTest(),
+				shard:                shard.NewMockContext(ctrl),
+				timeSource:           clock.NewRealTimeSource(),
+				metricsClient:        metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}),
+				logger:               testlogger.New(t),
+				versionChecker:       client.NewVersionChecker(),
+				tokenSerializer:      common.NewMockTaskTokenSerializer(ctrl),
+				domainCache:          cache.NewMockDomainCache(ctrl),
+				activeClusterManager: activecluster.NewMockManager(ctrl),
 			}
 			expectCommonCalls(decisionHandler, test.domainID)
 			expectGetWorkflowExecution(decisionHandler, test.domainID, test.mutablestate)
@@ -375,14 +376,15 @@ func TestHandleDecisionTaskFailed(t *testing.T) {
 			}
 			shardContext := shard.NewMockContext(ctrl)
 			decisionHandler := &handlerImpl{
-				config:          config.NewForTest(),
-				shard:           shardContext,
-				timeSource:      clock.NewRealTimeSource(),
-				metricsClient:   metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}),
-				logger:          testlogger.New(t),
-				versionChecker:  client.NewVersionChecker(),
-				tokenSerializer: common.NewMockTaskTokenSerializer(ctrl),
-				domainCache:     cache.NewMockDomainCache(ctrl),
+				config:               config.NewForTest(),
+				shard:                shardContext,
+				timeSource:           clock.NewRealTimeSource(),
+				metricsClient:        metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}),
+				logger:               testlogger.New(t),
+				versionChecker:       client.NewVersionChecker(),
+				tokenSerializer:      common.NewMockTaskTokenSerializer(ctrl),
+				domainCache:          cache.NewMockDomainCache(ctrl),
+				activeClusterManager: activecluster.NewMockManager(ctrl),
 			}
 			expectCommonCalls(decisionHandler, test.domainID)
 			expectGetWorkflowExecution(decisionHandler, test.domainID, test.mutablestate)
@@ -542,13 +544,14 @@ func TestHandleDecisionTaskStarted(t *testing.T) {
 			}
 			shardContext := shard.NewMockContext(ctrl)
 			decisionHandler := &handlerImpl{
-				config:         config.NewForTest(),
-				shard:          shardContext,
-				timeSource:     clock.NewRealTimeSource(),
-				metricsClient:  metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}),
-				logger:         testlogger.New(t),
-				versionChecker: client.NewVersionChecker(),
-				domainCache:    cache.NewMockDomainCache(ctrl),
+				config:               config.NewForTest(),
+				shard:                shardContext,
+				timeSource:           clock.NewRealTimeSource(),
+				metricsClient:        metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}),
+				logger:               testlogger.New(t),
+				versionChecker:       client.NewVersionChecker(),
+				domainCache:          cache.NewMockDomainCache(ctrl),
+				activeClusterManager: activecluster.NewMockManager(ctrl),
 			}
 			expectCommonCalls(decisionHandler, test.domainID)
 			expectGetWorkflowExecution(decisionHandler, test.domainID, test.mutablestate)
@@ -825,6 +828,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
 				decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(2).Times(1).Return([]int64{0, 1}, nil)
 				decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(nil, &persistence.TransactionSizeLimitError{Msg: fmt.Sprintf("transaction size exceeds limit")})
 				decisionHandler.shard.(*shard.MockContext).EXPECT().GetExecutionManager().Times(1)
+				decisionHandler.activeClusterManager.(*activecluster.MockManager).EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil)
 			},
 			mutableState: &persistence.WorkflowMutableState{
 				ExecutionInfo: &persistence.WorkflowExecutionInfo{
@@ -866,6 +870,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
 				decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(2).Times(1).Return([]int64{0, 1}, nil)
 				decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(nil, execution.NewConflictError(new(testing.T), errors.New("some random conflict error")))
 				decisionHandler.shard.(*shard.MockContext).EXPECT().GetExecutionManager().Times(1)
+				decisionHandler.activeClusterManager.(*activecluster.MockManager).EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil)
 			},
 			mutableState: &persistence.WorkflowMutableState{
 				ExecutionInfo: &persistence.WorkflowExecutionInfo{
@@ -907,6 +912,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
 				decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(2).Times(1).Return([]int64{0, 1}, nil)
 				decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(nil, &persistence.TransactionSizeLimitError{Msg: fmt.Sprintf("transaction size exceeds limit")})
 				decisionHandler.shard.(*shard.MockContext).EXPECT().GetExecutionManager().Times(1)
+				decisionHandler.activeClusterManager.(*activecluster.MockManager).EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil)
 				firstGetWfExecutionCall := decisionHandler.shard.(*shard.MockContext).EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
 					Return(&persistence.GetWorkflowExecutionResponse{
 						State: &persistence.WorkflowMutableState{
@@ -920,6 +926,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
 						},
 						MutableStateStats: &persistence.MutableStateStats{},
 					}, nil)
+				decisionHandler.activeClusterManager.(*activecluster.MockManager).EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes()
 				lastGetWfExecutionCall := decisionHandler.shard.(*shard.MockContext).EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some error occurred when loading workflow execution"))
 				gomock.InOrder(firstGetWfExecutionCall, lastGetWfExecutionCall)
 			},
@@ -960,6 +967,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
 				decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(nil, &persistence.TransactionSizeLimitError{Msg: fmt.Sprintf("transaction size exceeds limit")})
 				decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(&persistence.AppendHistoryNodesResponse{}, nil)
 				decisionHandler.shard.(*shard.MockContext).EXPECT().GetExecutionManager().Times(1)
+				decisionHandler.activeClusterManager.(*activecluster.MockManager).EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil)
 				decisionHandler.shard.(*shard.MockContext).EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
 					DoAndReturn(func(ctx interface{}, request interface{}) (*persistence.GetWorkflowExecutionResponse, error) {
 						return &persistence.GetWorkflowExecutionResponse{
@@ -975,6 +983,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
 							MutableStateStats: &persistence.MutableStateStats{},
 						}, nil
 					}).Times(2)
+				decisionHandler.activeClusterManager.(*activecluster.MockManager).EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes()
 				decisionHandler.shard.(*shard.MockContext).EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some error updating workflow execution"))
 				engine := engine.NewMockEngine(ctrl)
 				decisionHandler.shard.(*shard.MockContext).EXPECT().GetEngine().Return(engine).Times(2)
@@ -1034,6 +1043,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
 					},
 					MutableStateStats: &persistence.MutableStateStats{},
 				}, nil).Times(1)
+				decisionHandler.activeClusterManager.(*activecluster.MockManager).EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes()
 			},
 			request: &types.HistoryRespondDecisionTaskCompletedRequest{
 				DomainUUID: constants.TestDomainID,
@@ -1185,15 +1195,16 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
 			handlerConfig.EnableActivityLocalDispatchByDomain = func(domain string) bool { return true }
 			handlerConfig.DecisionRetryMaxAttempts = func(domain string) int { return 1 }
 			decisionHandler := &handlerImpl{
-				config:          handlerConfig,
-				shard:           shard,
-				timeSource:      clock.NewMockedTimeSource(),
-				domainCache:     domainCache,
-				metricsClient:   metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}),
-				logger:          testlogger.New(t),
-				versionChecker:  client.NewVersionChecker(),
-				tokenSerializer: common.NewMockTaskTokenSerializer(ctrl),
-				attrValidator:   newAttrValidator(domainCache, metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}), config.NewForTest(), testlogger.New(t)),
+				config:               handlerConfig,
+				shard:                shard,
+				timeSource:           clock.NewMockedTimeSource(),
+				domainCache:          domainCache,
+				metricsClient:        metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}),
+				logger:               testlogger.New(t),
+				versionChecker:       client.NewVersionChecker(),
+				tokenSerializer:      common.NewMockTaskTokenSerializer(ctrl),
+				attrValidator:        newAttrValidator(domainCache, metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}), config.NewForTest(), testlogger.New(t)),
+				activeClusterManager: activecluster.NewMockManager(ctrl),
 			}
 			expectCommonCalls(decisionHandler, test.domainID)
 			if test.expectGetWorkflowExecution {
@@ -1462,6 +1473,7 @@ func expectCommonCalls(handler *handlerImpl, domainID string) {
 	handler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomainName(domainID).AnyTimes().Return(constants.TestDomainName, nil)
 	handler.shard.(*shard.MockContext).EXPECT().GetExecutionManager().Times(1)
 	handler.shard.(*shard.MockContext).EXPECT().GetShardID().Return(testShardID).Times(1)
+	handler.shard.(*shard.MockContext).EXPECT().GetActiveClusterManager().Return(handler.activeClusterManager).AnyTimes()
 }
 
 func expectGetWorkflowExecution(handler *handlerImpl, domainID string, state *persistence.WorkflowMutableState) {
@@ -1485,6 +1497,8 @@ func expectGetWorkflowExecution(handler *handlerImpl, domainID string, state *pe
 			RunID:      constants.TestRunID,
 		},
 	}).AnyTimes().Return(workflowExecutionResponse, nil)
+
+	handler.activeClusterManager.(*activecluster.MockManager).EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), domainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes()
 }
 
 func expectDefaultDomainCache(handler *handlerImpl, domainID string) {
diff --git a/service/history/engine/engineimpl/history_engine2_test.go b/service/history/engine/engineimpl/history_engine2_test.go
index 3649f3ee58c..8e3c3c0b323 100644
--- a/service/history/engine/engineimpl/history_engine2_test.go
+++ b/service/history/engine/engineimpl/history_engine2_test.go
@@ -129,6 +129,13 @@ func (s *engine2Suite) SetupTest() {
 	s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(constants.TestDomainID, nil).AnyTimes()
 	s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
 
+	testActiveClusterInfo := &types.ActiveClusterInfo{
+		ActiveClusterName: testDomainEntry.GetReplicationConfig().ActiveClusterName,
+		FailoverVersion:   testDomainEntry.GetFailoverVersion(),
+	}
+	s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
+	s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), constants.TestDomainID, gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
+
 	s.logger = s.mockShard.GetLogger()
 
 	executionCache := execution.NewCache(s.mockShard)
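Note: the history-side tests that follow all gain the same wiring: an activecluster.MockManager whose two lookups return a fixed ActiveClusterInfo, reachable through the shard's GetActiveClusterManager. A condensed sketch of that setup, using the names from the tests above:

    ctrl := gomock.NewController(t)
    mgr := activecluster.NewMockManager(ctrl)
    info := &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}
    mgr.EXPECT().
        GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
        Return(info, nil).AnyTimes()
    mgr.EXPECT().
        GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()).
        Return(info, nil).AnyTimes()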
diff --git a/service/history/engine/engineimpl/history_engine3_eventsv2_test.go b/service/history/engine/engineimpl/history_engine3_eventsv2_test.go
index 31aa4a9bb63..3d7543aa143 100644
--- a/service/history/engine/engineimpl/history_engine3_eventsv2_test.go
+++ b/service/history/engine/engineimpl/history_engine3_eventsv2_test.go
@@ -152,6 +152,11 @@ func (s *engine3Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() {
 	s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(testDomainEntry, nil).AnyTimes()
 	s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(constants.TestDomainName, nil).AnyTimes()
 	s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(testDomainEntry, nil).AnyTimes()
+	testActiveClusterInfo := &types.ActiveClusterInfo{
+		ActiveClusterName: testDomainEntry.GetReplicationConfig().ActiveClusterName,
+		FailoverVersion:   testDomainEntry.GetFailoverVersion(),
+	}
+	s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
 
 	domainID := constants.TestDomainID
 	we := types.WorkflowExecution{
@@ -232,6 +237,11 @@ func (s *engine3Suite) TestStartWorkflowExecution_BrandNew() {
 	s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(testDomainEntry, nil).AnyTimes()
 	s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(constants.TestDomainName, nil).AnyTimes()
 	s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(testDomainEntry, nil).AnyTimes()
+	testActiveClusterInfo := &types.ActiveClusterInfo{
+		ActiveClusterName: testDomainEntry.GetReplicationConfig().ActiveClusterName,
+		FailoverVersion:   testDomainEntry.GetFailoverVersion(),
+	}
+	s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), constants.TestDomainID, gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
 
 	domainID := constants.TestDomainID
 	workflowID := "workflowID"
@@ -301,6 +311,11 @@ func (s *engine3Suite) TestSignalWithStartWorkflowExecution_JustSignal() {
 	s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(testDomainEntry, nil).AnyTimes()
 	s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(constants.TestDomainName, nil).AnyTimes()
 	s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(testDomainEntry, nil).AnyTimes()
+	testActiveClusterInfo := &types.ActiveClusterInfo{
+		ActiveClusterName: testDomainEntry.GetReplicationConfig().ActiveClusterName,
+		FailoverVersion:   testDomainEntry.GetFailoverVersion(),
+	}
+	s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
 
 	sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
 	_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
@@ -352,6 +367,11 @@ func (s *engine3Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist() {
 	s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(testDomainEntry, nil).AnyTimes()
 	s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(constants.TestDomainName, nil).AnyTimes()
 	s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(testDomainEntry, nil).AnyTimes()
+	testActiveClusterInfo := &types.ActiveClusterInfo{
+		ActiveClusterName: testDomainEntry.GetReplicationConfig().ActiveClusterName,
+		FailoverVersion:   testDomainEntry.GetFailoverVersion(),
+	}
+	s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), constants.TestDomainID, gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
 
 	sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
 	_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
diff --git a/service/history/engine/engineimpl/history_engine_test.go b/service/history/engine/engineimpl/history_engine_test.go
index 51db9c4190c..fc490eded49 100644
--- a/service/history/engine/engineimpl/history_engine_test.go
+++ b/service/history/engine/engineimpl/history_engine_test.go
@@ -148,6 +148,13 @@ func (s *engineSuite) SetupTest() {
 	s.mockDomainCache.EXPECT().GetDomain(constants.TestDomainName).Return(constants.TestLocalDomainEntry, nil).AnyTimes()
 	s.mockDomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes()
 
+	testActiveClusterInfo := &types.ActiveClusterInfo{
+		ActiveClusterName: constants.TestLocalDomainEntry.GetReplicationConfig().ActiveClusterName,
+		FailoverVersion:   constants.TestLocalDomainEntry.GetFailoverVersion(),
+	}
+	s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
+	s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), constants.TestDomainID, gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
+
 	historyEventNotifier := events.NewNotifier(
 		clock.NewRealTimeSource(),
 		s.mockShard.Resource.MetricsClient,
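Note: the engine suites stub both lookup entry points because the two call sites differ: an existing run is resolved by its identity, while a brand-new start only has the request's (possibly nil) attribute. Schematically (the helper name lookupActiveCluster is hypothetical):

    func lookupActiveCluster(
        ctx context.Context,
        mgr activecluster.Manager,
        domainID, workflowID, runID string,
        req *types.StartWorkflowExecutionRequest,
    ) (*types.ActiveClusterInfo, error) {
        if runID != "" {
            // existing execution: the run pins the attribute that elected its cluster
            return mgr.GetActiveClusterInfoByWorkflow(ctx, domainID, workflowID, runID)
        }
        // new start: resolve from the request; a nil attribute means the domain-level default
        return mgr.GetActiveClusterInfoByClusterAttribute(ctx, domainID, req.ActiveClusterSelectionPolicy.GetClusterAttribute())
    }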
diff --git a/service/history/engine/engineimpl/refresh_workflow_tasks_test.go b/service/history/engine/engineimpl/refresh_workflow_tasks_test.go
index 2aea26f4433..31677308d92 100644
--- a/service/history/engine/engineimpl/refresh_workflow_tasks_test.go
+++ b/service/history/engine/engineimpl/refresh_workflow_tasks_test.go
@@ -27,6 +27,7 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/mock"
+	"go.uber.org/mock/gomock"
 
 	"github.com/uber/cadence/common/persistence"
 	"github.com/uber/cadence/common/types"
@@ -118,6 +119,10 @@ func TestRefreshWorkflowTasks(t *testing.T) {
 				Return(getExecResp, tc.getWFExecErr).
 				Once()
 
+			eft.ShardCtx.Resource.ActiveClusterMgr.
+				EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, tc.execution.WorkflowID, tc.execution.RunID).
+				Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).MaxTimes(1)
+
 			// ReadHistoryBranch prep
 			historyBranchResp := &persistence.ReadHistoryBranchResponse{
 				HistoryEvents: []*types.HistoryEvent{
diff --git a/service/history/engine/engineimpl/register_domain_failover_callback_test.go b/service/history/engine/engineimpl/register_domain_failover_callback_test.go
index 438e1d8c1ba..9c1bb8a9e2b 100644
--- a/service/history/engine/engineimpl/register_domain_failover_callback_test.go
+++ b/service/history/engine/engineimpl/register_domain_failover_callback_test.go
@@ -395,14 +395,18 @@ func TestDomainCallback(t *testing.T) {
 				&persistence.DomainReplicationConfig{
 					Clusters: clusters,
 					ActiveClusters: &types.ActiveClusters{
-						ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
-							"region0": {
-								ActiveClusterName: "cluster0",
-								FailoverVersion:   1,
-							},
-							"region1": {
-								ActiveClusterName: "cluster1",
-								FailoverVersion:   2,
+						AttributeScopes: map[string]types.ClusterAttributeScope{
+							"region": {
+								ClusterAttributes: map[string]types.ActiveClusterInfo{
+									"region0": {
+										ActiveClusterName: "cluster0",
+										FailoverVersion:   1,
+									},
+									"region1": {
+										ActiveClusterName: "cluster1",
+										FailoverVersion:   2,
+									},
+								},
 							},
 						},
 					},
diff --git a/service/history/engine/engineimpl/reset_sticky_tasklist_test.go b/service/history/engine/engineimpl/reset_sticky_tasklist_test.go
index 4d6822246d5..5c882df0201 100644
--- a/service/history/engine/engineimpl/reset_sticky_tasklist_test.go
+++ b/service/history/engine/engineimpl/reset_sticky_tasklist_test.go
@@ -28,6 +28,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
+	"go.uber.org/mock/gomock"
 
 	"github.com/uber/cadence/common/checksum"
 	"github.com/uber/cadence/common/persistence"
@@ -78,6 +79,8 @@ func TestResetStickyTaskList(t *testing.T) {
 						Checksum: checksum.Checksum{},
 					},
 				}, nil)
+				engine.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, execution.WorkflowID, execution.RunID).
+					Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).Times(1)
 			},
 			expectedErr: workflow.ErrAlreadyCompleted,
 		},
@@ -102,6 +105,8 @@ func TestResetStickyTaskList(t *testing.T) {
 						Checksum: checksum.Checksum{},
 					},
 				}, nil)
+				engine.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, execution.WorkflowID, execution.RunID).
+					Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).Times(1)
 				engine.ShardCtx.Resource.ExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.MatchedBy(func(req *persistence.UpdateWorkflowExecutionRequest) bool {
 					return req.UpdateWorkflowMutation.ExecutionInfo.WorkflowID == execution.WorkflowID &&
 						req.UpdateWorkflowMutation.ExecutionInfo.RunID == execution.RunID &&
diff --git a/service/history/engine/engineimpl/reset_workflow_execution_test.go b/service/history/engine/engineimpl/reset_workflow_execution_test.go
index 29eb9ef942a..5b2ac656e6c 100644
--- a/service/history/engine/engineimpl/reset_workflow_execution_test.go
+++ b/service/history/engine/engineimpl/reset_workflow_execution_test.go
@@ -347,6 +347,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 			},
 			expectedErr: &types.BadRequestError{Message: "Cannot reset workflow without a decision task schedule."},
 		},
@@ -364,6 +365,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 			},
 			expectedErr: &types.BadRequestError{Message: "Decision finish ID must be > 1 && <= workflow next event ID."},
 		},
@@ -382,6 +384,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 			},
 			expected: &types.ResetWorkflowExecutionResponse{
 				RunID: latestRunID,
@@ -405,6 +408,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 				withHistoryPagination(branchToken, 24),
 				func(t *testing.T, engine *testdata.EngineForTest) {
 					ctrl := gomock.NewController(t)
@@ -472,6 +476,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 				withHistoryPagination(branchToken, 24),
 				func(t *testing.T, engine *testdata.EngineForTest) {
 					ctrl := gomock.NewController(t)
@@ -514,6 +519,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 				withState(previousExecution, &persistence.WorkflowMutableState{
 					ExecutionInfo: &persistence.WorkflowExecutionInfo{
 						DomainID: constants.TestDomainID,
@@ -527,6 +533,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 				withHistoryPagination(branchToken, 24),
 				func(t *testing.T, engine *testdata.EngineForTest) {
 					ctrl := gomock.NewController(t)
@@ -571,6 +578,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 				withHistoryPagination(branchToken, 24),
 				func(t *testing.T, engine *testdata.EngineForTest) {
 					ctrl := gomock.NewController(t)
@@ -620,6 +628,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 				withHistoryPagination(branchToken, 24),
 				func(t *testing.T, engine *testdata.EngineForTest) {
 					ctrl := gomock.NewController(t)
@@ -670,6 +679,7 @@ func TestResetWorkflowExecution(t *testing.T) {
 					},
 					ExecutionStats: &persistence.ExecutionStats{HistorySize: 1},
 				}),
+				withActiveClusterInfo(constants.TestDomainID, latestExecution, &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
 				withHistoryPagination(branchToken, 24),
 				func(t *testing.T, engine *testdata.EngineForTest) {
 					ctrl := gomock.NewController(t)
@@ -911,6 +921,7 @@ func TestResetWorkflowExecution_ResetPointsValidation(t *testing.T) {
 			mockResetter := reset.NewMockWorkflowResetter(ctrl)
 			eft.Engine.(*historyEngineImpl).workflowResetter = mockResetter
+			eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, latestExecution.WorkflowID, latestExecution.RunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil)
 
 			tc.setupMock(mockResetter, tc.resetEventID)
@@ -1019,6 +1030,12 @@ func withState(execution *types.WorkflowExecution, state *persistence.WorkflowMu
 	}
 }
 
+func withActiveClusterInfo(domainID string, execution *types.WorkflowExecution, activeClusterInfo *types.ActiveClusterInfo) InitFn {
+	return func(_ *testing.T, engine *testdata.EngineForTest) {
+		engine.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), domainID, execution.WorkflowID, execution.RunID).Return(activeClusterInfo, nil)
+	}
+}
+
 func resetExecutionRequest(execution *types.WorkflowExecution, decisionFinishEventID int) *types.HistoryResetWorkflowExecutionRequest {
 	return &types.HistoryResetWorkflowExecutionRequest{
 		DomainUUID: constants.TestDomainID,
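Note: the withActiveClusterInfo InitFn introduced above keeps the repeated mock expectation declarative; a test case composes it with the other fixtures, roughly as follows (field shown in isolation; someState stands in for the case's mutable-state fixture):

    setupMocks: []InitFn{
        withState(latestExecution, someState),
        withActiveClusterInfo(constants.TestDomainID, latestExecution,
            &types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}),
        withHistoryPagination(branchToken, 24),
    },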
diff --git a/service/history/engine/engineimpl/start_workflow_execution.go b/service/history/engine/engineimpl/start_workflow_execution.go
index e32d1c1527e..5b6e14a0ab9 100644
--- a/service/history/engine/engineimpl/start_workflow_execution.go
+++ b/service/history/engine/engineimpl/start_workflow_execution.go
@@ -23,12 +23,14 @@ package engineimpl
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
 	"github.com/pborman/uuid"
 
 	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/activecluster"
 	"github.com/uber/cadence/common/cache"
 	"github.com/uber/cadence/common/log/tag"
 	"github.com/uber/cadence/common/metrics"
@@ -54,9 +56,6 @@ func (e *historyEngineImpl) StartWorkflowExecution(
 	if err != nil {
 		return nil, err
 	}
-	if domainEntry.GetReplicationConfig().IsActiveActive() && startRequest.StartRequest.ActiveClusterSelectionPolicy == nil {
-		return nil, &types.BadRequestError{Message: "ActiveClusterSelectionPolicy is required for active-active domains"}
-	}
 
 	return e.startWorkflowHelper(
 		ctx,
@@ -130,10 +129,16 @@ func (e *historyEngineImpl) startWorkflowHelper(
 			return nil, err
 		}
 		if prevLastWriteVersion > curMutableState.GetCurrentVersion() {
-			return nil, e.newDomainNotActiveError(
-				domainEntry,
-				prevLastWriteVersion,
-			)
+			policy, err := e.shard.GetActiveClusterManager().GetActiveClusterSelectionPolicyForWorkflow(ctx, domainID, workflowID, prevMutableState.GetExecutionInfo().RunID)
+			if err != nil {
+				return nil, err
+			}
+			if policy.Equals(request.ActiveClusterSelectionPolicy) {
+				return nil, e.newDomainNotActiveError(
+					domainEntry,
+					prevLastWriteVersion,
+				)
+			}
 		}
 		err = e.applyWorkflowIDReusePolicyForSigWithStart(
 			prevMutableState.GetExecutionInfo(),
@@ -248,25 +253,16 @@ func (e *historyEngineImpl) startWorkflowHelper(
 		}
 
 		if curMutableState.GetCurrentVersion() < t.LastWriteVersion {
-			if !domainEntry.GetReplicationConfig().IsActiveActive() {
+			policy, err := e.shard.GetActiveClusterManager().GetActiveClusterSelectionPolicyForWorkflow(ctx, domainID, workflowID, t.RunID)
+			if err != nil {
+				return nil, err
+			}
+			if policy.Equals(request.ActiveClusterSelectionPolicy) {
 				return nil, e.newDomainNotActiveError(
 					domainEntry,
 					t.LastWriteVersion,
 				)
 			}
-			// TODO(active-active): we should update this logic to support external entity policy
-			if request.ActiveClusterSelectionPolicy != nil && request.ActiveClusterSelectionPolicy.GetStrategy() == types.ActiveClusterSelectionStrategyRegionSticky {
-				res, err := e.shard.GetActiveClusterManager().LookupWorkflow(ctx, domainID, workflowID, t.RunID)
-				if err != nil {
-					return nil, err
-				}
-				if res.Region == request.ActiveClusterSelectionPolicy.StickyRegion {
-					return nil, e.newDomainNotActiveError(
-						domainEntry,
-						t.LastWriteVersion,
-					)
-				}
-			}
 		}
 
 		prevRunID = t.RunID
@@ -348,9 +344,6 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
 	if domainEntry.GetInfo().Status != persistence.DomainStatusRegistered {
 		return nil, errDomainDeprecated
 	}
-	if domainEntry.GetReplicationConfig().IsActiveActive() && signalWithStartRequest.SignalWithStartRequest.ActiveClusterSelectionPolicy == nil {
-		return nil, &types.BadRequestError{Message: "ActiveClusterSelectionPolicy is required for active-active domains"}
-	}
 
 	domainID := domainEntry.GetInfo().ID
 	sRequest := signalWithStartRequest.SignalWithStartRequest
@@ -848,21 +841,28 @@ func (e *historyEngineImpl) createMutableState(
 	runID string,
 	startRequest *types.HistoryStartWorkflowExecutionRequest,
 ) (execution.MutableState, error) {
-	version := domainEntry.GetFailoverVersion()
-	// TODO(active-active): replace with cluster attributes
-	if domainEntry.GetReplicationConfig().IsActiveActive() {
-		res, err := e.shard.GetActiveClusterManager().LookupNewWorkflow(ctx, domainEntry.GetInfo().ID, startRequest.StartRequest.ActiveClusterSelectionPolicy)
+	activeClusterInfo, err := e.shard.GetActiveClusterManager().GetActiveClusterInfoByClusterAttribute(ctx, domainEntry.GetInfo().ID, startRequest.StartRequest.ActiveClusterSelectionPolicy.GetClusterAttribute())
+	if err != nil {
+		var errNotFound *activecluster.ClusterAttributeNotFoundError
+		if !errors.As(err, &errNotFound) {
+			// unexpected error
+			return nil, err
+		}
+		e.logger.Warn("Failed to get active cluster info by cluster attribute, falling back to domain-level active cluster info", tag.Error(err))
+		// fallback to domain-level active cluster info
+		startRequest.StartRequest.ActiveClusterSelectionPolicy = nil
+		activeClusterInfo, err = e.shard.GetActiveClusterManager().GetActiveClusterInfoByClusterAttribute(ctx, domainEntry.GetInfo().ID, nil)
 		if err != nil {
+			// unexpected error
 			return nil, err
 		}
-		version = res.FailoverVersion
 	}
 
 	newMutableState := execution.NewMutableStateBuilderWithVersionHistories(
 		e.shard,
 		e.logger,
 		domainEntry,
-		version,
+		activeClusterInfo.FailoverVersion,
 	)
 
 	if err := newMutableState.SetHistoryTree(runID); err != nil {
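Note: createMutableState above now derives the failover version exclusively from the active-cluster manager, with a deliberate two-step fallback — only a ClusterAttributeNotFoundError downgrades the request to the domain-level default (a nil attribute); any other error is surfaced. The same shape reduced to a skeleton (the helper name failoverVersionFor is hypothetical):

    func failoverVersionFor(
        ctx context.Context,
        mgr activecluster.Manager,
        domainID string,
        attr *types.ClusterAttribute,
    ) (int64, error) {
        info, err := mgr.GetActiveClusterInfoByClusterAttribute(ctx, domainID, attr)
        if err != nil {
            var notFound *activecluster.ClusterAttributeNotFoundError
            if !errors.As(err, &notFound) {
                return 0, err // unexpected error: do not mask it
            }
            // unknown attribute: fall back to the domain-level active cluster
            if info, err = mgr.GetActiveClusterInfoByClusterAttribute(ctx, domainID, nil); err != nil {
                return 0, err
            }
        }
        return info.FailoverVersion, nil
    }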
StartRequestID: "existing-request-id", @@ -232,35 +236,6 @@ func TestStartWorkflowExecution(t *testing.T) { }, wantErr: true, }, - { - name: "active-active domain - active cluster selection policy is required", - request: &types.HistoryStartWorkflowExecutionRequest{ - DomainUUID: "36047120-fe32-40d6-a242-f14864fc302c", - StartRequest: &types.StartWorkflowExecutionRequest{ - Domain: constants.TestDomainName, - WorkflowID: "workflow-id", - WorkflowType: &types.WorkflowType{Name: "workflow-type"}, - TaskList: &types.TaskList{Name: "default-task-list"}, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(3600), // 1 hour - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), // 10 seconds - Identity: "workflow-starter", - RequestID: "request-id-for-start", - WorkflowIDReusePolicy: types.WorkflowIDReusePolicyRejectDuplicate.Ptr(), - }, - }, - setupMocks: func(t *testing.T, eft *testdata.EngineForTest) { - domainEntry := getDomainCacheEntry(-1, &types.ActiveClusters{ - ActiveClustersByRegion: map[string]types.ActiveClusterInfo{ - "region0": { - ActiveClusterName: "active", - FailoverVersion: 35, - }, - }, - }) - eft.ShardCtx.Resource.DomainCache.EXPECT().GetDomainByID("36047120-fe32-40d6-a242-f14864fc302c").Return(domainEntry, nil).AnyTimes() - }, - wantErr: true, - }, } for _, tc := range tests { @@ -308,6 +283,7 @@ func TestSignalWithStartWorkflowExecution(t *testing.T) { setupMocks: func(t *testing.T, eft *testdata.EngineForTest) { domainEntry := &cache.DomainCacheEntry{} eft.ShardCtx.Resource.DomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(domainEntry, nil).AnyTimes() + eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), constants.TestDomainID, nil).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes() // Mock GetCurrentExecution to simulate a non-existent current execution getCurrentExecReq := &persistence.GetCurrentExecutionRequest{ DomainID: constants.TestDomainID, @@ -353,6 +329,7 @@ func TestSignalWithStartWorkflowExecution(t *testing.T) { setupMocks: func(t *testing.T, eft *testdata.EngineForTest) { domainEntry := &cache.DomainCacheEntry{} eft.ShardCtx.Resource.DomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(domainEntry, nil).AnyTimes() + eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), constants.TestDomainID, nil).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes() // Simulate current workflow execution is running getCurrentExecReq := &persistence.GetCurrentExecutionRequest{ @@ -388,6 +365,7 @@ func TestSignalWithStartWorkflowExecution(t *testing.T) { On("GetWorkflowExecution", mock.Anything, getExecReq). Return(getExecResp, nil). 
Once() + eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes() var _ *persistence.UpdateWorkflowExecutionRequest updateExecResp := &persistence.UpdateWorkflowExecutionResponse{ MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}, @@ -416,35 +394,6 @@ func TestSignalWithStartWorkflowExecution(t *testing.T) { }, wantErr: false, }, - { - name: "active-active domain - active cluster selection policy is required", - request: &types.HistorySignalWithStartWorkflowExecutionRequest{ - DomainUUID: "36047120-fe32-40d6-a242-f14864fc302c", - SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{ - Domain: constants.TestDomainName, - WorkflowID: "workflow-id", - WorkflowType: &types.WorkflowType{Name: "workflow-type"}, - TaskList: &types.TaskList{Name: "default-task-list"}, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(3600), // 1 hour - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), // 10 seconds - Identity: "workflow-starter", - RequestID: "request-id-for-start", - WorkflowIDReusePolicy: types.WorkflowIDReusePolicyRejectDuplicate.Ptr(), - }, - }, - setupMocks: func(t *testing.T, eft *testdata.EngineForTest) { - domainEntry := getDomainCacheEntry(-1, &types.ActiveClusters{ - ActiveClustersByRegion: map[string]types.ActiveClusterInfo{ - "region0": { - ActiveClusterName: "active", - FailoverVersion: 35, - }, - }, - }) - eft.ShardCtx.Resource.DomainCache.EXPECT().GetDomainByID("36047120-fe32-40d6-a242-f14864fc302c").Return(domainEntry, nil).AnyTimes() - }, - wantErr: true, - }, } for _, tc := range tests { @@ -475,14 +424,9 @@ func TestCreateMutableState(t *testing.T) { wantVersion int64 }{ { - name: "create mutable state successfully, active-passive domain's failover version is used as version", - domainEntry: getDomainCacheEntry(35, nil), - wantVersion: 35, - }, - { - name: "create mutable state successfully, active-active domain. failover version is looked up from active cluster manager", + name: "create mutable state successfully, failover version is looked up from active cluster manager", domainEntry: getDomainCacheEntry( - -1, /* doesn't matter for active-active domain */ + 0, &types.ActiveClusters{ ActiveClustersByRegion: map[string]types.ActiveClusterInfo{ "us-west": { @@ -496,17 +440,17 @@ func TestCreateMutableState(t *testing.T) { }, }), mockFn: func(ac *activecluster.MockManager) { - ac.EXPECT().LookupNewWorkflow(gomock.Any(), gomock.Any(), gomock.Any()). - Return(&activecluster.LookupResult{ + ac.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&types.ActiveClusterInfo{ FailoverVersion: 125, }, nil) }, wantVersion: 125, }, { - name: "failed to create mutable state for active-active domain. LookupNewWorkflow failed", + name: "failed to create mutable state. GetActiveClusterInfoByClusterAttribute failed", domainEntry: getDomainCacheEntry( - -1, /* doesn't matter for active-active domain */ + 0, &types.ActiveClusters{ ActiveClustersByRegion: map[string]types.ActiveClusterInfo{ "us-west": { @@ -520,7 +464,7 @@ func TestCreateMutableState(t *testing.T) { }, }), mockFn: func(ac *activecluster.MockManager) { - ac.EXPECT().LookupNewWorkflow(gomock.Any(), gomock.Any(), gomock.Any()). + ac.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()). 
Return(nil, errors.New("some error")) }, wantErr: true, diff --git a/service/history/engine/engineimpl/terminate_workflow_execution_test.go b/service/history/engine/engineimpl/terminate_workflow_execution_test.go index 33b6ed44131..c23b2fe35c5 100644 --- a/service/history/engine/engineimpl/terminate_workflow_execution_test.go +++ b/service/history/engine/engineimpl/terminate_workflow_execution_test.go @@ -27,6 +27,7 @@ import ( "testing" "github.com/stretchr/testify/mock" + "go.uber.org/mock/gomock" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" @@ -113,6 +114,8 @@ func TestTerminateWorkflowExecution(t *testing.T) { On("GetWorkflowExecution", mock.Anything, getExecReq). Return(getExecResp, nil).Once() + eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes() + // Mock the retrieval of the workflow's history branch historyBranchResp := &persistence.ReadHistoryBranchResponse{ HistoryEvents: []*types.HistoryEvent{ @@ -185,6 +188,8 @@ func TestTerminateWorkflowExecution(t *testing.T) { On("GetWorkflowExecution", mock.Anything, getExecReq). Return(getExecResp, nil).Once() + eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes() + eft.ShardCtx.Resource.ExecutionMgr. On("UpdateWorkflowExecution", mock.Anything, mock.Anything). Return(&persistence.UpdateWorkflowExecutionResponse{}, nil). @@ -285,6 +290,8 @@ func TestTerminateWorkflowExecution(t *testing.T) { Return(getExecResp, nil). Once() + eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes() + // ReadHistoryBranch prep historyBranchResp := &persistence.ReadHistoryBranchResponse{ HistoryEvents: []*types.HistoryEvent{ @@ -361,6 +368,8 @@ func TestTerminateWorkflowExecution(t *testing.T) { On("GetWorkflowExecution", mock.Anything, getExecReq). Return(getExecResp, nil).Once() + eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes() + getCurrentExecReq := &persistence.GetCurrentExecutionRequest{ DomainID: constants.TestDomainID, WorkflowID: constants.TestWorkflowID, @@ -433,6 +442,8 @@ func TestTerminateWorkflowExecution(t *testing.T) { On("GetWorkflowExecution", mock.Anything, getExecReq). Return(getExecResp, nil).Once() + eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes() + getCurrentExecReq := &persistence.GetCurrentExecutionRequest{ DomainID: constants.TestDomainID, WorkflowID: constants.TestWorkflowID, @@ -494,6 +505,8 @@ func TestTerminateWorkflowExecution(t *testing.T) { On("GetWorkflowExecution", mock.Anything, getExecReq). 
Return(getExecResp, nil).Once() + eft.ShardCtx.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes() + getCurrentExecReq := &persistence.GetCurrentExecutionRequest{ DomainID: constants.TestDomainID, WorkflowID: constants.TestWorkflowID, diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 8b3e00492ab..413855b3b9d 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -1406,17 +1406,12 @@ func (c *contextImpl) ReapplyEvents( ctx, cancel := context.WithTimeout(context.Background(), defaultRemoteCallTimeout) defer cancel() - activeCluster := domainEntry.GetReplicationConfig().ActiveClusterName - - if domainEntry.GetReplicationConfig().IsActiveActive() { - lookupRes, err := c.shard.GetActiveClusterManager().LookupWorkflow(ctx, domainID, workflowID, runID) - if err != nil { - return err - } - activeCluster = lookupRes.ClusterName + activeClusterInfo, err := c.shard.GetActiveClusterManager().GetActiveClusterInfoByWorkflow(ctx, domainID, workflowID, runID) + if err != nil { + return err } - if activeCluster == c.shard.GetClusterMetadata().GetCurrentClusterName() { + if activeClusterInfo.ActiveClusterName == c.shard.GetClusterMetadata().GetCurrentClusterName() { return c.shard.GetEngine().ReapplyEvents( ctx, domainID, @@ -1446,7 +1441,7 @@ // The active cluster of the domain differs from the current cluster // Use frontend client to route this request to the active cluster // Reapplication only happens in active cluster - sourceCluster, err := clientBean.GetRemoteAdminClient(activeCluster) + sourceCluster, err := clientBean.GetRemoteAdminClient(activeClusterInfo.ActiveClusterName) if err != nil { return &types.InternalServiceError{ Message: err.Error(), diff --git a/service/history/execution/context_test.go b/service/history/execution/context_test.go index 3303cc7cb69..d1841e2aef2 100644 --- a/service/history/execution/context_test.go +++ b/service/history/execution/context_test.go @@ -2881,7 +2881,7 @@ func TestReapplyEvents(t *testing.T) { }, 0), nil) mockShard.EXPECT().GetActiveClusterManager().Return(mockActiveClusterManager) - mockActiveClusterManager.EXPECT().LookupWorkflow(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id").Return( + mockActiveClusterManager.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id").Return( nil, errors.New("some error")) }, wantErr: true, @@ -2916,9 +2916,9 @@ }, 0), nil) mockShard.EXPECT().GetActiveClusterManager().Return(mockActiveClusterManager) - mockActiveClusterManager.EXPECT().LookupWorkflow(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id").Return( - &activecluster.LookupResult{ - ClusterName: cluster.TestCurrentClusterName, + mockActiveClusterManager.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id").Return( + &types.ActiveClusterInfo{ + ActiveClusterName: cluster.TestCurrentClusterName, }, nil) mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata) mockShard.EXPECT().GetEngine().Return(mockEngine) @@ -2958,9 +2958,9 @@ }, }, 0), nil)
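
[Editor's note] The `ReapplyEvents` hunk above drops the per-domain `IsActiveActive()` branch: routing now reduces to a single workflow-scoped lookup. A minimal sketch of that decision follows; the interface and helper names here are illustrative stand-ins for `activecluster.Manager`, not the actual Cadence package surface.

```go
package reapplysketch

import "context"

// ActiveClusterInfo mirrors the shape the new API returns in this PR.
type ActiveClusterInfo struct {
	ActiveClusterName string
	FailoverVersion   int64
}

// ActiveClusterManager is a pared-down, assumed stand-in for
// activecluster.Manager after this change: one workflow-scoped lookup,
// no IsActiveActive branching left at call sites.
type ActiveClusterManager interface {
	GetActiveClusterInfoByWorkflow(ctx context.Context, domainID, workflowID, runID string) (*ActiveClusterInfo, error)
}

// routeReapply sketches the decision ReapplyEvents now makes: reapply via the
// local history engine when the workflow is active here, otherwise forward to
// the remote cluster's admin client.
func routeReapply(ctx context.Context, mgr ActiveClusterManager, currentCluster, domainID, workflowID, runID string) (local bool, remoteCluster string, err error) {
	info, err := mgr.GetActiveClusterInfoByWorkflow(ctx, domainID, workflowID, runID)
	if err != nil {
		return false, "", err
	}
	if info.ActiveClusterName == currentCluster {
		return true, "", nil // handled by the local history engine
	}
	return false, info.ActiveClusterName, nil // routed to the remote admin client
}
```

The design choice is that active-passive and active-active domains take the same path; the distinction moves inside the manager, which is why callers throughout this diff no longer consult the replication config.
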
mockShard.EXPECT().GetActiveClusterManager().Return(mockActiveClusterManager) - mockActiveClusterManager.EXPECT().LookupWorkflow(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id").Return( - &activecluster.LookupResult{ - ClusterName: cluster.TestAlternativeClusterName, + mockActiveClusterManager.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id").Return( + &types.ActiveClusterInfo{ + ActiveClusterName: cluster.TestAlternativeClusterName, }, nil) mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata) mockShard.EXPECT().GetService().Return(mockResource).Times(2) @@ -2985,6 +2985,11 @@ func TestReapplyEvents(t *testing.T) { mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) mockDomainCache.EXPECT().GetDomainByID("test-domain-id").Return(cache.NewGlobalDomainCacheEntryForTest(nil, nil, &persistence.DomainReplicationConfig{ActiveClusterName: cluster.TestCurrentClusterName}, 0), nil) mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata) + mockShard.EXPECT().GetActiveClusterManager().Return(mockActiveClusterManager) + mockActiveClusterManager.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id").Return( + &types.ActiveClusterInfo{ + ActiveClusterName: cluster.TestCurrentClusterName, + }, nil) mockShard.EXPECT().GetEngine().Return(mockEngine) mockEngine.EXPECT().ReapplyEvents(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id", []*types.HistoryEvent{ { @@ -3013,6 +3018,11 @@ func TestReapplyEvents(t *testing.T) { mockDomainCache.EXPECT().GetDomainByID("test-domain-id").Return(cache.NewGlobalDomainCacheEntryForTest(&persistence.DomainInfo{Name: "test-domain"}, nil, &persistence.DomainReplicationConfig{ActiveClusterName: cluster.TestAlternativeClusterName}, 0), nil) mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata) mockShard.EXPECT().GetService().Return(mockResource).Times(2) + mockShard.EXPECT().GetActiveClusterManager().Return(mockActiveClusterManager) + mockActiveClusterManager.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id").Return( + &types.ActiveClusterInfo{ + ActiveClusterName: cluster.TestAlternativeClusterName, + }, nil) mockResource.RemoteAdminClient.EXPECT().ReapplyEvents(gomock.Any(), gomock.Any()).Return(nil) }, wantErr: false, diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index c9c28b731ff..140018c0a8b 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -385,15 +385,6 @@ func (e *mutableStateBuilder) Load( } } } - - if e.domainEntry.GetReplicationConfig().IsActiveActive() { - res, err := e.shard.GetActiveClusterManager().LookupWorkflow(ctx, e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID) - if err != nil { - return err - } - e.currentVersion = res.FailoverVersion - } - return nil } @@ -1270,34 +1261,27 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent( } firstScheduleTime := currentStartEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstScheduledTime() domainID := e.domainEntry.GetInfo().ID - newStateBuilder := NewMutableStateBuilderWithVersionHistories( - e.shard, - e.logger, - e.domainEntry, - e.domainEntry.GetFailoverVersion(), - ).(*mutableStateBuilder) - - // New mutable state initializes `currentVersion` to 
domain's failover version. - // This doesn't work for active-active domains. - // Set `currentVersion` of the new mutable state builder based on active cluster selection policy - // specified on continue-as-new attributes. - if e.domainEntry.GetReplicationConfig().IsActiveActive() { - res, err := e.shard.GetActiveClusterManager().LookupNewWorkflow(ctx, e.domainEntry.GetInfo().ID, attributes.ActiveClusterSelectionPolicy) - if err != nil { - return nil, nil, err - } - - newStateBuilder.logger.Debug("mutableStateBuilder.AddContinueAsNewEvent created newStateBuilder", - tag.WorkflowDomainID(e.domainEntry.GetInfo().ID), + activeClusterInfo, err := e.shard.GetActiveClusterManager().GetActiveClusterInfoByClusterAttribute(ctx, domainID, attributes.ActiveClusterSelectionPolicy.GetClusterAttribute()) + if err != nil { + return nil, nil, err + } + if e.logger.DebugOn() { + e.logger.Debug("mutableStateBuilder.AddContinueAsNewEvent created newStateBuilder", + tag.WorkflowDomainID(domainID), tag.WorkflowID(e.executionInfo.WorkflowID), tag.WorkflowRunID(e.executionInfo.RunID), tag.WorkflowRunID(newRunID), tag.CurrentVersion(e.currentVersion), tag.Dynamic("activecluster-sel-policy", attributes.ActiveClusterSelectionPolicy), - tag.Dynamic("activecluster-lookup-res", res), + tag.Dynamic("activecluster-info", activeClusterInfo), ) - newStateBuilder.UpdateCurrentVersion(res.FailoverVersion, true) } + newStateBuilder := NewMutableStateBuilderWithVersionHistories( + e.shard, + e.logger, + e.domainEntry, + activeClusterInfo.FailoverVersion, + ).(*mutableStateBuilder) if _, err = newStateBuilder.addWorkflowExecutionStartedEventForContinueAsNew( parentInfo, @@ -1428,21 +1412,15 @@ func (e *mutableStateBuilder) StartTransaction( domainEntry *cache.DomainCacheEntry, incomingTaskVersion int64, ) (bool, error) { - e.domainEntry = domainEntry - version := domainEntry.GetFailoverVersion() - if e.domainEntry.GetReplicationConfig().IsActiveActive() { - res, err := e.shard.GetActiveClusterManager().LookupWorkflow(ctx, e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID) - if err != nil { - return false, err - } - version = res.FailoverVersion + activeClusterInfo, err := e.shard.GetActiveClusterManager().GetActiveClusterInfoByWorkflow(ctx, e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID) + if err != nil { + return false, err } - if e.logger.DebugOn() { e.logger.Debugf("StartTransaction calling UpdateCurrentVersion for domain %s, wfID %v, incomingTaskVersion %v, version %v, stacktrace %v", - domainEntry.GetInfo().Name, e.executionInfo.WorkflowID, incomingTaskVersion, version, string(debug.Stack())) + domainEntry.GetInfo().Name, e.executionInfo.WorkflowID, incomingTaskVersion, activeClusterInfo.FailoverVersion, string(debug.Stack())) } - if err := e.UpdateCurrentVersion(version, false); err != nil { + if err := e.UpdateCurrentVersion(activeClusterInfo.FailoverVersion, false); err != nil { return false, err } diff --git a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go index d899a282a87..dbc73ab40df 100644 --- a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go +++ b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go @@ -283,7 +283,11 @@ func TestAddContinueAsNewEvent(t *testing.T) { domainEntry: domainEntry, startingState: createStartingExecutionInfo(), startingHistory: 
createValidStartingHistory(domainFailoverVersion), - + actClMgrAffordance: func(actClMgr *activecluster.MockManager) { + actClMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.ActiveClusterInfo{ + FailoverVersion: domainFailoverVersion, + }, nil) + }, // when it goes to fetch the starting event historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadHistoryBranchResponse{ @@ -303,7 +307,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { startingState: createStartingExecutionInfo(), startingHistory: createValidStartingHistory(1), actClMgrAffordance: func(actClMgr *activecluster.MockManager) { - actClMgr.EXPECT().LookupNewWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).Return(&activecluster.LookupResult{ + actClMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.ActiveClusterInfo{ FailoverVersion: 2, // this version will be used by new mutable state builder for new tasks }, nil) }, @@ -333,6 +337,11 @@ func TestAddContinueAsNewEvent(t *testing.T) { domainEntry: domainEntry, startingState: createStartingExecutionInfo(), startingHistory: createValidStartingHistory(domainFailoverVersion), + actClMgrAffordance: func(actClMgr *activecluster.MockManager) { + actClMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.ActiveClusterInfo{ + FailoverVersion: domainFailoverVersion, + }, nil) + }, historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadHistoryBranchResponse{ HistoryEvents: []*types.HistoryEvent{ diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 24a63c739fd..683235e1afd 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -4000,11 +4000,10 @@ func TestLoad_ActiveActive(t *testing.T) { expectedCurrentVersion int64 expectedErr error }{ - "Non-active-active domain - LookupWorkflow should not be called": { + "Non-active-active domain": { domainEntry: nonActiveActiveDomainEntry, mutableState: baseMutableState, activeClusterManagerAffordance: func(activeClusterManager *activecluster.MockManager) { - // No expectations - LookupWorkflow should not be called }, expectedCurrentVersion: commonconstants.EmptyVersion, expectedErr: nil, @@ -4013,98 +4012,10 @@ func TestLoad_ActiveActive(t *testing.T) { domainEntry: activeActiveDomainEntry, mutableState: baseMutableStateNoVersionHistory, activeClusterManagerAffordance: func(activeClusterManager *activecluster.MockManager) { - activeClusterManager.EXPECT().LookupWorkflow( - gomock.Any(), - domainID, - workflowID, - runID, - ).Return(&activecluster.LookupResult{ - FailoverVersion: int64(100), - }, nil).Times(1) }, expectedCurrentVersion: commonconstants.EmptyVersion, // GetCurrentVersion returns EmptyVersion when versionHistories is nil expectedErr: nil, }, - "Active-active domain - LookupWorkflow succeeds with different version": { - domainEntry: activeActiveDomainEntry, - mutableState: baseMutableState, - activeClusterManagerAffordance: func(activeClusterManager *activecluster.MockManager) { - activeClusterManager.EXPECT().LookupWorkflow( - gomock.Any(), - domainID, - workflowID, - runID, - 
).Return(&activecluster.LookupResult{ - FailoverVersion: int64(100), - }, nil).Times(1) - }, - expectedCurrentVersion: int64(100), - expectedErr: nil, - }, - "Active-active domain - LookupWorkflow succeeds with same version as domain failover": { - domainEntry: activeActiveDomainEntry, - mutableState: baseMutableState, - activeClusterManagerAffordance: func(activeClusterManager *activecluster.MockManager) { - activeClusterManager.EXPECT().LookupWorkflow( - gomock.Any(), - domainID, - workflowID, - runID, - ).Return(&activecluster.LookupResult{ - FailoverVersion: activeActiveDomainEntry.GetFailoverVersion(), - }, nil).Times(1) - }, - expectedCurrentVersion: activeActiveDomainEntry.GetFailoverVersion(), - expectedErr: nil, - }, - "Active-active domain - LookupWorkflow fails with error": { - domainEntry: activeActiveDomainEntry, - mutableState: baseMutableState, - activeClusterManagerAffordance: func(activeClusterManager *activecluster.MockManager) { - activeClusterManager.EXPECT().LookupWorkflow( - gomock.Any(), - domainID, - workflowID, - runID, - ).Return(nil, &types.InternalServiceError{ - Message: "failed to lookup workflow", - }).Times(1) - }, - expectedCurrentVersion: commonconstants.EmptyVersion, - expectedErr: &types.InternalServiceError{ - Message: "failed to lookup workflow", - }, - }, - "Active-active domain - LookupWorkflow fails with generic error": { - domainEntry: activeActiveDomainEntry, - mutableState: baseMutableState, - activeClusterManagerAffordance: func(activeClusterManager *activecluster.MockManager) { - activeClusterManager.EXPECT().LookupWorkflow( - gomock.Any(), - domainID, - workflowID, - runID, - ).Return(nil, errors.New("connection failed")).Times(1) - }, - expectedCurrentVersion: commonconstants.EmptyVersion, - expectedErr: errors.New("connection failed"), - }, - "Active-active domain - LookupWorkflow with zero failover version": { - domainEntry: activeActiveDomainEntry, - mutableState: baseMutableState, - activeClusterManagerAffordance: func(activeClusterManager *activecluster.MockManager) { - activeClusterManager.EXPECT().LookupWorkflow( - gomock.Any(), - domainID, - workflowID, - runID, - ).Return(&activecluster.LookupResult{ - FailoverVersion: int64(0), - }, nil).Times(1) - }, - expectedCurrentVersion: int64(0), - expectedErr: nil, - }, } for name, td := range tests { @@ -4137,7 +4048,7 @@ func TestLoad_ActiveActive(t *testing.T) { // Verify results if td.expectedErr != nil { - assert.Error(t, err) + require.Error(t, err) assert.Equal(t, td.expectedErr.Error(), err.Error()) } else { assert.NoError(t, err) @@ -4212,36 +4123,45 @@ func TestStartTransaction(t *testing.T) { expectedErrorContains string }{ "it should successfully update the failover version": { - domainEntry: regularDomainEntry, - setupActiveClusterManager: func(activeClusterManager *activecluster.MockManager) {}, - setupMutableStateBuilder: func(msBuilder *mutableStateBuilder) {}, - incomingTaskVersion: int64(100), - expectedFlushDecision: false, - expectedVersion: 123, // domain failover version - expectedError: false, + domainEntry: regularDomainEntry, + setupActiveClusterManager: func(activeClusterManager *activecluster.MockManager) { + activeClusterManager.EXPECT().GetActiveClusterInfoByWorkflow( + gomock.Any(), + domainID, + workflowID, + runID, + ).Return(&types.ActiveClusterInfo{ + FailoverVersion: int64(123), + }, nil).Times(1) + }, + setupMutableStateBuilder: func(msBuilder *mutableStateBuilder) {}, + incomingTaskVersion: int64(100), + expectedFlushDecision: false, + expectedVersion: 
123, + expectedError: false, }, "when the domain is active-active domain it should update the failover version": { domainEntry: activeActiveDomainEntry, setupActiveClusterManager: func(activeClusterManager *activecluster.MockManager) { - activeClusterManager.EXPECT().LookupWorkflow( + activeClusterManager.EXPECT().GetActiveClusterInfoByWorkflow( gomock.Any(), domainID, workflowID, runID, - ).Return(&activecluster.LookupResult{ + ).Return(&types.ActiveClusterInfo{ FailoverVersion: int64(999), }, nil).Times(1) }, setupMutableStateBuilder: func(msBuilder *mutableStateBuilder) {}, incomingTaskVersion: int64(200), expectedFlushDecision: false, - expectedVersion: 999, // from LookupWorkflow result + expectedVersion: 999, expectedError: false, }, "when the domain is active-active and workflow lookup fails it should return an error": { domainEntry: activeActiveDomainEntry, setupActiveClusterManager: func(activeClusterManager *activecluster.MockManager) { - activeClusterManager.EXPECT().LookupWorkflow( + activeClusterManager.EXPECT().GetActiveClusterInfoByWorkflow( gomock.Any(), domainID, workflowID, @@ -4256,8 +4176,17 @@ func TestStartTransaction(t *testing.T) { expectedErrorContains: "cluster lookup failed", }, "when unable to update current version it should return an error": { - domainEntry: regularDomainEntry, - setupActiveClusterManager: func(activeClusterManager *activecluster.MockManager) {}, + domainEntry: regularDomainEntry, + setupActiveClusterManager: func(activeClusterManager *activecluster.MockManager) { + activeClusterManager.EXPECT().GetActiveClusterInfoByWorkflow( + gomock.Any(), + domainID, + workflowID, + runID, + ).Return(&types.ActiveClusterInfo{ + FailoverVersion: int64(123), + }, nil).Times(1) + }, setupMutableStateBuilder: func(msBuilder *mutableStateBuilder) { // Create empty version histories to trigger GetCurrentVersionHistory error msBuilder.versionHistories = &persistence.VersionHistories{ diff --git a/service/history/handler/handler.go b/service/history/handler/handler.go index 19888065aca..c8cf5f77b09 100644 --- a/service/history/handler/handler.go +++ b/service/history/handler/handler.go @@ -129,6 +129,7 @@ func (h *handlerImpl) Start() { taskPriorityAssigner := task.NewPriorityAssigner( h.GetClusterMetadata().GetCurrentClusterName(), h.GetDomainCache(), + h.GetActiveClusterManager(), h.GetLogger(), h.GetMetricsClient(), h.config, diff --git a/service/history/queue/task_allocator.go b/service/history/queue/task_allocator.go index 482380f0907..6e276a0fbda 100644 --- a/service/history/queue/task_allocator.go +++ b/service/history/queue/task_allocator.go @@ -157,7 +157,7 @@ func (t *taskAllocatorImpl) verifyTaskActiveness(cluster string, domainID, wfID, // handle active-active domain if domainEntry.GetReplicationConfig().IsActiveActive() { - resp, err := t.activeClusterMgr.LookupWorkflow(context.Background(), domainID, wfID, rID) + resp, err := t.activeClusterMgr.GetActiveClusterInfoByWorkflow(context.Background(), domainID, wfID, rID) if err != nil { t.logger.Warn("Failed to lookup active cluster", tag.WorkflowDomainID(domainID), @@ -167,7 +167,7 @@ func (t *taskAllocatorImpl) verifyTaskActiveness(cluster string, domainID, wfID, ) return false, err } - if resp.ClusterName != cluster { + if resp.ActiveClusterName != cluster { t.logger.Debug("Skip task because workflow is not active on the given cluster", tag.WorkflowID(wfID), tag.WorkflowDomainID(domainID), @@ -180,7 +180,7 @@ func (t *taskAllocatorImpl) verifyTaskActiveness(cluster string, domainID, wfID, 
tag.WorkflowDomainID(domainID), tag.WorkflowID(wfID), tag.WorkflowRunID(rID), - tag.ClusterName(resp.ClusterName), + tag.ClusterName(resp.ActiveClusterName), ) return true, nil } diff --git a/service/history/queue/task_allocator_test.go b/service/history/queue/task_allocator_test.go index 897b2d1ce22..0a1f2d90dfb 100644 --- a/service/history/queue/task_allocator_test.go +++ b/service/history/queue/task_allocator_test.go @@ -118,9 +118,9 @@ func TestVerifyActiveTask(t *testing.T) { setupMocks: func(mockDomainCache *cache.MockDomainCache, activeClusterMgr *activecluster.MockManager) { domainEntry := activeActiveDomainEntry() mockDomainCache.EXPECT().GetDomainByID(domainID).Return(domainEntry, nil) - activeClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&activecluster.LookupResult{ - ClusterName: "another-cluster", + activeClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&types.ActiveClusterInfo{ + ActiveClusterName: "another-cluster", }, nil) }, expectedResult: false, @@ -130,9 +130,9 @@ func TestVerifyActiveTask(t *testing.T) { setupMocks: func(mockDomainCache *cache.MockDomainCache, activeClusterMgr *activecluster.MockManager) { domainEntry := activeActiveDomainEntry() mockDomainCache.EXPECT().GetDomainByID(domainID).Return(domainEntry, nil) - activeClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&activecluster.LookupResult{ - ClusterName: "currentCluster", + activeClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&types.ActiveClusterInfo{ + ActiveClusterName: "currentCluster", }, nil) }, expectedResult: true, @@ -142,7 +142,7 @@ func TestVerifyActiveTask(t *testing.T) { setupMocks: func(mockDomainCache *cache.MockDomainCache, activeClusterMgr *activecluster.MockManager) { domainEntry := activeActiveDomainEntry() mockDomainCache.EXPECT().GetDomainByID(domainID).Return(domainEntry, nil) - activeClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + activeClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, errors.New("some error")) }, expectedResult: false, @@ -453,9 +453,9 @@ func TestVerifyStandbyTask(t *testing.T) { setupMocks: func(mockDomainCache *cache.MockDomainCache, activeClusterMgr *activecluster.MockManager) { domainEntry := activeActiveDomainEntry() mockDomainCache.EXPECT().GetDomainByID(domainID).Return(domainEntry, nil) - activeClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&activecluster.LookupResult{ - ClusterName: "another-cluster", + activeClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&types.ActiveClusterInfo{ + ActiveClusterName: "another-cluster", }, nil) }, standbyCluster: "standbyCluster", @@ -467,9 +467,9 @@ func TestVerifyStandbyTask(t *testing.T) { setupMocks: func(mockDomainCache *cache.MockDomainCache, activeClusterMgr *activecluster.MockManager) { domainEntry := activeActiveDomainEntry() mockDomainCache.EXPECT().GetDomainByID(domainID).Return(domainEntry, nil) - activeClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
- Return(&activecluster.LookupResult{ - ClusterName: "standbyCluster", + activeClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&types.ActiveClusterInfo{ + ActiveClusterName: "standbyCluster", }, nil) }, standbyCluster: "standbyCluster", @@ -481,7 +481,7 @@ func TestVerifyStandbyTask(t *testing.T) { setupMocks: func(mockDomainCache *cache.MockDomainCache, activeClusterMgr *activecluster.MockManager) { domainEntry := activeActiveDomainEntry() mockDomainCache.EXPECT().GetDomainByID(domainID).Return(domainEntry, nil) - activeClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + activeClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, errors.New("some error")) }, standbyCluster: "standbyCluster", diff --git a/service/history/queuev2/timer_queue_factory.go b/service/history/queuev2/timer_queue_factory.go index 6228dc68b10..8ef76809dde 100644 --- a/service/history/queuev2/timer_queue_factory.go +++ b/service/history/queuev2/timer_queue_factory.go @@ -125,7 +125,6 @@ func (f *timerQueueFactory) createQueuev2( ) executorWrapper := task.NewExecutorWrapper( shard.GetClusterMetadata().GetCurrentClusterName(), - shard.GetDomainCache(), shard.GetActiveClusterManager(), activeTaskExecutor, standbyTaskExecutor, diff --git a/service/history/queuev2/transfer_queue_factory.go b/service/history/queuev2/transfer_queue_factory.go index 63b83788a78..c46583d4d40 100644 --- a/service/history/queuev2/transfer_queue_factory.go +++ b/service/history/queuev2/transfer_queue_factory.go @@ -132,7 +132,6 @@ func (f *transferQueueFactory) createQueuev2( executorWrapper := task.NewExecutorWrapper( shard.GetClusterMetadata().GetCurrentClusterName(), - shard.GetDomainCache(), shard.GetActiveClusterManager(), activeTaskExecutor, standbyTaskExecutor, diff --git a/service/history/shard/context.go b/service/history/shard/context.go index e16f0b274fd..a1e9724f383 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -1312,17 +1312,17 @@ func (s *contextImpl) allocateTimerIDsLocked( if domainEntry.GetReplicationConfig().IsActiveActive() { // Note: This doesn't work for initial backoff timer task because the workflow's active-cluster-selection-policy row is not stored yet. - // Therefore LookupWorkflow returns current cluster (fallback logic in activecluster manager) + // Therefore GetActiveClusterInfoByWorkflow returns current cluster (fallback logic in activecluster manager) // Queue v2 doesn't use this logic and it must be enabled to properly handle initial backoff timer task for active-active domains. // Leaving this code block instead of rejecting the whole id allocation request. // Active-active domains should not be used in Cadence clusters that don't have queue v2 enabled. 
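[Editor's note] The comment block above flags the queue v1 caveat for initial backoff timers; the hunk that follows applies a timeout-bounded per-task lookup. Read in isolation, the pattern looks like the sketch below — stand-in types again, and the timeout value is an assumption (the real code uses the package-level `activeClusterLookupTimeout` constant).

```go
package timersketch

import (
	"context"
	"time"
)

type ActiveClusterInfo struct {
	ActiveClusterName string
	FailoverVersion   int64
}

type ActiveClusterManager interface {
	GetActiveClusterInfoByWorkflow(ctx context.Context, domainID, workflowID, runID string) (*ActiveClusterInfo, error)
}

// lookupTimeout is an assumed budget for the lookup; the shard code bounds it
// with its own constant so a slow manager cannot stall timer ID allocation.
const lookupTimeout = 2 * time.Second

// clusterForTimerTask resolves which cluster a timer task should be attributed
// to, bounding the lookup with a timeout as allocateTimerIDsLocked does.
func clusterForTimerTask(mgr ActiveClusterManager, domainID, workflowID, runID string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), lookupTimeout)
	defer cancel()
	info, err := mgr.GetActiveClusterInfoByWorkflow(ctx, domainID, workflowID, runID)
	if err != nil {
		return "", err
	}
	return info.ActiveClusterName, nil
}
```
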
ctx, cancel := context.WithTimeout(context.Background(), activeClusterLookupTimeout) - lookupRes, err := s.GetActiveClusterManager().LookupWorkflow(ctx, task.GetDomainID(), task.GetWorkflowID(), task.GetRunID()) + lookupRes, err := s.GetActiveClusterManager().GetActiveClusterInfoByWorkflow(ctx, task.GetDomainID(), task.GetWorkflowID(), task.GetRunID()) cancel() if err != nil { return err } - cluster = lookupRes.ClusterName + cluster = lookupRes.ActiveClusterName } } diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go index 2db0731b964..a107680a837 100644 --- a/service/history/shard/context_test.go +++ b/service/history/shard/context_test.go @@ -37,7 +37,6 @@ import ( "go.uber.org/mock/gomock" "github.com/uber/cadence/common" - "github.com/uber/cadence/common/activecluster" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" @@ -1401,10 +1400,10 @@ func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenDomainIsActiveActiveUs } // Setup active cluster manager mock - s.mockResource.ActiveClusterMgr.EXPECT().LookupWorkflow( + s.mockResource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow( gomock.Any(), testDomainID, testWorkflowID, gomock.Any(), - ).Return(&activecluster.LookupResult{ - ClusterName: "looked-up-cluster", + ).Return(&types.ActiveClusterInfo{ + ActiveClusterName: "looked-up-cluster", }, nil).Times(1) // Create task with non-empty version to trigger the lookup logic @@ -1522,7 +1521,7 @@ func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenClusterManagerLookupFa } // Setup active cluster manager mock to return error - s.mockResource.ActiveClusterMgr.EXPECT().LookupWorkflow( + s.mockResource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow( gomock.Any(), testDomainID, testWorkflowID, gomock.Any(), ).Return(nil, assert.AnError).Times(1) diff --git a/service/history/task/executor_wrapper.go b/service/history/task/executor_wrapper.go index cde06bee5c5..13c4f69169a 100644 --- a/service/history/task/executor_wrapper.go +++ b/service/history/task/executor_wrapper.go @@ -28,7 +28,6 @@ import ( "runtime/debug" "github.com/uber/cadence/common/activecluster" - "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" ) @@ -36,7 +35,6 @@ import ( type ( executorWrapper struct { currentClusterName string - registry cache.DomainCache activeClusterMgr activecluster.Manager activeExecutor Executor standbyExecutor Executor @@ -46,7 +44,6 @@ type ( func NewExecutorWrapper( currentClusterName string, - registry cache.DomainCache, activeClusterMgr activecluster.Manager, activeExecutor Executor, standbyExecutor Executor, @@ -54,7 +51,6 @@ func NewExecutorWrapper( ) Executor { return &executorWrapper{ currentClusterName: currentClusterName, - registry: registry, activeClusterMgr: activeClusterMgr, activeExecutor: activeExecutor, standbyExecutor: standbyExecutor, @@ -82,54 +78,34 @@ func (e *executorWrapper) isActiveTask( wfID := task.GetWorkflowID() rID := task.GetRunID() - entry, err := e.registry.GetDomainByID(domainID) + activeClusterInfo, err := e.activeClusterMgr.GetActiveClusterInfoByWorkflow(context.Background(), domainID, wfID, rID) if err != nil { - e.logger.Warn("Unable to find namespace, process task as active.", tag.WorkflowDomainID(domainID), tag.Value(task.GetInfo()), tag.Error(err)) + e.logger.Warn("Failed to get active cluster info, process task as active.", tag.WorkflowDomainID(domainID), 
tag.WorkflowID(wfID), tag.WorkflowRunID(rID), tag.Error(err)) return true } - if entry.GetReplicationConfig().IsActiveActive() { - resp, err := e.activeClusterMgr.LookupWorkflow(context.Background(), domainID, wfID, rID) - if err != nil { - e.logger.Warn("Failed to lookup active cluster, process task as active.", - tag.WorkflowDomainID(domainID), - tag.WorkflowID(wfID), - tag.WorkflowRunID(rID), - tag.Error(err), - ) - return true - } - if resp.ClusterName != e.currentClusterName { - if e.logger.DebugOn() { - taskJSON, _ := json.Marshal(task) - e.logger.Debug("Process task as standby.", - tag.WorkflowDomainID(domainID), - tag.Dynamic("task", string(taskJSON)), - tag.Dynamic("taskType", task.GetTaskType()), - tag.ClusterName(resp.ClusterName), - tag.Dynamic("stack", string(debug.Stack())), - ) - } - return false - } + if activeClusterInfo.ActiveClusterName != e.currentClusterName { if e.logger.DebugOn() { taskJSON, _ := json.Marshal(task) - e.logger.Debug("Process task as active.", + e.logger.Debug("Process task as standby.", tag.WorkflowDomainID(domainID), tag.Dynamic("task", string(taskJSON)), tag.Dynamic("taskType", task.GetTaskType()), - tag.ClusterName(e.currentClusterName), + tag.ClusterName(activeClusterInfo.ActiveClusterName), tag.Dynamic("stack", string(debug.Stack())), ) } - return true - } - - if !entry.IsActiveIn(e.currentClusterName) { - e.logger.Debug("Process task as standby.", tag.WorkflowDomainID(domainID), tag.Value(task.GetInfo()), tag.ClusterName(e.currentClusterName)) return false } - - e.logger.Debug("Process task as active.", tag.WorkflowDomainID(domainID), tag.Value(task.GetInfo()), tag.ClusterName(e.currentClusterName)) + if e.logger.DebugOn() { + taskJSON, _ := json.Marshal(task) + e.logger.Debug("Process task as active.", + tag.WorkflowDomainID(domainID), + tag.Dynamic("task", string(taskJSON)), + tag.Dynamic("taskType", task.GetTaskType()), + tag.ClusterName(activeClusterInfo.ActiveClusterName), + tag.Dynamic("stack", string(debug.Stack())), + ) + } return true } diff --git a/service/history/task/executor_wrapper_test.go b/service/history/task/executor_wrapper_test.go index e290ca2b7b1..905cdba9fca 100644 --- a/service/history/task/executor_wrapper_test.go +++ b/service/history/task/executor_wrapper_test.go @@ -7,7 +7,6 @@ import ( "go.uber.org/mock/gomock" "github.com/uber/cadence/common/activecluster" - "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" @@ -26,15 +25,6 @@ func TestExecutorWrapper_IsActiveTask(t *testing.T) { lookupError error expectedResult bool }{ - { - name: "Domain not found - process as active", - currentCluster: "cluster1", - domainID: "domain1", - workflowID: "workflow1", - runID: "run1", - domainError: assert.AnError, - expectedResult: true, - }, { name: "Active-Active domain - current cluster is active", currentCluster: "cluster1", @@ -65,26 +55,6 @@ func TestExecutorWrapper_IsActiveTask(t *testing.T) { lookupError: assert.AnError, expectedResult: true, }, - { - name: "Non-Active-Active domain - current cluster is active", - currentCluster: "cluster1", - activeCluster: "cluster1", - domainID: "domain1", - workflowID: "workflow1", - runID: "run1", - isActiveActive: false, - expectedResult: true, - }, - { - name: "Non-Active-Active domain - current cluster is not active", - currentCluster: "cluster1", - activeCluster: "cluster2", - domainID: "domain1", - workflowID: "workflow1", - runID: "run1", - isActiveActive: false, 
- expectedResult: false, - }, } for _, tt := range tests { @@ -98,53 +68,13 @@ func TestExecutorWrapper_IsActiveTask(t *testing.T) { mockTask.EXPECT().GetInfo().Return(&persistence.DecisionTask{}).AnyTimes() mockTask.EXPECT().GetTaskType().Return(0).AnyTimes() // called by debug log - mockDomainCache := cache.NewMockDomainCache(ctrl) mockActiveClusterMgr := activecluster.NewMockManager(ctrl) - if tt.isActiveActive { - domainEntry := cache.NewDomainCacheEntryForTest( - &persistence.DomainInfo{ID: tt.domainID}, - &persistence.DomainConfig{}, - true, - &persistence.DomainReplicationConfig{ - ActiveClusters: &types.ActiveClusters{ - ActiveClustersByRegion: map[string]types.ActiveClusterInfo{ - "region1": {ActiveClusterName: tt.activeCluster}, - }, - }, - }, - 0, - nil, - 0, - 0, - 0, - ) - mockDomainCache.EXPECT().GetDomainByID(tt.domainID).Return(domainEntry, tt.domainError) - if tt.lookupError == nil { - mockActiveClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), tt.domainID, tt.workflowID, tt.runID). - Return(&activecluster.LookupResult{ClusterName: tt.activeCluster}, nil) - } else { - mockActiveClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), tt.domainID, tt.workflowID, tt.runID). - Return(nil, tt.lookupError) - } + if tt.lookupError == nil { + mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), tt.domainID, tt.workflowID, tt.runID). + Return(&types.ActiveClusterInfo{ActiveClusterName: tt.activeCluster}, nil) } else { - domainEntry := cache.NewDomainCacheEntryForTest( - &persistence.DomainInfo{ID: tt.domainID}, - &persistence.DomainConfig{}, - true, - &persistence.DomainReplicationConfig{ - ActiveClusterName: tt.activeCluster, - Clusters: []*persistence.ClusterReplicationConfig{ - {ClusterName: tt.currentCluster}, - {ClusterName: tt.activeCluster}, - }, - }, - 0, - nil, - 0, - 0, - 0, - ) - mockDomainCache.EXPECT().GetDomainByID(tt.domainID).Return(domainEntry, tt.domainError) + mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), tt.domainID, tt.workflowID, tt.runID). 
+ Return(nil, tt.lookupError) } mockLogger := testlogger.New(t) @@ -152,7 +82,6 @@ func TestExecutorWrapper_IsActiveTask(t *testing.T) { // Create executor wrapper wrapper := NewExecutorWrapper( tt.currentCluster, - mockDomainCache, mockActiveClusterMgr, NewMockExecutor(ctrl), NewMockExecutor(ctrl), diff --git a/service/history/task/priority_assigner.go b/service/history/task/priority_assigner.go index 7cbb28f69ec..b93ff511ebb 100644 --- a/service/history/task/priority_assigner.go +++ b/service/history/task/priority_assigner.go @@ -21,8 +21,10 @@ package task import ( + "context" "sync" + "github.com/uber/cadence/common/activecluster" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/constants" dynamicquotas "github.com/uber/cadence/common/dynamicconfig/quotas" @@ -49,12 +51,14 @@ type priorityAssignerImpl struct { logger log.Logger scope metrics.Scope rateLimiters *quotas.Collection + activeClusterMgr activecluster.Manager } // NewPriorityAssigner creates a new task priority assigner func NewPriorityAssigner( currentClusterName string, domainCache cache.DomainCache, + activeClusterMgr activecluster.Manager, logger log.Logger, metricClient metrics.Client, config *config.Config, @@ -62,6 +66,7 @@ func NewPriorityAssigner( return &priorityAssignerImpl{ currentClusterName: currentClusterName, domainCache: domainCache, + activeClusterMgr: activeClusterMgr, config: config, logger: logger, scope: metricClient.Scope(metrics.TaskPriorityAssignerScope), @@ -89,7 +94,7 @@ func (a *priorityAssignerImpl) Assign(queueTask Task) error { // timer, transfer or cross cluster task, first check if task is active or not and if domain is active or not isActiveTask := queueType == QueueTypeActiveTimer || queueType == QueueTypeActiveTransfer - domainName, isActiveDomain, err := a.getDomainInfo(queueTask.GetDomainID()) + domainName, isActiveDomain, err := a.getDomainInfo(queueTask.GetDomainID(), queueTask.GetWorkflowID(), queueTask.GetRunID()) if err != nil { return err } @@ -130,7 +135,7 @@ func (a *priorityAssignerImpl) Assign(queueTask Task) error { // 1. domain name // 2. if domain is active // 3. error, if any -func (a *priorityAssignerImpl) getDomainInfo(domainID string) (string, bool, error) { +func (a *priorityAssignerImpl) getDomainInfo(domainID, wfID, rID string) (string, bool, error) { domainEntry, err := a.domainCache.GetDomainByID(domainID) if err != nil { if _, ok := err.(*types.EntityNotExistsError); !ok { @@ -143,16 +148,13 @@ func (a *priorityAssignerImpl) getDomainInfo(domainID string) (string, bool, err return "", true, nil } - if domainEntry.GetReplicationConfig().IsActiveActive() { - active := domainEntry.IsActiveIn(a.currentClusterName) - return domainEntry.GetInfo().Name, active, nil + activeClusterInfo, err := a.activeClusterMgr.GetActiveClusterInfoByWorkflow(context.Background(), domainID, wfID, rID) + if err != nil { + a.logger.Warn("Failed to get active cluster info", tag.WorkflowDomainID(domainID), tag.Error(err)) + return "", true, nil } - // TODO(active-active): The logic below ignores pending active case for active-passive domains. - // However IsActiveIn() that is used above for active-active domains returns false for pending active domains. - // What should be the behavior for pending active domains? 
- - if domainEntry.IsGlobalDomain() && a.currentClusterName != domainEntry.GetReplicationConfig().ActiveClusterName { + if activeClusterInfo.ActiveClusterName != a.currentClusterName { return domainEntry.GetInfo().Name, false, nil } return domainEntry.GetInfo().Name, true, nil diff --git a/service/history/task/priority_assigner_test.go b/service/history/task/priority_assigner_test.go index e7546918b81..0dc75a6dedc 100644 --- a/service/history/task/priority_assigner_test.go +++ b/service/history/task/priority_assigner_test.go @@ -29,6 +29,7 @@ import ( "github.com/uber-go/tally" "go.uber.org/mock/gomock" + "github.com/uber/cadence/common/activecluster" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/cluster" commonconstants "github.com/uber/cadence/common/constants" @@ -46,8 +47,9 @@ type ( *require.Assertions suite.Suite - controller *gomock.Controller - mockDomainCache *cache.MockDomainCache + controller *gomock.Controller + mockDomainCache *cache.MockDomainCache + mockActiveClusterMgr *activecluster.MockManager config *config.Config priorityAssigner *priorityAssignerImpl @@ -65,6 +67,7 @@ func (s *taskPriorityAssignerSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.mockDomainCache = cache.NewMockDomainCache(s.controller) + s.mockActiveClusterMgr = activecluster.NewMockManager(s.controller) s.testTaskProcessRPS = 10 client := dynamicconfig.NewInMemoryClient() @@ -77,6 +80,7 @@ func (s *taskPriorityAssignerSuite) SetupTest() { s.priorityAssigner = NewPriorityAssigner( cluster.TestCurrentClusterName, s.mockDomainCache, + s.mockActiveClusterMgr, log.NewNoop(), metrics.NewClient(tally.NoopScope, metrics.History, metrics.HistogramMigration{}), s.config, @@ -89,42 +93,31 @@ func (s *taskPriorityAssignerSuite) TearDownTest() { func (s *taskPriorityAssignerSuite) TestGetDomainInfo_Success_Active() { s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil) + s.mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: cluster.TestCurrentClusterName}, nil) - domainName, isActive, err := s.priorityAssigner.getDomainInfo(constants.TestDomainID) + domainName, isActive, err := s.priorityAssigner.getDomainInfo(constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID) s.NoError(err) s.Equal(constants.TestDomainName, domainName) s.True(isActive) } func (s *taskPriorityAssignerSuite) TestGetDomainInfo_Success_Passive() { - constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestAlternativeClusterName - defer func() { - constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestCurrentClusterName - }() s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil) + s.mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: cluster.TestAlternativeClusterName}, nil) - domainName, isActive, err := s.priorityAssigner.getDomainInfo(constants.TestDomainID) + domainName, isActive, err := s.priorityAssigner.getDomainInfo(constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID) s.NoError(err) s.Equal(constants.TestDomainName, domainName) s.False(isActive) } -func (s *taskPriorityAssignerSuite) 
TestGetDomainInfo_Success_Local() { - s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestLocalDomainEntry, nil) - - domainName, isActive, err := s.priorityAssigner.getDomainInfo(constants.TestDomainID) - s.NoError(err) - s.Equal(constants.TestDomainName, domainName) - s.True(isActive) -} - func (s *taskPriorityAssignerSuite) TestGetDomainInfo_Fail_DomainNotExist() { s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return( nil, &types.EntityNotExistsError{Message: "domain not exist"}, ) - domainName, isActive, err := s.priorityAssigner.getDomainInfo(constants.TestDomainID) + domainName, isActive, err := s.priorityAssigner.getDomainInfo(constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID) s.NoError(err) s.Empty(domainName) s.True(isActive) @@ -136,7 +129,7 @@ func (s *taskPriorityAssignerSuite) TestGetDomainInfo_Fail_UnknownError() { errors.New("some random error"), ) - domainName, isActive, err := s.priorityAssigner.getDomainInfo(constants.TestDomainID) + domainName, isActive, err := s.priorityAssigner.getDomainInfo(constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID) s.Error(err) s.Empty(domainName) s.False(isActive) @@ -153,15 +146,14 @@ func (s *taskPriorityAssignerSuite) TestAssign_ReplicationTask() { } func (s *taskPriorityAssignerSuite) TestAssign_StandbyTask_StandbyDomain() { - constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestAlternativeClusterName - defer func() { - constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestCurrentClusterName - }() s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil) + s.mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: cluster.TestAlternativeClusterName}, nil) mockTask := NewMockTask(s.controller) mockTask.EXPECT().GetQueueType().Return(QueueTypeStandbyTransfer).AnyTimes() mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1) + mockTask.EXPECT().GetWorkflowID().Return(constants.TestWorkflowID).Times(1) + mockTask.EXPECT().GetRunID().Return(constants.TestRunID).Times(1) mockTask.EXPECT().Priority().Return(noPriority).Times(1) mockTask.EXPECT().SetPriority(commonconstants.GetTaskPriority(commonconstants.LowPriorityClass, commonconstants.DefaultPrioritySubclass)).Times(1) @@ -171,10 +163,13 @@ func (s *taskPriorityAssignerSuite) TestAssign_StandbyTask_StandbyDomain() { func (s *taskPriorityAssignerSuite) TestAssign_StandbyTask_ActiveDomain() { s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil) + s.mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: cluster.TestCurrentClusterName}, nil) mockTask := NewMockTask(s.controller) mockTask.EXPECT().GetQueueType().Return(QueueTypeStandbyTransfer).AnyTimes() mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1) + mockTask.EXPECT().GetWorkflowID().Return(constants.TestWorkflowID).Times(1) + mockTask.EXPECT().GetRunID().Return(constants.TestRunID).Times(1) mockTask.EXPECT().Priority().Return(noPriority).Times(1) mockTask.EXPECT().SetPriority(commonconstants.GetTaskPriority(commonconstants.HighPriorityClass, 
commonconstants.DefaultPrioritySubclass)).Times(1) @@ -183,15 +178,14 @@ func (s *taskPriorityAssignerSuite) TestAssign_StandbyTask_ActiveDomain() { } func (s *taskPriorityAssignerSuite) TestAssign_ActiveTask_StandbyDomain() { - constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestAlternativeClusterName - defer func() { - constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestCurrentClusterName - }() s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil) + s.mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: cluster.TestAlternativeClusterName}, nil) mockTask := NewMockTask(s.controller) mockTask.EXPECT().GetQueueType().Return(QueueTypeActiveTimer).AnyTimes() mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1) + mockTask.EXPECT().GetWorkflowID().Return(constants.TestWorkflowID).Times(1) + mockTask.EXPECT().GetRunID().Return(constants.TestRunID).Times(1) mockTask.EXPECT().Priority().Return(noPriority).Times(1) mockTask.EXPECT().SetPriority(commonconstants.GetTaskPriority(commonconstants.HighPriorityClass, commonconstants.DefaultPrioritySubclass)).Times(1) @@ -201,10 +195,13 @@ func (s *taskPriorityAssignerSuite) TestAssign_ActiveTask_StandbyDomain() { func (s *taskPriorityAssignerSuite) TestAssign_ActiveTransferTask_ActiveDomain() { s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil) + s.mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: cluster.TestCurrentClusterName}, nil) mockTask := NewMockTask(s.controller) mockTask.EXPECT().GetQueueType().Return(QueueTypeActiveTransfer).AnyTimes() mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1) + mockTask.EXPECT().GetWorkflowID().Return(constants.TestWorkflowID).Times(1) + mockTask.EXPECT().GetRunID().Return(constants.TestRunID).Times(1) mockTask.EXPECT().Priority().Return(noPriority).Times(1) mockTask.EXPECT().SetPriority(commonconstants.GetTaskPriority(commonconstants.HighPriorityClass, commonconstants.DefaultPrioritySubclass)).Times(1) @@ -214,10 +211,13 @@ func (s *taskPriorityAssignerSuite) TestAssign_ActiveTransferTask_ActiveDomain() func (s *taskPriorityAssignerSuite) TestAssign_ActiveTimerTask_ActiveDomain() { s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil) + s.mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: cluster.TestCurrentClusterName}, nil) mockTask := NewMockTask(s.controller) mockTask.EXPECT().GetQueueType().Return(QueueTypeActiveTimer).AnyTimes() mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1) + mockTask.EXPECT().GetWorkflowID().Return(constants.TestWorkflowID).Times(1) + mockTask.EXPECT().GetRunID().Return(constants.TestRunID).Times(1) mockTask.EXPECT().Priority().Return(noPriority).Times(1) mockTask.EXPECT().SetPriority(commonconstants.GetTaskPriority(commonconstants.HighPriorityClass, commonconstants.DefaultPrioritySubclass)).Times(1) @@ -227,11 +227,14 @@ func (s *taskPriorityAssignerSuite) 
 func (s *taskPriorityAssignerSuite) TestAssign_ThrottledTask() {
     s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
+    s.mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: cluster.TestCurrentClusterName}, nil).AnyTimes()
 
     for i := 0; i != s.testTaskProcessRPS*2; i++ {
         mockTask := NewMockTask(s.controller)
         mockTask.EXPECT().GetQueueType().Return(QueueTypeActiveTimer).AnyTimes()
         mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1)
+        mockTask.EXPECT().GetWorkflowID().Return(constants.TestWorkflowID).Times(1)
+        mockTask.EXPECT().GetRunID().Return(constants.TestRunID).Times(1)
         mockTask.EXPECT().Priority().Return(noPriority).Times(1)
         if i < s.testTaskProcessRPS {
             mockTask.EXPECT().SetPriority(commonconstants.GetTaskPriority(commonconstants.HighPriorityClass, commonconstants.DefaultPrioritySubclass)).Times(1)
diff --git a/service/history/task/standby_task_util.go b/service/history/task/standby_task_util.go
index 9614100458d..fea1f0b92af 100644
--- a/service/history/task/standby_task_util.go
+++ b/service/history/task/standby_task_util.go
@@ -28,7 +28,6 @@ import (
     "github.com/uber/cadence/common"
     "github.com/uber/cadence/common/activecluster"
-    "github.com/uber/cadence/common/cache"
     "github.com/uber/cadence/common/log"
     "github.com/uber/cadence/common/log/tag"
     "github.com/uber/cadence/common/persistence"
@@ -210,31 +209,15 @@ func getStandbyPostActionFn(
 func getRemoteClusterName(
     ctx context.Context,
     currentCluster string,
-    domainCache cache.DomainCache,
     activeClusterMgr activecluster.Manager,
     taskInfo persistence.Task,
 ) (string, error) {
-    domainEntry, err := domainCache.GetDomainByID(taskInfo.GetDomainID())
+    activeClusterInfo, err := activeClusterMgr.GetActiveClusterInfoByWorkflow(ctx, taskInfo.GetDomainID(), taskInfo.GetWorkflowID(), taskInfo.GetRunID())
     if err != nil {
         return "", err
     }
-
-    if domainEntry.GetReplicationConfig().IsActiveActive() {
-        resp, err := activeClusterMgr.LookupWorkflow(ctx, taskInfo.GetDomainID(), taskInfo.GetWorkflowID(), taskInfo.GetRunID())
-        if err != nil {
-            return "", err
-        }
-        if resp.ClusterName == currentCluster {
-            // domain has turned active, retry the task
-            return "", errDomainBecomesActive
-        }
-        return resp.ClusterName, nil
-    }
-
-    remoteClusterName := domainEntry.GetReplicationConfig().ActiveClusterName
-    if remoteClusterName == currentCluster {
-        // domain has turned active, retry the task
+    if activeClusterInfo.ActiveClusterName == currentCluster { // domain has turned active, retry the task
         return "", errDomainBecomesActive
     }
-    return remoteClusterName, nil
+    return activeClusterInfo.ActiveClusterName, nil
 }
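Note for reviewers: getRemoteClusterName now delegates the active/standby decision entirely to the active cluster manager, so standby executors no longer branch on domain type. The sketch below shows one plausible shape of that lookup; the concrete implementation lives in common/activecluster and is not part of this diff, and managerImpl, its domainCache field, and the elided active-active branch are illustrative assumptions only.

// Sketch only, not the actual implementation.
package activecluster

import (
    "context"
    "errors"

    "github.com/uber/cadence/common/cache"
    "github.com/uber/cadence/common/types"
)

type managerImpl struct {
    domainCache cache.DomainCache // assumed dependency
}

func (m *managerImpl) GetActiveClusterInfoByWorkflow(
    ctx context.Context,
    domainID string,
    workflowID string,
    runID string,
) (*types.ActiveClusterInfo, error) {
    domainEntry, err := m.domainCache.GetDomainByID(domainID)
    if err != nil {
        return nil, err
    }
    replicationConfig := domainEntry.GetReplicationConfig()
    if !replicationConfig.IsActiveActive() {
        // Active-passive: every workflow in the domain shares the domain's
        // single active cluster and failover version.
        return &types.ActiveClusterInfo{
            ActiveClusterName: replicationConfig.ActiveClusterName,
            FailoverVersion:   domainEntry.GetFailoverVersion(),
        }, nil
    }
    // Active-active: the answer is per workflow, resolved from the workflow's
    // cluster-selection policy (e.g. its region attribute). That resolution
    // is elided here because it is not part of this diff.
    return nil, errors.New("per-workflow resolution elided in this sketch")
}

Centralizing the branch inside the manager keeps callers such as getRemoteClusterName and shouldPushToMatching identical for active-passive and active-active domains.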
diff --git a/service/history/task/standby_task_util_test.go b/service/history/task/standby_task_util_test.go
index 73166502ea7..7e91184e2eb 100644
--- a/service/history/task/standby_task_util_test.go
+++ b/service/history/task/standby_task_util_test.go
@@ -9,7 +9,6 @@ import (
     "go.uber.org/mock/gomock"
 
     "github.com/uber/cadence/common/activecluster"
-    "github.com/uber/cadence/common/cache"
     "github.com/uber/cadence/common/persistence"
     "github.com/uber/cadence/common/types"
 )
@@ -23,69 +22,21 @@ func TestGetRemoteClusterName(t *testing.T) {
     tests := []struct {
         name           string
-        setupMocks     func(*gomock.Controller) (cache.DomainCache, activecluster.Manager)
+        setupMocks     func(*gomock.Controller) activecluster.Manager
         taskInfo       persistence.Task
         expectedResult string
         expectedError  error
     }{
-        {
-            name: "domain cache error",
-            setupMocks: func(ctrl *gomock.Controller) (cache.DomainCache, activecluster.Manager) {
-                mockDomainCache := cache.NewMockDomainCache(ctrl)
-                mockActiveClusterMgr := activecluster.NewMockManager(ctrl)
-
-                mockDomainCache.EXPECT().
-                    GetDomainByID(testDomainID).
-                    Return(nil, errors.New("domain cache error"))
-
-                return mockDomainCache, mockActiveClusterMgr
-            },
-            taskInfo: &persistence.DecisionTask{
-                WorkflowIdentifier: persistence.WorkflowIdentifier{
-                    DomainID:   testDomainID,
-                    WorkflowID: testWorkflowID,
-                    RunID:      testRunID,
-                },
-            },
-            expectedResult: "",
-            expectedError:  errors.New("domain cache error"),
-        },
         {
             name: "active-active domain with lookup error",
-            setupMocks: func(ctrl *gomock.Controller) (cache.DomainCache, activecluster.Manager) {
-                mockDomainCache := cache.NewMockDomainCache(ctrl)
+            setupMocks: func(ctrl *gomock.Controller) activecluster.Manager {
                 mockActiveClusterMgr := activecluster.NewMockManager(ctrl)
 
-                domainEntry := cache.NewDomainCacheEntryForTest(
-                    &persistence.DomainInfo{ID: testDomainID},
-                    &persistence.DomainConfig{},
-                    true,
-                    &persistence.DomainReplicationConfig{
-                        Clusters: []*persistence.ClusterReplicationConfig{
-                            {ClusterName: currentCluster},
-                            {ClusterName: remoteCluster},
-                        },
-                        ActiveClusters: &types.ActiveClusters{
-                            ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
-                                "region1": {ActiveClusterName: currentCluster},
-                                "region2": {ActiveClusterName: remoteCluster},
-                            },
-                        },
-                    },
-                    0,
-                    nil,
-                    0,
-                    0,
-                    0,
-                )
-                mockDomainCache.EXPECT().
-                    GetDomainByID(testDomainID).
-                    Return(domainEntry, nil)
                 mockActiveClusterMgr.EXPECT().
-                    LookupWorkflow(gomock.Any(), testDomainID, testWorkflowID, testRunID).
+                    GetActiveClusterInfoByWorkflow(gomock.Any(), testDomainID, testWorkflowID, testRunID).
                     Return(nil, errors.New("lookup error"))
 
-                return mockDomainCache, mockActiveClusterMgr
+                return mockActiveClusterMgr
             },
             taskInfo: &persistence.DecisionTask{
                 WorkflowIdentifier: persistence.WorkflowIdentifier{
@@ -99,42 +50,16 @@ func TestGetRemoteClusterName(t *testing.T) {
         },
         {
             name: "active-active domain becomes active",
-            setupMocks: func(ctrl *gomock.Controller) (cache.DomainCache, activecluster.Manager) {
-                mockDomainCache := cache.NewMockDomainCache(ctrl)
+            setupMocks: func(ctrl *gomock.Controller) activecluster.Manager {
                 mockActiveClusterMgr := activecluster.NewMockManager(ctrl)
 
-                domainEntry := cache.NewDomainCacheEntryForTest(
-                    &persistence.DomainInfo{ID: testDomainID},
-                    &persistence.DomainConfig{},
-                    true,
-                    &persistence.DomainReplicationConfig{
-                        Clusters: []*persistence.ClusterReplicationConfig{
-                            {ClusterName: currentCluster},
-                            {ClusterName: remoteCluster},
-                        },
-                        ActiveClusters: &types.ActiveClusters{
-                            ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
-                                "region1": {ActiveClusterName: currentCluster},
-                                "region2": {ActiveClusterName: remoteCluster},
-                            },
-                        },
-                    },
-                    0,
-                    nil,
-                    0,
-                    0,
-                    0,
-                )
-                mockDomainCache.EXPECT().
-                    GetDomainByID(testDomainID).
-                    Return(domainEntry, nil)
                 mockActiveClusterMgr.EXPECT().
-                    LookupWorkflow(gomock.Any(), testDomainID, testWorkflowID, testRunID).
-                    Return(&activecluster.LookupResult{
-                        ClusterName: currentCluster,
+                    GetActiveClusterInfoByWorkflow(gomock.Any(), testDomainID, testWorkflowID, testRunID).
+                    Return(&types.ActiveClusterInfo{
+                        ActiveClusterName: currentCluster,
                     }, nil)
 
-                return mockDomainCache, mockActiveClusterMgr
+                return mockActiveClusterMgr
             },
             taskInfo: &persistence.DecisionTask{
                 WorkflowIdentifier: persistence.WorkflowIdentifier{
@@ -148,112 +73,16 @@ func TestGetRemoteClusterName(t *testing.T) {
         },
         {
             name: "active-active domain successful lookup",
-            setupMocks: func(ctrl *gomock.Controller) (cache.DomainCache, activecluster.Manager) {
-                mockDomainCache := cache.NewMockDomainCache(ctrl)
+            setupMocks: func(ctrl *gomock.Controller) activecluster.Manager {
                 mockActiveClusterMgr := activecluster.NewMockManager(ctrl)
 
-                domainEntry := cache.NewDomainCacheEntryForTest(
-                    &persistence.DomainInfo{ID: testDomainID},
-                    &persistence.DomainConfig{},
-                    true,
-                    &persistence.DomainReplicationConfig{
-                        Clusters: []*persistence.ClusterReplicationConfig{
-                            {ClusterName: currentCluster},
-                            {ClusterName: remoteCluster},
-                        },
-                        ActiveClusters: &types.ActiveClusters{
-                            ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
-                                "region1": {ActiveClusterName: currentCluster},
-                                "region2": {ActiveClusterName: remoteCluster},
-                            },
-                        },
-                    },
-                    0,
-                    nil,
-                    0,
-                    0,
-                    0,
-                )
-                mockDomainCache.EXPECT().
-                    GetDomainByID(testDomainID).
-                    Return(domainEntry, nil)
                 mockActiveClusterMgr.EXPECT().
-                    LookupWorkflow(gomock.Any(), testDomainID, testWorkflowID, testRunID).
-                    Return(&activecluster.LookupResult{
-                        ClusterName: remoteCluster,
-                    }, nil)
-
-                return mockDomainCache, mockActiveClusterMgr
-            },
-            taskInfo: &persistence.DecisionTask{
-                WorkflowIdentifier: persistence.WorkflowIdentifier{
-                    DomainID:   testDomainID,
-                    WorkflowID: testWorkflowID,
-                    RunID:      testRunID,
-                },
-            },
-            expectedResult: remoteCluster,
-            expectedError:  nil,
-        },
-        {
-            name: "non-active-active domain becomes active",
-            setupMocks: func(ctrl *gomock.Controller) (cache.DomainCache, activecluster.Manager) {
-                mockDomainCache := cache.NewMockDomainCache(ctrl)
-                mockActiveClusterMgr := activecluster.NewMockManager(ctrl)
-
-                domainEntry := cache.NewDomainCacheEntryForTest(
-                    &persistence.DomainInfo{ID: testDomainID},
-                    &persistence.DomainConfig{},
-                    false,
-                    &persistence.DomainReplicationConfig{
-                        ActiveClusterName: currentCluster,
-                    },
-                    0,
-                    nil,
-                    0,
-                    0,
-                    0,
-                )
-                mockDomainCache.EXPECT().
-                    GetDomainByID(testDomainID).
-                    Return(domainEntry, nil)
-
-                return mockDomainCache, mockActiveClusterMgr
-            },
-            taskInfo: &persistence.DecisionTask{
-                WorkflowIdentifier: persistence.WorkflowIdentifier{
-                    DomainID:   testDomainID,
-                    WorkflowID: testWorkflowID,
-                    RunID:      testRunID,
-                },
-            },
-            expectedResult: "",
-            expectedError:  errors.New("domain becomes active when processing task as standby"),
-        },
-        {
-            name: "non-active-active domain successful lookup",
-            setupMocks: func(ctrl *gomock.Controller) (cache.DomainCache, activecluster.Manager) {
-                mockDomainCache := cache.NewMockDomainCache(ctrl)
-                mockActiveClusterMgr := activecluster.NewMockManager(ctrl)
-
-                domainEntry := cache.NewDomainCacheEntryForTest(
-                    &persistence.DomainInfo{ID: testDomainID},
-                    &persistence.DomainConfig{},
-                    false,
-                    &persistence.DomainReplicationConfig{
+                    GetActiveClusterInfoByWorkflow(gomock.Any(), testDomainID, testWorkflowID, testRunID).
+                    Return(&types.ActiveClusterInfo{
                         ActiveClusterName: remoteCluster,
-                    },
-                    0,
-                    nil,
-                    0,
-                    0,
-                    0,
-                )
-                mockDomainCache.EXPECT().
-                    GetDomainByID(testDomainID).
-                    Return(domainEntry, nil)
+                    }, nil)
-
-                return mockDomainCache, mockActiveClusterMgr
+                return mockActiveClusterMgr
             },
             taskInfo: &persistence.DecisionTask{
                 WorkflowIdentifier: persistence.WorkflowIdentifier{
@@ -272,12 +101,11 @@ func TestGetRemoteClusterName(t *testing.T) {
             ctrl := gomock.NewController(t)
             defer ctrl.Finish()
 
-            mockDomainCache, mockActiveClusterMgr := tt.setupMocks(ctrl)
+            mockActiveClusterMgr := tt.setupMocks(ctrl)
 
             result, err := getRemoteClusterName(
                 context.Background(),
                 currentCluster,
-                mockDomainCache,
                 mockActiveClusterMgr,
                 tt.taskInfo,
             )
diff --git a/service/history/task/task_util.go b/service/history/task/task_util.go
index 9d1b68036ac..d3f6edaf61d 100644
--- a/service/history/task/task_util.go
+++ b/service/history/task/task_util.go
@@ -345,11 +345,11 @@ func shouldPushToMatching(
     // For active-active domains, only push to matching if the workflow is active in current cluster
     // We may revisit this logic in the future. Current idea is to not pollute tasklists with passive workflows of active-active domains
     // because they would cause head-of-line blocking in the tasklist. Passive task completion logic doesn't apply to active-active domains.
-    lookupRes, err := shard.GetActiveClusterManager().LookupWorkflow(ctx, taskInfo.GetDomainID(), taskInfo.GetWorkflowID(), taskInfo.GetRunID())
+    activeClusterInfo, err := shard.GetActiveClusterManager().GetActiveClusterInfoByWorkflow(ctx, taskInfo.GetDomainID(), taskInfo.GetWorkflowID(), taskInfo.GetRunID())
     if err != nil {
         return false, err
     }
-    if lookupRes.ClusterName != shard.GetClusterMetadata().GetCurrentClusterName() {
+    if activeClusterInfo.ActiveClusterName != shard.GetClusterMetadata().GetCurrentClusterName() {
         return false, nil
     }
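The comment retained in shouldPushToMatching above carries the key design decision: a workflow's tasks are pushed to matching only in the cluster where that workflow is active, so passive workflows of an active-active domain never occupy a tasklist. Restated in isolation as a hedged sketch; the helper name isActiveHere is hypothetical, while the interfaces are the same ones used by the hunk above:

// Sketch only: a condensed restatement of the gate added above.
func isActiveHere(ctx context.Context, shard shard.Context, taskInfo persistence.Task) (bool, error) {
    info, err := shard.GetActiveClusterManager().GetActiveClusterInfoByWorkflow(
        ctx, taskInfo.GetDomainID(), taskInfo.GetWorkflowID(), taskInfo.GetRunID(),
    )
    if err != nil {
        return false, err
    }
    // Push to matching only where the workflow is active, so passive
    // workflows of active-active domains never sit in a tasklist where
    // they could head-of-line block active traffic.
    return info.ActiveClusterName == shard.GetClusterMetadata().GetCurrentClusterName(), nil
}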
diff --git a/service/history/task/task_util_test.go b/service/history/task/task_util_test.go
index e24c4a24ee0..42dcf6aa1d8 100644
--- a/service/history/task/task_util_test.go
+++ b/service/history/task/task_util_test.go
@@ -877,9 +877,9 @@ func TestShouldPushToMatching(t *testing.T) {
             name: "active-active domain and workflow is active in current cluster so returns true",
             setupMock: func(mockDomainCache *cache.MockDomainCache, mockActiveClusterMgr *activecluster.MockManager) {
                 mockDomainCache.EXPECT().GetDomainByID(domainID).Return(getDomainCacheEntry(true, true), nil)
-                mockActiveClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), domainID, wfid, rid).
-                    Return(&activecluster.LookupResult{
-                        ClusterName: currentClusterName,
+                mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), domainID, wfid, rid).
+                    Return(&types.ActiveClusterInfo{
+                        ActiveClusterName: currentClusterName,
                     }, nil)
             },
             expectedResult: true,
@@ -888,9 +888,9 @@ func TestShouldPushToMatching(t *testing.T) {
             name: "active-active domain and workflow is not active in current cluster so returns false",
             setupMock: func(mockDomainCache *cache.MockDomainCache, mockActiveClusterMgr *activecluster.MockManager) {
                 mockDomainCache.EXPECT().GetDomainByID(domainID).Return(getDomainCacheEntry(true, true), nil)
-                mockActiveClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), domainID, wfid, rid).
-                    Return(&activecluster.LookupResult{
-                        ClusterName: "otherCluster",
+                mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), domainID, wfid, rid).
+                    Return(&types.ActiveClusterInfo{
+                        ActiveClusterName: "otherCluster",
                     }, nil)
             },
             expectedResult: false,
@@ -899,7 +899,7 @@ func TestShouldPushToMatching(t *testing.T) {
             name: "active-active domain - failed to lookup workflow",
             setupMock: func(mockDomainCache *cache.MockDomainCache, mockActiveClusterMgr *activecluster.MockManager) {
                 mockDomainCache.EXPECT().GetDomainByID(domainID).Return(getDomainCacheEntry(true, true), nil)
-                mockActiveClusterMgr.EXPECT().LookupWorkflow(gomock.Any(), domainID, wfid, rid).
+                mockActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), domainID, wfid, rid).
                     Return(nil, errors.New("failed to lookup workflow"))
             },
             expectedResult: false,
diff --git a/service/history/task/timer_active_task_executor_test.go b/service/history/task/timer_active_task_executor_test.go
index 3c36a6330a7..dfb3932e0f0 100644
--- a/service/history/task/timer_active_task_executor_test.go
+++ b/service/history/task/timer_active_task_executor_test.go
@@ -134,6 +134,12 @@ func (s *timerActiveTaskExecutorSuite) SetupTest() {
     s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
     s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(constants.TestDomainName, nil).AnyTimes()
 
+    testActiveClusterInfo := &types.ActiveClusterInfo{
+        ActiveClusterName: constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName,
+        FailoverVersion:   constants.TestGlobalDomainEntry.GetFailoverVersion(),
+    }
+    s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(testActiveClusterInfo, nil).AnyTimes()
+
     s.logger = s.mockShard.GetLogger()
     s.executionCache = execution.NewCache(s.mockShard)
     s.timerActiveTaskExecutor = NewTimerActiveTaskExecutor(
@@ -1114,6 +1120,7 @@ func (s *timerActiveTaskExecutorSuite) TestWorkflowTimeout_ContinueAsNew_Retry() {
     // one for current workflow, one for new
     s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Times(2)
     s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()
+    s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), s.domainID, gomock.Any()).Return(&types.ActiveClusterInfo{}, nil).AnyTimes()
 
     _, err = s.timerActiveTaskExecutor.Execute(timerTask)
     s.NoError(err)
@@ -1152,6 +1159,8 @@ func (s *timerActiveTaskExecutorSuite) TestWorkflowTimeout_ContinueAsNew_Cron() {
     s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Times(2)
     s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()
 
+    s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByClusterAttribute(gomock.Any(), s.domainID, gomock.Any()).Return(&types.ActiveClusterInfo{}, nil).AnyTimes()
+
     _, err = s.timerActiveTaskExecutor.Execute(timerTask)
     s.NoError(err)
diff --git a/service/history/task/timer_standby_task_executor.go b/service/history/task/timer_standby_task_executor.go
index 7a80fa7ad58..33910ead4c8 100644
--- a/service/history/task/timer_standby_task_executor.go
+++ b/service/history/task/timer_standby_task_executor.go
@@ -78,7 +78,7 @@ func NewTimerStandbyTaskExecutor(
         historyResender: historyResender,
         getRemoteClusterNameFn: func(ctx context.Context, taskInfo persistence.Task) (string, error) {
             if shard.GetConfig().EnableTimerQueueV2(shard.GetShardID()) {
-                return getRemoteClusterName(ctx, shard.GetClusterMetadata().GetCurrentClusterName(), shard.GetDomainCache(), shard.GetActiveClusterManager(), taskInfo)
+                return getRemoteClusterName(ctx, shard.GetClusterMetadata().GetCurrentClusterName(), shard.GetActiveClusterManager(), taskInfo)
             }
             return clusterName, nil
         },
diff --git a/service/history/task/timer_standby_task_executor_test.go b/service/history/task/timer_standby_task_executor_test.go
index 971c5b12787..0d3a159fca2 100644
--- a/service/history/task/timer_standby_task_executor_test.go
+++ b/service/history/task/timer_standby_task_executor_test.go
@@ -134,6 +134,12 @@ func (s *timerStandbyTaskExecutorSuite) SetupTest() {
     s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
     s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(constants.TestDomainName, nil).AnyTimes()
 
+    testActiveClusterInfo := &types.ActiveClusterInfo{
+        ActiveClusterName: constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName,
+        FailoverVersion:   constants.TestGlobalDomainEntry.GetFailoverVersion(),
+    }
+    s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(testActiveClusterInfo, nil).AnyTimes()
+
     s.logger = s.mockShard.GetLogger()
     s.timerStandbyTaskExecutor = NewTimerStandbyTaskExecutor(
         s.mockShard,
diff --git a/service/history/task/timer_task_executor_base_test.go b/service/history/task/timer_task_executor_base_test.go
index c06ad13c529..b44eac76873 100644
--- a/service/history/task/timer_task_executor_base_test.go
+++ b/service/history/task/timer_task_executor_base_test.go
@@ -325,6 +325,7 @@ func TestExecuteDeleteHistoryEventTask(t *testing.T) {
         0,
         0,
     ), nil).AnyTimes()
+    mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), "domain", "wf", "run").Return(&types.ActiveClusterInfo{}, nil).AnyTimes()
 
     timerTask := &persistence.DeleteHistoryEventTask{
         TaskData: persistence.TaskData{
@@ -361,6 +362,9 @@ func TestExecuteDeleteHistoryEventTask(t *testing.T) {
         State: &persistence.WorkflowMutableState{
             ExecutionStats: &persistence.ExecutionStats{},
             ExecutionInfo: &persistence.WorkflowExecutionInfo{
+                DomainID:    "domain",
+                WorkflowID:  "wf",
+                RunID:       "run",
                 CloseStatus: 0,
                 State:       persistence.WorkflowStateRunning,
             },
diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go
index b1d50255536..1b98bc37ae4 100644
--- a/service/history/task/transfer_active_task_executor_test.go
+++ b/service/history/task/transfer_active_task_executor_test.go
@@ -170,6 +170,12 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() {
     s.mockDomainCache.EXPECT().GetDomainName(constants.TestRateLimitedDomainID).Return(constants.TestRateLimitedDomainName, nil).AnyTimes()
     s.mockDomainCache.EXPECT().GetDomain(constants.TestRateLimitedDomainName).Return(constants.TestRateLimitedDomainEntry, nil).AnyTimes()
 
+    testActiveClusterInfo := &types.ActiveClusterInfo{
+        ActiveClusterName: constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName,
+        FailoverVersion:   constants.TestGlobalDomainEntry.GetFailoverVersion(),
+    }
+    s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(testActiveClusterInfo, nil).AnyTimes()
+
     s.logger = s.mockShard.GetLogger()
     s.transferActiveTaskExecutor = NewTransferActiveTaskExecutor(
         s.mockShard,
diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go
index 5d519041153..11d332e00c7 100644
--- a/service/history/task/transfer_standby_task_executor.go
+++ b/service/history/task/transfer_standby_task_executor.go
@@ -71,7 +71,7 @@ func NewTransferStandbyTaskExecutor(
         historyResender: historyResender,
         getRemoteClusterNameFn: func(ctx context.Context, taskInfo persistence.Task) (string, error) {
             if shard.GetConfig().EnableTransferQueueV2(shard.GetShardID()) {
-                return getRemoteClusterName(ctx, shard.GetClusterMetadata().GetCurrentClusterName(), shard.GetDomainCache(), shard.GetActiveClusterManager(), taskInfo)
+                return getRemoteClusterName(ctx, shard.GetClusterMetadata().GetCurrentClusterName(), shard.GetActiveClusterManager(), taskInfo)
             }
             return clusterName, nil
         },
diff --git a/service/history/task/transfer_standby_task_executor_test.go b/service/history/task/transfer_standby_task_executor_test.go
index 7cf2551d14a..5d4eaf2d661 100644
--- a/service/history/task/transfer_standby_task_executor_test.go
+++ b/service/history/task/transfer_standby_task_executor_test.go
@@ -147,6 +147,12 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() {
     s.mockDomainCache.EXPECT().GetDomain(constants.TestRateLimitedDomainName).Return(constants.TestRateLimitedDomainEntry, nil).AnyTimes()
     s.mockDomainCache.EXPECT().GetDomainID(constants.TestRateLimitedDomainName).Return(constants.TestRateLimitedDomainID, nil).AnyTimes()
 
+    testActiveClusterInfo := &types.ActiveClusterInfo{
+        ActiveClusterName: constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName,
+        FailoverVersion:   constants.TestGlobalDomainEntry.GetFailoverVersion(),
+    }
+    s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(testActiveClusterInfo, nil).AnyTimes()
+
     s.logger = s.mockShard.GetLogger()
     s.clusterName = cluster.TestAlternativeClusterName
     s.transferStandbyTaskExecutor = NewTransferStandbyTaskExecutor(
diff --git a/service/history/workflow/util_test.go b/service/history/workflow/util_test.go
index e8ce681ec9b..944a739c874 100644
--- a/service/history/workflow/util_test.go
+++ b/service/history/workflow/util_test.go
@@ -33,6 +33,7 @@ import (
     commonconstants "github.com/uber/cadence/common/constants"
     "github.com/uber/cadence/common/log/testlogger"
     "github.com/uber/cadence/common/persistence"
+    "github.com/uber/cadence/common/types"
     "github.com/uber/cadence/service/history/config"
     "github.com/uber/cadence/service/history/constants"
     "github.com/uber/cadence/service/history/execution"
@@ -139,6 +140,7 @@ func TestWorkflowLoad(t *testing.T) {
                     },
                     nil,
                 ).Times(1)
+                mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes()
             },
         },
         {
@@ -152,6 +154,7 @@ func TestWorkflowLoad(t *testing.T) {
                     },
                     nil,
                 ).Times(1)
+                mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID).Return(&types.ActiveClusterInfo{ActiveClusterName: "test-active-cluster"}, nil).AnyTimes()
                 mockShard.Resource.ExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(
                     &persistence.GetCurrentExecutionResponse{
                         RunID: constants.TestRunID,