Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,17 @@ func (s *server) startService() common.Daemon {
if len(s.cfg.ShardDistributorMatchingConfig.Namespaces) > 1 {
s.logger.Fatal("spectator does not support multiple namespaces", tag.Value(s.cfg.ShardDistributorMatchingConfig.Namespaces))
}
matchingShardDistributionMode := dc.GetStringProperty(dynamicproperties.MatchingShardDistributionMode)

spectatorParams := spectatorclient.Params{
Client: shardDistributorClient,
MetricsScope: params.MetricScope,
Logger: params.Logger,
Config: s.cfg.ShardDistributorMatchingConfig,
TimeSource: clock.NewRealTimeSource(),
Enabled: func() bool {
return membership.ModeKey(matchingShardDistributionMode()) != membership.ModeKeyHashRing
},
}
namespace := s.cfg.ShardDistributorMatchingConfig.Namespaces[0].Namespace
spectator, err = spectatorclient.NewSpectatorWithNamespace(
Expand Down
28 changes: 16 additions & 12 deletions common/membership/sharddistributorresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ import (
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
)

type modeKey string

var (
modeKeyHashRing modeKey = "hash_ring"
modeKeyShardDistributor modeKey = "shard_distributor"
modeKeyHashRingShadowShardDistributor modeKey = "hash_ring-shadow-shard_distributor"
modeKeyShardDistributorShadowHashRing modeKey = "shard_distributor-shadow-hash_ring"
type ModeKey string

const (
// ModeKeyHashRing represents the hash ring shard distribution mode
ModeKeyHashRing ModeKey = "hash_ring"
// ModeKeyShardDistributor represents the shard distributor mode
ModeKeyShardDistributor ModeKey = "shard_distributor"
// ModeKeyHashRingShadowShardDistributor represents hash ring mode with shard distributor shadow
ModeKeyHashRingShadowShardDistributor ModeKey = "hash_ring-shadow-shard_distributor"
// ModeKeyShardDistributorShadowHashRing represents shard distributor mode with hash ring shadow
ModeKeyShardDistributorShadowHashRing ModeKey = "shard_distributor-shadow-hash_ring"
)

type shardDistributorResolver struct {
Expand Down Expand Up @@ -87,12 +91,12 @@ func (s shardDistributorResolver) Lookup(key string) (HostInfo, error) {
return s.ring.Lookup(key)
}

switch modeKey(s.shardDistributionMode()) {
case modeKeyHashRing:
switch ModeKey(s.shardDistributionMode()) {
case ModeKeyHashRing:
return s.ring.Lookup(key)
case modeKeyShardDistributor:
case ModeKeyShardDistributor:
return s.lookUpInShardDistributor(key)
case modeKeyHashRingShadowShardDistributor:
case ModeKeyHashRingShadowShardDistributor:
hashRingResult, err := s.ring.Lookup(key)
if err != nil {
return HostInfo{}, err
Expand All @@ -108,7 +112,7 @@ func (s shardDistributorResolver) Lookup(key string) (HostInfo, error) {
}()

return hashRingResult, nil
case modeKeyShardDistributorShadowHashRing:
case ModeKeyShardDistributorShadowHashRing:
shardDistributorResult, err := s.lookUpInShardDistributor(key)
if err != nil {
return HostInfo{}, err
Expand Down
8 changes: 4 additions & 4 deletions common/membership/sharddistributorresolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
func TestShardDistributorResolver_Lookup_modeHashRing(t *testing.T) {
resolver, ring, _ := newShardDistributorResolver(t)
resolver.shardDistributionMode = func(...dynamicproperties.FilterOption) string {
return string(modeKeyHashRing)
return string(ModeKeyHashRing)
}

ring.EXPECT().Lookup("test-key").Return(HostInfo{addr: "test-addr"}, nil)
Expand All @@ -51,7 +51,7 @@ func TestShardDistributorResolver_Lookup_modeHashRing(t *testing.T) {
func TestShardDistributorResolver_Lookup_modeShardDistributor(t *testing.T) {
resolver, _, shardDistributorMock := newShardDistributorResolver(t)
resolver.shardDistributionMode = func(...dynamicproperties.FilterOption) string {
return string(modeKeyShardDistributor)
return string(ModeKeyShardDistributor)
}

shardDistributorMock.EXPECT().GetShardOwner(gomock.Any(), "test-key").
Expand All @@ -72,7 +72,7 @@ func TestShardDistributorResolver_Lookup_modeShardDistributor(t *testing.T) {
func TestShardDistributorResolver_Lookup_modeHashRingShadowShardDistributor(t *testing.T) {
resolver, ring, shardDistributorMock := newShardDistributorResolver(t)
resolver.shardDistributionMode = func(...dynamicproperties.FilterOption) string {
return string(modeKeyHashRingShadowShardDistributor)
return string(ModeKeyHashRingShadowShardDistributor)
}

cases := []struct {
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestShardDistributorResolver_Lookup_modeHashRingShadowShardDistributor(t *t
func TestShardDistributorResolver_Lookup_modeShardDistributorShadowHashRing(t *testing.T) {
resolver, ring, shardDistributorMock := newShardDistributorResolver(t)
resolver.shardDistributionMode = func(...dynamicproperties.FilterOption) string {
return string(modeKeyShardDistributorShadowHashRing)
return string(ModeKeyShardDistributorShadowHashRing)
}

cases := []struct {
Expand Down