Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions common/domain/attrValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,40 @@ func (d *AttrValidatorImpl) validateClusterName(
}
return nil
}

// validateActiveActiveDomainReplicationConfig checks that every active cluster
// referenced by an active-active replication config is present in the set of
// enabled clusters reported by the cluster metadata.
//
// Entries under AttributeScopes must additionally carry a non-negative
// FailoverVersion. Entries under the legacy ActiveClustersByRegion map are
// only checked for their cluster name.
//
// A nil activeClusters has nothing to validate and is accepted. The first
// violation encountered is returned as a *types.BadRequestError; because map
// iteration order is unspecified, which violation is reported first is not
// deterministic when several are present.
func (d *AttrValidatorImpl) validateActiveActiveDomainReplicationConfig(
	activeClusters *types.ActiveClusters,
) error {
	// Guard against a nil config so callers cannot trigger a nil-pointer panic.
	if activeClusters == nil {
		return nil
	}

	enabledClusters := d.clusterMetadata.GetEnabledClusterInfo()

	// checkClusterName is shared by both the AttributeScopes and the legacy
	// ActiveClustersByRegion validation so the two paths cannot drift apart.
	checkClusterName := func(name string) error {
		if _, ok := enabledClusters[name]; ok {
			return nil
		}
		return &types.BadRequestError{Message: fmt.Sprintf(
			"Invalid active cluster name: %v",
			name,
		)}
	}

	for _, scopeData := range activeClusters.AttributeScopes {
		for _, activeCluster := range scopeData.ClusterAttributes {
			if err := checkClusterName(activeCluster.ActiveClusterName); err != nil {
				return err
			}
			if activeCluster.FailoverVersion < 0 {
				return &types.BadRequestError{Message: fmt.Sprintf(
					"invalid failover version: %d",
					activeCluster.FailoverVersion,
				)}
			}
		}
	}

	// todo (david.porter) remove this once we have completely migrated to AttributeScopes
	for _, activeCluster := range activeClusters.ActiveClustersByRegion {
		if err := checkClusterName(activeCluster.ActiveClusterName); err != nil {
			return err
		}
	}

	return nil
}
146 changes: 146 additions & 0 deletions common/domain/attrValidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ package domain
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)
Expand Down Expand Up @@ -333,3 +337,145 @@ func (s *attrValidatorSuite) TestValidateDomainReplicationConfigClustersDoesNotR
)
s.IsType(&types.BadRequestError{}, err)
}

// TestValidateActiveActiveDomainReplicationConfig is a table-driven test for
// validateActiveActiveDomainReplicationConfig. It builds cluster metadata with
// two enabled clusters and checks that configs referencing only those clusters
// (with non-negative failover versions) pass, while unknown cluster names or
// negative failover versions are rejected with a BadRequestError.
func TestValidateActiveActiveDomainReplicationConfig(t *testing.T) {
	// Define test cluster names
	const (
		clusterA = "cluster-a"
		clusterB = "cluster-b"
	)

	// Create explicit cluster metadata for the test
	clusterMetadata := cluster.NewMetadata(
		config.ClusterGroupMetadata{
			FailoverVersionIncrement: 10,
			PrimaryClusterName:       clusterA,
			CurrentClusterName:       clusterA,
			ClusterGroup: map[string]config.ClusterInformation{
				clusterA: {
					Enabled:                true,
					InitialFailoverVersion: 1,
				},
				clusterB: {
					Enabled:                true,
					InitialFailoverVersion: 2,
				},
			},
		},
		func(d string) bool { return false },
		metrics.NewNoopMetricsClient(),
		log.NewNoop(),
	)

	validator := newAttrValidator(clusterMetadata, 1)

	testCases := []struct {
		name           string
		activeClusters *types.ActiveClusters
		expectedErr    bool
		// errType, when non-nil, is the concrete error type the returned error must have.
		errType interface{}
	}{
		{
			name: "valid clusters in both AttributeScopes and ActiveClustersByRegion",
			activeClusters: &types.ActiveClusters{
				AttributeScopes: map[string]types.ClusterAttributeScope{
					"city": {
						ClusterAttributes: map[string]types.ActiveClusterInfo{
							"portland": {
								ActiveClusterName: clusterB,
								FailoverVersion:   200,
							},
						},
					},
				},
				ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
					"region1": {
						ActiveClusterName: clusterA,
						FailoverVersion:   100,
					},
					"region2": {
						ActiveClusterName: clusterB,
						FailoverVersion:   200,
					},
				},
			},
			expectedErr: false,
		},
		{
			name: "invalid cluster in ActiveClustersByRegion",
			activeClusters: &types.ActiveClusters{
				ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
					"region1": {
						ActiveClusterName: "invalid-cluster",
						FailoverVersion:   100,
					},
				},
			},
			expectedErr: true,
			errType:     &types.BadRequestError{},
		},
		{
			name: "invalid cluster in AttributeScopes",
			activeClusters: &types.ActiveClusters{
				AttributeScopes: map[string]types.ClusterAttributeScope{
					"city": {
						ClusterAttributes: map[string]types.ActiveClusterInfo{
							"seattle": {
								ActiveClusterName: "invalid-cluster",
								FailoverVersion:   100,
							},
						},
					},
				},
			},
			expectedErr: true,
			errType:     &types.BadRequestError{},
		},
		{
			name: "empty ActiveClusters - all maps nil",
			activeClusters: &types.ActiveClusters{
				ActiveClustersByRegion: nil,
				AttributeScopes:        nil,
			},
			expectedErr: false,
		},
		{
			name: "empty ActiveClusters - maps initialized but empty",
			activeClusters: &types.ActiveClusters{
				ActiveClustersByRegion: map[string]types.ActiveClusterInfo{},
				AttributeScopes:        map[string]types.ClusterAttributeScope{},
			},
			expectedErr: false,
		},
		{
			// Boundary of the "FailoverVersion < 0" check: zero must be accepted.
			name: "a zero failover version in AttributeScopes is accepted",
			activeClusters: &types.ActiveClusters{
				AttributeScopes: map[string]types.ClusterAttributeScope{
					"city": {
						ClusterAttributes: map[string]types.ActiveClusterInfo{
							"seattle": {
								ActiveClusterName: clusterA,
								FailoverVersion:   0,
							},
						},
					},
				},
			},
			expectedErr: false,
		},
		{
			name: "an invalid failover version should return an error",
			activeClusters: &types.ActiveClusters{
				AttributeScopes: map[string]types.ClusterAttributeScope{
					"city": {
						ClusterAttributes: map[string]types.ActiveClusterInfo{
							"seattle": {
								ActiveClusterName: clusterA,
								FailoverVersion:   -1,
							},
						},
					},
				},
			},
			expectedErr: true,
			errType:     &types.BadRequestError{},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			err := validator.validateActiveActiveDomainReplicationConfig(tc.activeClusters)
			if !tc.expectedErr {
				assert.NoError(t, err)
				return
			}
			assert.Error(t, err)
			if tc.errType != nil {
				assert.IsType(t, tc.errType, err)
			}
		})
	}
}
47 changes: 36 additions & 11 deletions common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1566,12 +1566,15 @@ func (d *handlerImpl) updateReplicationConfig(
domainName string,
config *persistence.DomainReplicationConfig,
updateRequest *types.UpdateDomainRequest,
) (*persistence.DomainReplicationConfig, bool, bool, error) {
) (
mutatedCfg *persistence.DomainReplicationConfig,
replicationConfigChanged bool, // this being turned on will trigger an increment in the configVersion
activeClusterChanged bool, // this indicates a failover is happening and a failover version is to be incremented
err error,
) {

clusterUpdated := false
activeClusterUpdated := false
if len(updateRequest.Clusters) != 0 {
clusterUpdated = true
replicationConfigChanged = true
clustersNew := []*persistence.ClusterReplicationConfig{}
for _, clusterConfig := range updateRequest.Clusters {
clustersNew = append(clustersNew, &persistence.ClusterReplicationConfig{
Expand All @@ -1589,11 +1592,33 @@ func (d *handlerImpl) updateReplicationConfig(
}

if updateRequest.ActiveClusterName != nil {
activeClusterUpdated = true
activeClusterChanged = true
config.ActiveClusterName = *updateRequest.ActiveClusterName
}

if updateRequest.ActiveClusters != nil && updateRequest.ActiveClusters.ActiveClustersByRegion != nil {
if updateRequest.ActiveClusters != nil {
return d.updateReplicationConfigForActiveActive(domainName, config, updateRequest)
}

return config, replicationConfigChanged, activeClusterChanged, nil
}

func (d *handlerImpl) updateReplicationConfigForActiveActive(
domainName string,
config *persistence.DomainReplicationConfig,
updateRequest *types.UpdateDomainRequest,
) (
mutatedCfg *persistence.DomainReplicationConfig,
replicationConfigChanged bool, // this being turned on will trigger an increment in the configVersion
activeClusterChanged bool, // this indicates a failover is happening and a failover version is to be incremented
err error,
) {

if err := d.domainAttrValidator.validateActiveActiveDomainReplicationConfig(updateRequest.ActiveClusters); err != nil {
return nil, false, false, err
}

if updateRequest.ActiveClusters.ActiveClustersByRegion != nil {
existingActiveClusters := config.ActiveClusters
if existingActiveClusters == nil { // migration from active-passive to active-active
existingActiveClusters = &types.ActiveClusters{
Expand Down Expand Up @@ -1639,10 +1664,10 @@ func (d *handlerImpl) updateReplicationConfig(
ActiveClustersByRegion: finalActiveClusters,
}
d.logger.Debugf("Setting active clusters to %v, updateRequest.ActiveClusters.ActiveClustersByRegion: %v", finalActiveClusters, updateRequest.ActiveClusters.ActiveClustersByRegion)
activeClusterUpdated = true
activeClusterChanged = true
}

if updateRequest != nil && updateRequest.ActiveClusters != nil && updateRequest.ActiveClusters.AttributeScopes != nil {
if updateRequest.ActiveClusters.AttributeScopes != nil {
result, isCh := d.buildActiveActiveClusterScopesFromUpdateRequest(updateRequest, config, domainName)
if isCh {

Expand All @@ -1653,12 +1678,12 @@ func (d *handlerImpl) updateReplicationConfig(
}

config.ActiveClusters.AttributeScopes = result.AttributeScopes
activeClusterUpdated = true
clusterUpdated = true
activeClusterChanged = true
replicationConfigChanged = true
}
}

return config, clusterUpdated, activeClusterUpdated, nil
return config, replicationConfigChanged, activeClusterChanged, nil
}

func (d *handlerImpl) handleGracefulFailover(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ operations:
activeClusterSelectionPolicy:
clusterAttribute:
scope: region
name: region0
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need the clusters to be valid

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not the cluster name. It's the cluster attribute name

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, I'm confused 🤦

name: cluster0

# start workflow in cluster1
- op: start_workflow
Expand All @@ -51,7 +51,7 @@ operations:
activeClusterSelectionPolicy:
clusterAttribute:
scope: region
name: region1
name: cluster1

# validate that wf1 is started in cluster0 and completed in cluster1
- op: validate
Expand Down
Loading