diff --git a/common/domain/attrValidator.go b/common/domain/attrValidator.go
index f48efd57096..ed9e02ce125 100644
--- a/common/domain/attrValidator.go
+++ b/common/domain/attrValidator.go
@@ -182,3 +182,41 @@ func (d *AttrValidatorImpl) validateClusterName(
 	}
 	return nil
 }
+
+// validateActiveActiveDomainReplicationConfig ensures every referenced active cluster is enabled in the cluster group and carries a non-negative failover version.
+func (d *AttrValidatorImpl) validateActiveActiveDomainReplicationConfig(
+	activeClusters *types.ActiveClusters,
+) error {
+
+	clusters := d.clusterMetadata.GetEnabledClusterInfo()
+
+	for _, scopeData := range activeClusters.AttributeScopes {
+		for _, activeCluster := range scopeData.ClusterAttributes {
+			_, ok := clusters[activeCluster.ActiveClusterName]
+			if !ok {
+				return &types.BadRequestError{Message: fmt.Sprintf(
+					"invalid active cluster name: %v",
+					activeCluster.ActiveClusterName,
+				)}
+			}
+			if activeCluster.FailoverVersion < 0 {
+				return &types.BadRequestError{Message: fmt.Sprintf(
+					"invalid failover version: %d",
+					activeCluster.FailoverVersion,
+				)}
+			}
+		}
+	}
+
+	// todo (david.porter) remove this once we have completely migrated to AttributeScopes
+	for _, activeCluster := range activeClusters.ActiveClustersByRegion {
+		if _, ok := clusters[activeCluster.ActiveClusterName]; !ok {
+			return &types.BadRequestError{Message: fmt.Sprintf(
+				"invalid active cluster name: %v",
+				activeCluster.ActiveClusterName,
+			)}
+		}
+	}
+
+	return nil
+}
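For reference, this is the payload shape the new validator walks. A minimal sketch using the `types` definitions from the diff above; the `validator` instance and the `"us-west"`/`"cluster-a"` values are illustrative, not taken from the PR:

```go
// Illustrative payload: one "region" scope whose "us-west" attribute is
// active in cluster-a. The validator looks each ActiveClusterName up in
// clusterMetadata.GetEnabledClusterInfo() and rejects negative failover
// versions; both failure modes surface as *types.BadRequestError.
active := &types.ActiveClusters{
	AttributeScopes: map[string]types.ClusterAttributeScope{
		"region": {
			ClusterAttributes: map[string]types.ActiveClusterInfo{
				"us-west": {ActiveClusterName: "cluster-a", FailoverVersion: 1},
			},
		},
	},
}
if err := validator.validateActiveActiveDomainReplicationConfig(active); err != nil {
	// an unknown cluster name or a negative failover version was supplied
}
```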
ActiveClustersByRegion", + activeClusters: &types.ActiveClusters{ + ActiveClustersByRegion: map[string]types.ActiveClusterInfo{ + "region1": { + ActiveClusterName: "invalid-cluster", + FailoverVersion: 100, + }, + }, + }, + expectedErr: true, + errType: &types.BadRequestError{}, + }, + { + name: "invalid cluster in AttributeScopes", + activeClusters: &types.ActiveClusters{ + AttributeScopes: map[string]types.ClusterAttributeScope{ + "city": { + ClusterAttributes: map[string]types.ActiveClusterInfo{ + "seattle": { + ActiveClusterName: "invalid-cluster", + FailoverVersion: 100, + }, + }, + }, + }, + }, + expectedErr: true, + errType: &types.BadRequestError{}, + }, + { + name: "empty ActiveClusters - all maps nil", + activeClusters: &types.ActiveClusters{ + ActiveClustersByRegion: nil, + AttributeScopes: nil, + }, + expectedErr: false, + }, + { + name: "empty ActiveClusters - maps initialized but empty", + activeClusters: &types.ActiveClusters{ + ActiveClustersByRegion: map[string]types.ActiveClusterInfo{}, + AttributeScopes: map[string]types.ClusterAttributeScope{}, + }, + expectedErr: false, + }, + { + name: "an invalid failover version should return an error", + activeClusters: &types.ActiveClusters{ + AttributeScopes: map[string]types.ClusterAttributeScope{ + "city": { + ClusterAttributes: map[string]types.ActiveClusterInfo{ + "seattle": { + ActiveClusterName: clusterA, + FailoverVersion: -1, + }, + }, + }, + }, + }, + expectedErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := validator.validateActiveActiveDomainReplicationConfig(tc.activeClusters) + if tc.expectedErr { + assert.Error(t, err) + if tc.errType != nil { + assert.IsType(t, tc.errType, err) + } + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/common/domain/handler.go b/common/domain/handler.go index 35b1e901949..5e36b1bfdbd 100644 --- a/common/domain/handler.go +++ b/common/domain/handler.go @@ -1566,12 +1566,15 @@ func (d *handlerImpl) updateReplicationConfig( domainName string, config *persistence.DomainReplicationConfig, updateRequest *types.UpdateDomainRequest, -) (*persistence.DomainReplicationConfig, bool, bool, error) { +) ( + mutatedCfg *persistence.DomainReplicationConfig, + replicationConfigChanged bool, // this being turned on will trigger an increment in the configVersion + activeClusterChanged bool, // this indicates a failover is happening and a failover version is to be incremented + err error, +) { - clusterUpdated := false - activeClusterUpdated := false if len(updateRequest.Clusters) != 0 { - clusterUpdated = true + replicationConfigChanged = true clustersNew := []*persistence.ClusterReplicationConfig{} for _, clusterConfig := range updateRequest.Clusters { clustersNew = append(clustersNew, &persistence.ClusterReplicationConfig{ @@ -1589,11 +1592,33 @@ func (d *handlerImpl) updateReplicationConfig( } if updateRequest.ActiveClusterName != nil { - activeClusterUpdated = true + activeClusterChanged = true config.ActiveClusterName = *updateRequest.ActiveClusterName } - if updateRequest.ActiveClusters != nil && updateRequest.ActiveClusters.ActiveClustersByRegion != nil { + if updateRequest.ActiveClusters != nil { + return d.updateReplicationConfigForActiveActive(domainName, config, updateRequest) + } + + return config, replicationConfigChanged, activeClusterChanged, nil +} + +func (d *handlerImpl) updateReplicationConfigForActiveActive( + domainName string, + config *persistence.DomainReplicationConfig, + updateRequest *types.UpdateDomainRequest, 
diff --git a/common/domain/handler.go b/common/domain/handler.go
index 35b1e901949..5e36b1bfdbd 100644
--- a/common/domain/handler.go
+++ b/common/domain/handler.go
@@ -1566,12 +1566,15 @@ func (d *handlerImpl) updateReplicationConfig(
 	domainName string,
 	config *persistence.DomainReplicationConfig,
 	updateRequest *types.UpdateDomainRequest,
-) (*persistence.DomainReplicationConfig, bool, bool, error) {
+) (
+	mutatedCfg *persistence.DomainReplicationConfig,
+	replicationConfigChanged bool, // when true, the domain's configVersion will be incremented
+	activeClusterChanged bool, // when true, a failover is happening and the failoverVersion will be incremented
+	err error,
+) {
 
-	clusterUpdated := false
-	activeClusterUpdated := false
 	if len(updateRequest.Clusters) != 0 {
-		clusterUpdated = true
+		replicationConfigChanged = true
 		clustersNew := []*persistence.ClusterReplicationConfig{}
 		for _, clusterConfig := range updateRequest.Clusters {
 			clustersNew = append(clustersNew, &persistence.ClusterReplicationConfig{
@@ -1589,11 +1592,33 @@ func (d *handlerImpl) updateReplicationConfig(
 	}
 
 	if updateRequest.ActiveClusterName != nil {
-		activeClusterUpdated = true
+		activeClusterChanged = true
 		config.ActiveClusterName = *updateRequest.ActiveClusterName
 	}
 
-	if updateRequest.ActiveClusters != nil && updateRequest.ActiveClusters.ActiveClustersByRegion != nil {
+	if updateRequest.ActiveClusters != nil {
+		return d.updateReplicationConfigForActiveActive(domainName, config, updateRequest)
+	}
+
+	return config, replicationConfigChanged, activeClusterChanged, nil
+}
+
+func (d *handlerImpl) updateReplicationConfigForActiveActive(
+	domainName string,
+	config *persistence.DomainReplicationConfig,
+	updateRequest *types.UpdateDomainRequest,
+) (
+	mutatedCfg *persistence.DomainReplicationConfig,
+	replicationConfigChanged bool, // when true, the domain's configVersion will be incremented
+	activeClusterChanged bool, // when true, a failover is happening and the failoverVersion will be incremented
+	err error,
+) {
+
+	if err := d.domainAttrValidator.validateActiveActiveDomainReplicationConfig(updateRequest.ActiveClusters); err != nil {
+		return nil, false, false, err
+	}
+
+	if updateRequest.ActiveClusters.ActiveClustersByRegion != nil {
 		existingActiveClusters := config.ActiveClusters
 		if existingActiveClusters == nil { // migration from active-passive to active-active
 			existingActiveClusters = &types.ActiveClusters{
@@ -1639,10 +1664,10 @@ func (d *handlerImpl) updateReplicationConfig(
 			ActiveClustersByRegion: finalActiveClusters,
 		}
 		d.logger.Debugf("Setting active clusters to %v, updateRequest.ActiveClusters.ActiveClustersByRegion: %v", finalActiveClusters, updateRequest.ActiveClusters.ActiveClustersByRegion)
-		activeClusterUpdated = true
+		activeClusterChanged = true
 	}
 
-	if updateRequest != nil && updateRequest.ActiveClusters != nil && updateRequest.ActiveClusters.AttributeScopes != nil {
+	if updateRequest.ActiveClusters.AttributeScopes != nil {
 		result, isCh := d.buildActiveActiveClusterScopesFromUpdateRequest(updateRequest, config, domainName)
 
 		if isCh {
@@ -1653,12 +1678,12 @@ func (d *handlerImpl) updateReplicationConfig(
 		}
 		config.ActiveClusters.AttributeScopes = result.AttributeScopes
 
-		activeClusterUpdated = true
-		clusterUpdated = true
+		activeClusterChanged = true
+		replicationConfigChanged = true
 	}
 
-	return config, clusterUpdated, activeClusterUpdated, nil
+	return config, replicationConfigChanged, activeClusterChanged, nil
 }
 
 func (d *handlerImpl) handleGracefulFailover(
diff --git a/simulation/replication/testdata/replication_simulation_activeactive.yaml b/simulation/replication/testdata/replication_simulation_activeactive.yaml
index e44d8c43c83..65e93a8cd0a 100644
--- a/simulation/replication/testdata/replication_simulation_activeactive.yaml
+++ b/simulation/replication/testdata/replication_simulation_activeactive.yaml
@@ -37,7 +37,7 @@ operations:
     activeClusterSelectionPolicy:
       clusterAttribute:
         scope: region
-        name: region0
+        name: cluster0
 
 # start workflow in cluster1
 - op: start_workflow
@@ -51,7 +51,7 @@ operations:
     activeClusterSelectionPolicy:
      clusterAttribute:
        scope: region
-        name: region1
+        name: cluster1
 
 # validate that wf1 is started in cluster0 and completed in cluster1
 - op: validate
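The simulation YAML fix points `clusterAttribute.name` at actual cluster names (`cluster0`, `cluster1`) rather than region names, which appears consistent with the new validator resolving names against the enabled cluster set. Separately, the two renamed return flags in `updateReplicationConfig` describe distinct version bumps, per the comments on the named returns. A simplified, self-contained sketch of that bookkeeping follows; this is not Cadence's actual implementation, and the next-failover-version formula is an assumption modeled on the `FailoverVersionIncrement`/`InitialFailoverVersion` scheme in the test metadata above:

```go
package main

import "fmt"

// nextVersions is an illustrative model of the two flags' effects:
// replicationConfigChanged bumps the config version by one, while
// activeClusterChanged advances the failover version to the next
// multiple of the increment plus the target cluster's initial version.
func nextVersions(configVersion, failoverVersion, increment, initial int64,
	replicationConfigChanged, activeClusterChanged bool) (int64, int64) {
	if replicationConfigChanged {
		configVersion++
	}
	if activeClusterChanged {
		failoverVersion = (failoverVersion/increment+1)*increment + initial
	}
	return configVersion, failoverVersion
}

func main() {
	// With the test metadata above (increment 10, cluster-b's initial version 2),
	// a config change plus a failover from version 11 lands on version 22.
	cv, fv := nextVersions(3, 11, 10, 2, true, true)
	fmt.Println(cv, fv) // 4 22
}
```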