Skip to content
15 changes: 15 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,9 @@ const (
// The scope for the shard distributor executor
ShardDistributorExecutorScope

// ShardDistributorWatchScope tracks etcd watch stream processing
ShardDistributorWatchScope

NumShardDistributorScopes
)

Expand Down Expand Up @@ -2182,6 +2185,7 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
ShardDistributorStoreSubscribeToExecutorStatusChangesScope: {operation: "StoreSubscribeToExecutorStatusChanges"},
ShardDistributorStoreSubscribeToAssignmentChangesScope: {operation: "StoreSubscribeToAssignmentChanges"},
ShardDistributorStoreDeleteAssignedStatesScope: {operation: "StoreDeleteAssignedStates"},
ShardDistributorWatchScope: {operation: "Watch"},
},
}

Expand Down Expand Up @@ -3000,6 +3004,13 @@ const (
// ShardDistributorShardHandoverLatency measures the time taken to hand over a shard from one executor to another
ShardDistributorShardHandoverLatency

// ShardDistributorWatchConsumerLag measures the lag, in etcd revisions, between the latest etcd revision and the last processed revision
ShardDistributorWatchConsumerLag
// ShardDistributorWatchProcessingLatency measures how long it takes to process a single WatchResponse
ShardDistributorWatchProcessingLatency
// ShardDistributorWatchEventsReceived counts the total number of watch events received
ShardDistributorWatchEventsReceived

NumShardDistributorMetrics
)

Expand Down Expand Up @@ -3802,6 +3813,10 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{

ShardDistributorShardAssignmentDistributionLatency: {metricName: "shard_distributor_shard_assignment_distribution_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},
ShardDistributorShardHandoverLatency: {metricName: "shard_distributor_shard_handover_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},

ShardDistributorWatchConsumerLag: {metricName: "shard_distributor_watch_consumer_lag", metricType: Gauge},
ShardDistributorWatchProcessingLatency: {metricName: "shard_distributor_watch_processing_latency", metricType: Histogram, buckets: Default1ms100s.buckets()},
ShardDistributorWatchEventsReceived: {metricName: "shard_distributor_watch_events_received", metricType: Counter},
},
}

Expand Down
64 changes: 43 additions & 21 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/store"
Expand All @@ -29,13 +30,14 @@ var (
)

type executorStoreImpl struct {
client etcdclient.Client
prefix string
logger log.Logger
shardCache *shardcache.ShardToExecutorCache
timeSource clock.TimeSource
recordWriter *common.RecordWriter
cfg *config.Config
client etcdclient.Client
prefix string
logger log.Logger
shardCache *shardcache.ShardToExecutorCache
timeSource clock.TimeSource
recordWriter *common.RecordWriter
cfg *config.Config
metricsClient metrics.Client
}

// shardStatisticsUpdate holds the staged statistics for a shard so we can write them
Expand All @@ -49,17 +51,18 @@ type shardStatisticsUpdate struct {
type ExecutorStoreParams struct {
fx.In

Client etcdclient.Client `name:"executorstore"`
ETCDConfig ETCDConfig
Lifecycle fx.Lifecycle
Logger log.Logger
TimeSource clock.TimeSource
Config *config.Config
Client etcdclient.Client `name:"executorstore"`
ETCDConfig ETCDConfig
Lifecycle fx.Lifecycle
Logger log.Logger
TimeSource clock.TimeSource
Config *config.Config
MetricsClient metrics.Client
}

// NewStore creates a new etcd-backed store and provides it to the fx application.
func NewStore(p ExecutorStoreParams) (store.Store, error) {
shardCache := shardcache.NewShardToExecutorCache(p.ETCDConfig.Prefix, p.Client, p.Logger, p.TimeSource)
shardCache := shardcache.NewShardToExecutorCache(p.ETCDConfig.Prefix, p.Client, p.Logger, p.TimeSource, p.MetricsClient)

timeSource := p.TimeSource
if timeSource == nil {
Expand All @@ -72,13 +75,14 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
}

store := &executorStoreImpl{
client: p.Client,
prefix: p.ETCDConfig.Prefix,
logger: p.Logger,
shardCache: shardCache,
timeSource: timeSource,
recordWriter: recordWriter,
cfg: p.Config,
client: p.Client,
prefix: p.ETCDConfig.Prefix,
logger: p.Logger,
shardCache: shardCache,
timeSource: timeSource,
recordWriter: recordWriter,
cfg: p.Config,
metricsClient: p.MetricsClient,
}

p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop))
Expand Down Expand Up @@ -275,6 +279,10 @@ func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context

go func() {
defer close(revisionChan)

scope := s.metricsClient.Scope(metrics.ShardDistributorWatchScope).
Tagged(metrics.NamespaceTag(namespace))

watchChan := s.client.Watch(ctx,
etcdkeys.BuildExecutorsPrefix(s.prefix, namespace),
clientv3.WithPrefix(),
Expand All @@ -286,7 +294,20 @@ func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context
return
}

// Track watch metrics
sw := scope.StartTimer(metrics.ShardDistributorWatchProcessingLatency)
scope.AddCounter(metrics.ShardDistributorWatchEventsReceived, int64(len(watchResp.Events)))

// Consumer lag: Header.Revision is the etcd store revision at the time this watch response was generated,
// and lastEvent.Kv.ModRevision is the revision at which the key in the last event was modified.
// The difference shows how far the delivered events were behind the cluster state when the response was sent.
if len(watchResp.Events) > 0 {
lastEvent := watchResp.Events[len(watchResp.Events)-1]
scope.UpdateGauge(metrics.ShardDistributorWatchConsumerLag, float64(watchResp.Header.Revision-lastEvent.Kv.ModRevision))
}

if !s.hasExecutorStatusChanged(watchResp, namespace) {
sw.Stop()
continue
}

Expand All @@ -298,6 +319,7 @@ func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context
}

revisionChan <- watchResp.Header.Revision
sw.Stop()
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/service/sharddistributor/store"
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient"
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys"
Expand Down Expand Up @@ -40,9 +41,10 @@ type namespaceShardToExecutor struct {
client etcdclient.Client
timeSource clock.TimeSource
pubSub *executorStatePubSub
metricsClient metrics.Client
}

func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient.Client, stopCh chan struct{}, logger log.Logger, timeSource clock.TimeSource) (*namespaceShardToExecutor, error) {
func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient.Client, stopCh chan struct{}, logger log.Logger, timeSource clock.TimeSource, metricsClient metrics.Client) (*namespaceShardToExecutor, error) {
return &namespaceShardToExecutor{
shardToExecutor: make(map[string]*store.ShardOwner),
executorState: make(map[*store.ShardOwner][]string),
Expand All @@ -55,6 +57,7 @@ func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient
client: client,
timeSource: timeSource,
pubSub: newExecutorStatePubSub(logger, namespace),
metricsClient: metricsClient,
}, nil
}

Expand Down Expand Up @@ -171,6 +174,9 @@ func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

scope := n.metricsClient.Scope(metrics.ShardDistributorWatchScope).
Tagged(metrics.NamespaceTag(n.namespace))

watchChan := n.client.Watch(
// WithRequireLeader ensures that the etcd cluster has a leader
clientv3.WithRequireLeader(ctx),
Expand All @@ -193,8 +199,21 @@ func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error {
return fmt.Errorf("watch channel closed")
}

// Track watch metrics
sw := scope.StartTimer(metrics.ShardDistributorWatchProcessingLatency)
scope.AddCounter(metrics.ShardDistributorWatchEventsReceived, int64(len(watchResp.Events)))

// Consumer lag: Header.Revision is the etcd store revision at the time this watch response was generated,
// and lastEvent.Kv.ModRevision is the revision at which the key in the last event was modified.
// The difference shows how far the delivered events were behind the cluster state when the response was sent.
if len(watchResp.Events) > 0 {
lastEvent := watchResp.Events[len(watchResp.Events)-1]
scope.UpdateGauge(metrics.ShardDistributorWatchConsumerLag, float64(watchResp.Header.Revision-lastEvent.Kv.ModRevision))
}

// Only trigger refresh if the change is related to executor assigned state or metadata
if !n.hasExecutorStateChanged(watchResp) {
sw.Stop()
continue
}

Expand All @@ -203,6 +222,7 @@ func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error {
default:
n.logger.Info("Cache is being refreshed, skipping trigger")
}
sw.Stop()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/store"
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient"
Expand All @@ -38,7 +39,7 @@ func TestNamespaceShardToExecutor_Lifecycle(t *testing.T) {
})

// Start the cache
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource())
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
assert.NoError(t, err)
namespaceShardToExecutor.Start(&sync.WaitGroup{})
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -90,7 +91,7 @@ func TestNamespaceShardToExecutor_Subscribe(t *testing.T) {
})

// Start the cache
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource())
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
assert.NoError(t, err)
namespaceShardToExecutor.Start(&sync.WaitGroup{})

Expand Down Expand Up @@ -157,7 +158,7 @@ func TestNamespaceShardToExecutor_watch_watchChanErrors(t *testing.T) {
Return(watchChan).
AnyTimes()

e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource())
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
require.NoError(t, err)

triggerChan := make(chan struct{}, 1)
Expand Down Expand Up @@ -431,7 +432,7 @@ func TestNamespaceShardToExecutor_namespaceRefreshLoop_watchError(t *testing.T)
MinTimes(0).
MaxTimes(1)

e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, timeSource)
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, timeSource, metrics.NewNoopMetricsClient())
require.NoError(t, err)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -572,7 +573,7 @@ func setupNamespaceShardToExecutorTestCase(t *testing.T) *namespaceShardToExecut
Return(tc.watchChan).
AnyTimes()

e, err := newNamespaceShardToExecutor(tc.prefix, tc.namespace, tc.etcdClient, tc.stopCh, logger, clock.NewRealTimeSource())
e, err := newNamespaceShardToExecutor(tc.prefix, tc.namespace, tc.etcdClient, tc.stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
require.NoError(t, err)
tc.e = e
return tc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/service/sharddistributor/store"
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient"
)
Expand All @@ -23,13 +24,15 @@ type ShardToExecutorCache struct {
logger log.Logger
prefix string
wg sync.WaitGroup
metricsClient metrics.Client
}

func NewShardToExecutorCache(
prefix string,
client etcdclient.Client,
logger log.Logger,
timeSource clock.TimeSource,
metricsClient metrics.Client,
) *ShardToExecutorCache {
shardCache := &ShardToExecutorCache{
namespaceToShards: make(NamespaceToShards),
Expand All @@ -39,6 +42,7 @@ func NewShardToExecutorCache(
prefix: prefix,
client: client,
wg: sync.WaitGroup{},
metricsClient: metricsClient,
}

return shardCache
Expand Down Expand Up @@ -97,7 +101,7 @@ func (s *ShardToExecutorCache) getNamespaceShardToExecutor(namespace string) (*n
s.Lock()
defer s.Unlock()

namespaceShardToExecutor, err := newNamespaceShardToExecutor(s.prefix, namespace, s.client, s.stopC, s.logger, s.timeSource)
namespaceShardToExecutor, err := newNamespaceShardToExecutor(s.prefix, namespace, s.client, s.stopC, s.logger, s.timeSource, s.metricsClient)
if err != nil {
return nil, fmt.Errorf("new namespace shard to executor: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper"
)

func TestNewShardToExecutorCache(t *testing.T) {
logger := testlogger.New(t)

client := &clientv3.Client{}
cache := NewShardToExecutorCache("some-prefix", client, logger, clock.NewRealTimeSource())
cache := NewShardToExecutorCache("some-prefix", client, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())

assert.NotNil(t, cache)

Expand All @@ -38,7 +39,7 @@ func TestShardExecutorCacheForwarding(t *testing.T) {
"rack": "rack-42",
})

cache := NewShardToExecutorCache(testCluster.EtcdPrefix, testCluster.Client, logger, clock.NewRealTimeSource())
cache := NewShardToExecutorCache(testCluster.EtcdPrefix, testCluster.Client, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
cache.Start()
defer cache.Stop()

Expand Down
Loading