Skip to content

Commit cfa5b2d

Browse files
chore(shard-manager): Add metrics to track etcd watch events (#7586)
<!-- Describe what has changed in this PR --> **What changed?** This PR adds metrics to etcd watch handlers in shard distributor, to track watch event throughput, processing latency, and consumer lag. <!-- Tell your future self why have you made these changes --> **Why?** Shard distributor uses etcd watch streams to detect changes in executor state and trigger cache refreshes or rebalancing. Previously, we had no observability on these watch events, making it difficult to: - Detect if watches are falling behind etcd updates, - Measure processing performance - Alert on watch-related issues before they impact the system <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Started local Cadence with Prometheus, triggered watch events, verified shard_distributor_watch_* metrics appear in /metrics endpoint <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** N/A <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** N/A <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** N/A --------- Signed-off-by: Gaziza Yestemirova <gaziza@uber.com>
1 parent 05a2412 commit cfa5b2d

File tree

8 files changed

+87
-35
lines changed

8 files changed

+87
-35
lines changed

common/metrics/defs.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,6 +1494,9 @@ const (
14941494
// The scope for the shard distributor executor
14951495
ShardDistributorExecutorScope
14961496

1497+
// ShardDistributorWatchScope tracks etcd watch stream processing
1498+
ShardDistributorWatchScope
1499+
14971500
NumShardDistributorScopes
14981501
)
14991502

@@ -2182,6 +2185,7 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
21822185
ShardDistributorStoreSubscribeToExecutorStatusChangesScope: {operation: "StoreSubscribeToExecutorStatusChanges"},
21832186
ShardDistributorStoreSubscribeToAssignmentChangesScope: {operation: "StoreSubscribeToAssignmentChanges"},
21842187
ShardDistributorStoreDeleteAssignedStatesScope: {operation: "StoreDeleteAssignedStates"},
2188+
ShardDistributorWatchScope: {operation: "Watch"},
21852189
},
21862190
}
21872191

@@ -3000,6 +3004,11 @@ const (
30003004
// ShardDistributorShardHandoverLatency measures the time taken to hand over a shard from one executor to another
30013005
ShardDistributorShardHandoverLatency
30023006

3007+
// ShardDistributorWatchProcessingLatency measures how long it takes to process a single WatchResponse
3008+
ShardDistributorWatchProcessingLatency
3009+
// ShardDistributorWatchEventsReceived counts the total number of watch events received
3010+
ShardDistributorWatchEventsReceived
3011+
30033012
NumShardDistributorMetrics
30043013
)
30053014

@@ -3802,6 +3811,9 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
38023811

38033812
ShardDistributorShardAssignmentDistributionLatency: {metricName: "shard_distributor_shard_assignment_distribution_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},
38043813
ShardDistributorShardHandoverLatency: {metricName: "shard_distributor_shard_handover_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},
3814+
3815+
ShardDistributorWatchProcessingLatency: {metricName: "shard_distributor_watch_processing_latency", metricType: Histogram, buckets: Default1ms100s.buckets()},
3816+
ShardDistributorWatchEventsReceived: {metricName: "shard_distributor_watch_events_received", metricType: Counter},
38053817
},
38063818
}
38073819

common/metrics/tags.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,10 @@ func ExecutorStatusTag(status string) Tag {
360360
return metricWithUnknown("executor_status", status)
361361
}
362362

363+
func ShardDistributorWatchTypeTag(watchType string) Tag {
364+
return metricWithUnknown("watch_type", watchType)
365+
}
366+
363367
func TaskCategoryTag(category string) Tag {
364368
return metricWithUnknown("task_category", category)
365369
}

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/uber/cadence/common/clock"
1515
"github.com/uber/cadence/common/log"
1616
"github.com/uber/cadence/common/log/tag"
17+
"github.com/uber/cadence/common/metrics"
1718
"github.com/uber/cadence/common/types"
1819
"github.com/uber/cadence/service/sharddistributor/config"
1920
"github.com/uber/cadence/service/sharddistributor/store"
@@ -29,13 +30,14 @@ var (
2930
)
3031

3132
type executorStoreImpl struct {
32-
client etcdclient.Client
33-
prefix string
34-
logger log.Logger
35-
shardCache *shardcache.ShardToExecutorCache
36-
timeSource clock.TimeSource
37-
recordWriter *common.RecordWriter
38-
cfg *config.Config
33+
client etcdclient.Client
34+
prefix string
35+
logger log.Logger
36+
shardCache *shardcache.ShardToExecutorCache
37+
timeSource clock.TimeSource
38+
recordWriter *common.RecordWriter
39+
cfg *config.Config
40+
metricsClient metrics.Client
3941
}
4042

4143
// shardStatisticsUpdate holds the staged statistics for a shard so we can write them
@@ -49,17 +51,18 @@ type shardStatisticsUpdate struct {
4951
type ExecutorStoreParams struct {
5052
fx.In
5153

52-
Client etcdclient.Client `name:"executorstore"`
53-
ETCDConfig ETCDConfig
54-
Lifecycle fx.Lifecycle
55-
Logger log.Logger
56-
TimeSource clock.TimeSource
57-
Config *config.Config
54+
Client etcdclient.Client `name:"executorstore"`
55+
ETCDConfig ETCDConfig
56+
Lifecycle fx.Lifecycle
57+
Logger log.Logger
58+
TimeSource clock.TimeSource
59+
Config *config.Config
60+
MetricsClient metrics.Client
5861
}
5962

6063
// NewStore creates a new etcd-backed store and provides it to the fx application.
6164
func NewStore(p ExecutorStoreParams) (store.Store, error) {
62-
shardCache := shardcache.NewShardToExecutorCache(p.ETCDConfig.Prefix, p.Client, p.Logger, p.TimeSource)
65+
shardCache := shardcache.NewShardToExecutorCache(p.ETCDConfig.Prefix, p.Client, p.Logger, p.TimeSource, p.MetricsClient)
6366

6467
timeSource := p.TimeSource
6568
if timeSource == nil {
@@ -72,13 +75,14 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
7275
}
7376

7477
store := &executorStoreImpl{
75-
client: p.Client,
76-
prefix: p.ETCDConfig.Prefix,
77-
logger: p.Logger,
78-
shardCache: shardCache,
79-
timeSource: timeSource,
80-
recordWriter: recordWriter,
81-
cfg: p.Config,
78+
client: p.Client,
79+
prefix: p.ETCDConfig.Prefix,
80+
logger: p.Logger,
81+
shardCache: shardCache,
82+
timeSource: timeSource,
83+
recordWriter: recordWriter,
84+
cfg: p.Config,
85+
metricsClient: p.MetricsClient,
8286
}
8387

8488
p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop))
@@ -275,6 +279,11 @@ func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context
275279

276280
go func() {
277281
defer close(revisionChan)
282+
283+
scope := s.metricsClient.Scope(metrics.ShardDistributorWatchScope).
284+
Tagged(metrics.NamespaceTag(namespace)).
285+
Tagged(metrics.ShardDistributorWatchTypeTag("rebalance"))
286+
278287
watchChan := s.client.Watch(ctx,
279288
etcdkeys.BuildExecutorsPrefix(s.prefix, namespace),
280289
clientv3.WithPrefix(),
@@ -286,7 +295,12 @@ func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context
286295
return
287296
}
288297

298+
// Track watch metrics
299+
sw := scope.StartTimer(metrics.ShardDistributorWatchProcessingLatency)
300+
scope.AddCounter(metrics.ShardDistributorWatchEventsReceived, int64(len(watchResp.Events)))
301+
289302
if !s.hasExecutorStatusChanged(watchResp, namespace) {
303+
sw.Stop()
290304
continue
291305
}
292306

@@ -298,6 +312,7 @@ func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context
298312
}
299313

300314
revisionChan <- watchResp.Header.Revision
315+
sw.Stop()
301316
}
302317
}()
303318

service/sharddistributor/store/etcd/executorstore/etcdstore_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/uber/cadence/common/clock"
1616
"github.com/uber/cadence/common/log/testlogger"
17+
"github.com/uber/cadence/common/metrics"
1718
"github.com/uber/cadence/common/types"
1819
"github.com/uber/cadence/service/sharddistributor/config"
1920
"github.com/uber/cadence/service/sharddistributor/store"
@@ -770,11 +771,12 @@ func createStore(t *testing.T, tc *testhelper.StoreTestCluster) store.Store {
770771
require.NoError(t, err)
771772

772773
store, err := NewStore(ExecutorStoreParams{
773-
Client: tc.Client,
774-
ETCDConfig: etcdConfig,
775-
Lifecycle: fxtest.NewLifecycle(t),
776-
Logger: testlogger.New(t),
777-
TimeSource: clock.NewMockedTimeSourceAt(time.Now()),
774+
Client: tc.Client,
775+
ETCDConfig: etcdConfig,
776+
Lifecycle: fxtest.NewLifecycle(t),
777+
Logger: testlogger.New(t),
778+
TimeSource: clock.NewMockedTimeSourceAt(time.Now()),
779+
MetricsClient: metrics.NewNoopMetricsClient(),
778780
Config: &config.Config{
779781
LoadBalancingMode: func(namespace string) string { return config.LoadBalancingModeNAIVE },
780782
},

service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/uber/cadence/common/clock"
1414
"github.com/uber/cadence/common/log"
1515
"github.com/uber/cadence/common/log/tag"
16+
"github.com/uber/cadence/common/metrics"
1617
"github.com/uber/cadence/service/sharddistributor/store"
1718
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient"
1819
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys"
@@ -40,9 +41,10 @@ type namespaceShardToExecutor struct {
4041
client etcdclient.Client
4142
timeSource clock.TimeSource
4243
pubSub *executorStatePubSub
44+
metricsClient metrics.Client
4345
}
4446

45-
func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient.Client, stopCh chan struct{}, logger log.Logger, timeSource clock.TimeSource) (*namespaceShardToExecutor, error) {
47+
func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient.Client, stopCh chan struct{}, logger log.Logger, timeSource clock.TimeSource, metricsClient metrics.Client) (*namespaceShardToExecutor, error) {
4648
return &namespaceShardToExecutor{
4749
shardToExecutor: make(map[string]*store.ShardOwner),
4850
executorState: make(map[*store.ShardOwner][]string),
@@ -55,6 +57,7 @@ func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient
5557
client: client,
5658
timeSource: timeSource,
5759
pubSub: newExecutorStatePubSub(logger, namespace),
60+
metricsClient: metricsClient,
5861
}, nil
5962
}
6063

@@ -171,6 +174,10 @@ func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error {
171174
ctx, cancel := context.WithCancel(context.Background())
172175
defer cancel()
173176

177+
scope := n.metricsClient.Scope(metrics.ShardDistributorWatchScope).
178+
Tagged(metrics.NamespaceTag(n.namespace)).
179+
Tagged(metrics.ShardDistributorWatchTypeTag("cache_refresh"))
180+
174181
watchChan := n.client.Watch(
175182
// WithRequireLeader ensures that the etcd cluster has a leader
176183
clientv3.WithRequireLeader(ctx),
@@ -193,8 +200,13 @@ func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error {
193200
return fmt.Errorf("watch channel closed")
194201
}
195202

203+
// Track watch metrics
204+
sw := scope.StartTimer(metrics.ShardDistributorWatchProcessingLatency)
205+
scope.AddCounter(metrics.ShardDistributorWatchEventsReceived, int64(len(watchResp.Events)))
206+
196207
// Only trigger refresh if the change is related to executor assigned state or metadata
197208
if !n.hasExecutorStateChanged(watchResp) {
209+
sw.Stop()
198210
continue
199211
}
200212

@@ -203,6 +215,7 @@ func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error {
203215
default:
204216
n.logger.Info("Cache is being refreshed, skipping trigger")
205217
}
218+
sw.Stop()
206219
}
207220
}
208221
}

service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
"github.com/uber/cadence/common/clock"
1919
"github.com/uber/cadence/common/log/testlogger"
20+
"github.com/uber/cadence/common/metrics"
2021
"github.com/uber/cadence/common/types"
2122
"github.com/uber/cadence/service/sharddistributor/store"
2223
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient"
@@ -38,7 +39,7 @@ func TestNamespaceShardToExecutor_Lifecycle(t *testing.T) {
3839
})
3940

4041
// Start the cache
41-
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource())
42+
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
4243
assert.NoError(t, err)
4344
namespaceShardToExecutor.Start(&sync.WaitGroup{})
4445
time.Sleep(50 * time.Millisecond)
@@ -90,7 +91,7 @@ func TestNamespaceShardToExecutor_Subscribe(t *testing.T) {
9091
})
9192

9293
// Start the cache
93-
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource())
94+
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
9495
assert.NoError(t, err)
9596
namespaceShardToExecutor.Start(&sync.WaitGroup{})
9697

@@ -157,7 +158,7 @@ func TestNamespaceShardToExecutor_watch_watchChanErrors(t *testing.T) {
157158
Return(watchChan).
158159
AnyTimes()
159160

160-
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource())
161+
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
161162
require.NoError(t, err)
162163

163164
triggerChan := make(chan struct{}, 1)
@@ -431,7 +432,7 @@ func TestNamespaceShardToExecutor_namespaceRefreshLoop_watchError(t *testing.T)
431432
MinTimes(0).
432433
MaxTimes(1)
433434

434-
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, timeSource)
435+
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, timeSource, metrics.NewNoopMetricsClient())
435436
require.NoError(t, err)
436437

437438
wg := sync.WaitGroup{}
@@ -572,7 +573,7 @@ func setupNamespaceShardToExecutorTestCase(t *testing.T) *namespaceShardToExecut
572573
Return(tc.watchChan).
573574
AnyTimes()
574575

575-
e, err := newNamespaceShardToExecutor(tc.prefix, tc.namespace, tc.etcdClient, tc.stopCh, logger, clock.NewRealTimeSource())
576+
e, err := newNamespaceShardToExecutor(tc.prefix, tc.namespace, tc.etcdClient, tc.stopCh, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
576577
require.NoError(t, err)
577578
tc.e = e
578579
return tc

service/sharddistributor/store/etcd/executorstore/shardcache/shardcache.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/uber/cadence/common/clock"
1111
"github.com/uber/cadence/common/log"
12+
"github.com/uber/cadence/common/metrics"
1213
"github.com/uber/cadence/service/sharddistributor/store"
1314
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient"
1415
)
@@ -23,13 +24,15 @@ type ShardToExecutorCache struct {
2324
logger log.Logger
2425
prefix string
2526
wg sync.WaitGroup
27+
metricsClient metrics.Client
2628
}
2729

2830
func NewShardToExecutorCache(
2931
prefix string,
3032
client etcdclient.Client,
3133
logger log.Logger,
3234
timeSource clock.TimeSource,
35+
metricsClient metrics.Client,
3336
) *ShardToExecutorCache {
3437
shardCache := &ShardToExecutorCache{
3538
namespaceToShards: make(NamespaceToShards),
@@ -39,6 +42,7 @@ func NewShardToExecutorCache(
3942
prefix: prefix,
4043
client: client,
4144
wg: sync.WaitGroup{},
45+
metricsClient: metricsClient,
4246
}
4347

4448
return shardCache
@@ -97,7 +101,7 @@ func (s *ShardToExecutorCache) getNamespaceShardToExecutor(namespace string) (*n
97101
s.Lock()
98102
defer s.Unlock()
99103

100-
namespaceShardToExecutor, err := newNamespaceShardToExecutor(s.prefix, namespace, s.client, s.stopC, s.logger, s.timeSource)
104+
namespaceShardToExecutor, err := newNamespaceShardToExecutor(s.prefix, namespace, s.client, s.stopC, s.logger, s.timeSource, s.metricsClient)
101105
if err != nil {
102106
return nil, fmt.Errorf("new namespace shard to executor: %w", err)
103107
}

service/sharddistributor/store/etcd/executorstore/shardcache/shardcache_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ import (
99

1010
"github.com/uber/cadence/common/clock"
1111
"github.com/uber/cadence/common/log/testlogger"
12+
"github.com/uber/cadence/common/metrics"
1213
"github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper"
1314
)
1415

1516
func TestNewShardToExecutorCache(t *testing.T) {
1617
logger := testlogger.New(t)
1718

1819
client := &clientv3.Client{}
19-
cache := NewShardToExecutorCache("some-prefix", client, logger, clock.NewRealTimeSource())
20+
cache := NewShardToExecutorCache("some-prefix", client, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
2021

2122
assert.NotNil(t, cache)
2223

@@ -38,7 +39,7 @@ func TestShardExecutorCacheForwarding(t *testing.T) {
3839
"rack": "rack-42",
3940
})
4041

41-
cache := NewShardToExecutorCache(testCluster.EtcdPrefix, testCluster.Client, logger, clock.NewRealTimeSource())
42+
cache := NewShardToExecutorCache(testCluster.EtcdPrefix, testCluster.Client, logger, clock.NewRealTimeSource(), metrics.NewNoopMetricsClient())
4243
cache.Start()
4344
defer cache.Stop()
4445

0 commit comments

Comments
 (0)