diff --git a/common/metrics/defs.go b/common/metrics/defs.go index f553f822031..f2f04a96d02 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1494,6 +1494,9 @@ const ( // The scope for the shard distributor executor ShardDistributorExecutorScope + // ShardDistributorWatchScope tracks etcd watch stream processing + ShardDistributorWatchScope + NumShardDistributorScopes ) @@ -2182,6 +2185,7 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{ ShardDistributorStoreSubscribeToExecutorStatusChangesScope: {operation: "StoreSubscribeToExecutorStatusChanges"}, ShardDistributorStoreSubscribeToAssignmentChangesScope: {operation: "StoreSubscribeToAssignmentChanges"}, ShardDistributorStoreDeleteAssignedStatesScope: {operation: "StoreDeleteAssignedStates"}, + ShardDistributorWatchScope: {operation: "Watch"}, }, } @@ -3000,6 +3004,13 @@ const ( // ShardDistributorShardHandoverLatency measures the time taken to hand over a shard from one executor to another ShardDistributorShardHandoverLatency + // ShardDistributorWatchConsumerLag measures the lag between latest etcd revision and last processed revision + ShardDistributorWatchConsumerLag + // ShardDistributorWatchProcessingLatency measures how long it takes to process a single WatchResponse + ShardDistributorWatchProcessingLatency + // ShardDistributorWatchEventsReceived counts the total number of watch events received + ShardDistributorWatchEventsReceived + NumShardDistributorMetrics ) @@ -3802,6 +3813,10 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ ShardDistributorShardAssignmentDistributionLatency: {metricName: "shard_distributor_shard_assignment_distribution_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets}, ShardDistributorShardHandoverLatency: {metricName: "shard_distributor_shard_handover_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets}, + + ShardDistributorWatchConsumerLag: {metricName: "shard_distributor_watch_consumer_lag", metricType: Gauge}, + ShardDistributorWatchProcessingLatency: {metricName: "shard_distributor_watch_processing_latency", metricType: Histogram, buckets: Default1ms100s.buckets()}, + ShardDistributorWatchEventsReceived: {metricName: "shard_distributor_watch_events_received", metricType: Counter}, }, } diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index c9d8b827b9c..bc6228a1f6e 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -14,6 +14,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/service/sharddistributor/store" @@ -29,13 +30,14 @@ var ( ) type executorStoreImpl struct { - client etcdclient.Client - prefix string - logger log.Logger - shardCache *shardcache.ShardToExecutorCache - timeSource clock.TimeSource - recordWriter *common.RecordWriter - cfg *config.Config + client etcdclient.Client + prefix string + logger log.Logger + shardCache *shardcache.ShardToExecutorCache + timeSource clock.TimeSource + recordWriter *common.RecordWriter + cfg *config.Config + metricsClient metrics.Client } // shardStatisticsUpdate holds the staged statistics for a shard so we can write them @@ -49,17 +51,18 @@ type shardStatisticsUpdate struct { type ExecutorStoreParams struct { fx.In - Client etcdclient.Client `name:"executorstore"` - ETCDConfig ETCDConfig - Lifecycle fx.Lifecycle - Logger log.Logger - TimeSource clock.TimeSource - Config *config.Config + Client etcdclient.Client `name:"executorstore"` + ETCDConfig ETCDConfig + Lifecycle fx.Lifecycle + Logger log.Logger + TimeSource clock.TimeSource + Config *config.Config + MetricsClient metrics.Client } // NewStore creates a new etcd-backed store and provides it to the fx application. func NewStore(p ExecutorStoreParams) (store.Store, error) { - shardCache := shardcache.NewShardToExecutorCache(p.ETCDConfig.Prefix, p.Client, p.Logger, p.TimeSource) + shardCache := shardcache.NewShardToExecutorCache(p.ETCDConfig.Prefix, p.Client, p.Logger, p.TimeSource, p.MetricsClient) timeSource := p.TimeSource if timeSource == nil { @@ -72,13 +75,14 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) { } store := &executorStoreImpl{ - client: p.Client, - prefix: p.ETCDConfig.Prefix, - logger: p.Logger, - shardCache: shardCache, - timeSource: timeSource, - recordWriter: recordWriter, - cfg: p.Config, + client: p.Client, + prefix: p.ETCDConfig.Prefix, + logger: p.Logger, + shardCache: shardCache, + timeSource: timeSource, + recordWriter: recordWriter, + cfg: p.Config, + metricsClient: p.MetricsClient, } p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop)) @@ -275,6 +279,10 @@ func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context go func() { defer close(revisionChan) + + scope := s.metricsClient.Scope(metrics.ShardDistributorWatchScope). + Tagged(metrics.NamespaceTag(namespace)) + watchChan := s.client.Watch(ctx, etcdkeys.BuildExecutorsPrefix(s.prefix, namespace), clientv3.WithPrefix(), @@ -286,7 +294,20 @@ func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context return } + // Track watch metrics + sw := scope.StartTimer(metrics.ShardDistributorWatchProcessingLatency) + scope.AddCounter(metrics.ShardDistributorWatchEventsReceived, int64(len(watchResp.Events))) + + // Consumer lag: Header.Revision is the current etcd cluster revision, + // lastEvent.Kv.ModRevision is the revision when the event was created. + // The difference shows how far behind the consumer is from the current cluster state. + if len(watchResp.Events) > 0 { + lastEvent := watchResp.Events[len(watchResp.Events)-1] + scope.UpdateGauge(metrics.ShardDistributorWatchConsumerLag, float64(watchResp.Header.Revision-lastEvent.Kv.ModRevision)) + } + if !s.hasExecutorStatusChanged(watchResp, namespace) { + sw.Stop() continue } @@ -298,6 +319,7 @@ func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context } revisionChan <- watchResp.Header.Revision + sw.Stop() } }() diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go index ddd3545027a..bed13a10cea 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go @@ -13,6 +13,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" @@ -40,9 +41,10 @@ type namespaceShardToExecutor struct { client etcdclient.Client timeSource clock.TimeSource pubSub *executorStatePubSub + metricsClient metrics.Client } -func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient.Client, stopCh chan struct{}, logger log.Logger, timeSource clock.TimeSource) (*namespaceShardToExecutor, error) { +func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient.Client, stopCh chan struct{}, logger log.Logger, timeSource clock.TimeSource, metricsClient metrics.Client) (*namespaceShardToExecutor, error) { return &namespaceShardToExecutor{ shardToExecutor: make(map[string]*store.ShardOwner), executorState: make(map[*store.ShardOwner][]string), @@ -55,6 +57,7 @@ func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient client: client, timeSource: timeSource, pubSub: newExecutorStatePubSub(logger, namespace), + metricsClient: metricsClient, }, nil } @@ -171,6 +174,9 @@ func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + scope := n.metricsClient.Scope(metrics.ShardDistributorWatchScope). + Tagged(metrics.NamespaceTag(n.namespace)) + watchChan := n.client.Watch( // WithRequireLeader ensures that the etcd cluster has a leader clientv3.WithRequireLeader(ctx), @@ -193,8 +199,21 @@ func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error { return fmt.Errorf("watch channel closed") } + // Track watch metrics + sw := scope.StartTimer(metrics.ShardDistributorWatchProcessingLatency) + scope.AddCounter(metrics.ShardDistributorWatchEventsReceived, int64(len(watchResp.Events))) + + // Consumer lag: Header.Revision is the current etcd cluster revision, + // lastEvent.Kv.ModRevision is the revision when the event was created. + // The difference shows how far behind the consumer is from the current cluster state. + if len(watchResp.Events) > 0 { + lastEvent := watchResp.Events[len(watchResp.Events)-1] + scope.UpdateGauge(metrics.ShardDistributorWatchConsumerLag, float64(watchResp.Header.Revision-lastEvent.Kv.ModRevision)) + } + // Only trigger refresh if the change is related to executor assigned state or metadata if !n.hasExecutorStateChanged(watchResp) { + sw.Stop() continue } @@ -203,6 +222,7 @@ func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error { default: n.logger.Info("Cache is being refreshed, skipping trigger") } + sw.Stop() } } } diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go index 2a6a9190349..7c87ffa3397 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go @@ -17,6 +17,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient" @@ -38,7 +39,7 @@ func TestNamespaceShardToExecutor_Lifecycle(t *testing.T) { }) // Start the cache - namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource()) + namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient()) assert.NoError(t, err) namespaceShardToExecutor.Start(&sync.WaitGroup{}) time.Sleep(50 * time.Millisecond) @@ -90,7 +91,7 @@ func TestNamespaceShardToExecutor_Subscribe(t *testing.T) { }) // Start the cache - namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource()) + namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient()) assert.NoError(t, err) namespaceShardToExecutor.Start(&sync.WaitGroup{}) @@ -157,7 +158,7 @@ func TestNamespaceShardToExecutor_watch_watchChanErrors(t *testing.T) { Return(watchChan). AnyTimes() - e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource()) + e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient()) require.NoError(t, err) triggerChan := make(chan struct{}, 1) @@ -431,7 +432,7 @@ func TestNamespaceShardToExecutor_namespaceRefreshLoop_watchError(t *testing.T) MinTimes(0). MaxTimes(1) - e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, timeSource) + e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, timeSource, metrics.NewNoopMetricsClient()) require.NoError(t, err) wg := sync.WaitGroup{} @@ -572,7 +573,7 @@ func setupNamespaceShardToExecutorTestCase(t *testing.T) *namespaceShardToExecut Return(tc.watchChan). AnyTimes() - e, err := newNamespaceShardToExecutor(tc.prefix, tc.namespace, tc.etcdClient, tc.stopCh, logger, clock.NewRealTimeSource()) + e, err := newNamespaceShardToExecutor(tc.prefix, tc.namespace, tc.etcdClient, tc.stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient()) require.NoError(t, err) tc.e = e return tc diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache.go b/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache.go index 374435faae5..8cb2e75a2d8 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache.go @@ -9,6 +9,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient" ) @@ -23,6 +24,7 @@ type ShardToExecutorCache struct { logger log.Logger prefix string wg sync.WaitGroup + metricsClient metrics.Client } func NewShardToExecutorCache( @@ -30,6 +32,7 @@ func NewShardToExecutorCache( client etcdclient.Client, logger log.Logger, timeSource clock.TimeSource, + metricsClient metrics.Client, ) *ShardToExecutorCache { shardCache := &ShardToExecutorCache{ namespaceToShards: make(NamespaceToShards), @@ -39,6 +42,7 @@ func NewShardToExecutorCache( prefix: prefix, client: client, wg: sync.WaitGroup{}, + metricsClient: metricsClient, } return shardCache @@ -97,7 +101,7 @@ func (s *ShardToExecutorCache) getNamespaceShardToExecutor(namespace string) (*n s.Lock() defer s.Unlock() - namespaceShardToExecutor, err := newNamespaceShardToExecutor(s.prefix, namespace, s.client, s.stopC, s.logger, s.timeSource) + namespaceShardToExecutor, err := newNamespaceShardToExecutor(s.prefix, namespace, s.client, s.stopC, s.logger, s.timeSource, s.metricsClient) if err != nil { return nil, fmt.Errorf("new namespace shard to executor: %w", err) } diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache_test.go b/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache_test.go index d88f612ce88..bd9fc8babc4 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache_test.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache_test.go @@ -9,6 +9,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper" ) @@ -16,7 +17,7 @@ func TestNewShardToExecutorCache(t *testing.T) { logger := testlogger.New(t) client := &clientv3.Client{} - cache := NewShardToExecutorCache("some-prefix", client, logger, clock.NewRealTimeSource()) + cache := NewShardToExecutorCache("some-prefix", client, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient()) assert.NotNil(t, cache) @@ -38,7 +39,7 @@ func TestShardExecutorCacheForwarding(t *testing.T) { "rack": "rack-42", }) - cache := NewShardToExecutorCache(testCluster.EtcdPrefix, testCluster.Client, logger, clock.NewRealTimeSource()) + cache := NewShardToExecutorCache(testCluster.EtcdPrefix, testCluster.Client, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient()) cache.Start() defer cache.Stop()