Skip to content

Commit c6959bf

Browse files
authored
chore(replicationTaskFetcher): Add metrics to task_fetcher (#7462)
<!-- Describe what has changed in this PR --> **What changed?** Added metrics to task_fetcher <!-- Tell your future self why you have made these changes --> **Why?** To have visibility into the latency of the taskFetcher relative to the replication latency <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** Signed-off-by: fimanishi <[email protected]>
1 parent 97175b5 commit c6959bf

File tree

4 files changed

+25
-1
lines changed

4 files changed

+25
-1
lines changed

common/metrics/defs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2742,6 +2742,8 @@ const (
27422742
ReplicationTaskCleanupFailure
27432743
ReplicationTaskLatency
27442744
ExponentialReplicationTaskLatency
2745+
ExponentialReplicationTaskFetchLatency
2746+
ReplicationTasksFetchedSize
27452747
MutableStateChecksumMismatch
27462748
MutableStateChecksumInvalidated
27472749
FailoverMarkerCount
@@ -3535,6 +3537,8 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
35353537
ReplicationTaskCleanupFailure: {metricName: "replication_task_cleanup_failed", metricType: Counter},
35363538
ReplicationTaskLatency: {metricName: "replication_task_latency", metricType: Timer},
35373539
ExponentialReplicationTaskLatency: {metricName: "replication_task_latency_ns", metricType: Histogram, exponentialBuckets: Mid1ms24h},
3540+
ExponentialReplicationTaskFetchLatency: {metricName: "replication_task_fetch_latency_ns", metricType: Histogram, exponentialBuckets: Mid1ms24h},
3541+
ReplicationTasksFetchedSize: {metricName: "replication_tasks_fetched_size", metricType: Gauge},
35383542
MutableStateChecksumMismatch: {metricName: "mutable_state_checksum_mismatch", metricType: Counter},
35393543
MutableStateChecksumInvalidated: {metricName: "mutable_state_checksum_invalidated", metricType: Counter},
35403544
FailoverMarkerCount: {metricName: "failover_marker_count", metricType: Counter},

service/history/handler/handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ func (h *handlerImpl) Start() {
121121
h.config,
122122
h.GetClusterMetadata(),
123123
h.GetClientBean(),
124+
h.GetMetricsClient(),
124125
)
125126
if err != nil {
126127
h.GetLogger().Fatal("Creating replication task fetchers failed", tag.Error(err))

service/history/replication/task_fetcher.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/uber/cadence/common/cluster"
3535
"github.com/uber/cadence/common/log"
3636
"github.com/uber/cadence/common/log/tag"
37+
"github.com/uber/cadence/common/metrics"
3738
"github.com/uber/cadence/common/quotas"
3839
"github.com/uber/cadence/common/types"
3940
"github.com/uber/cadence/service/history/config"
@@ -71,6 +72,7 @@ type (
7172
sourceCluster string
7273
config *config.Config
7374
logger log.Logger
75+
metricsScope metrics.Scope
7476
remotePeer admin.Client
7577
rateLimiter quotas.Limiter
7678
timeSource clock.TimeSource
@@ -99,6 +101,7 @@ func NewTaskFetchers(
99101
config *config.Config,
100102
clusterMetadata cluster.Metadata,
101103
clientBean client.Bean,
104+
metricsClient metrics.Client,
102105
) (TaskFetchers, error) {
103106
currentCluster := clusterMetadata.GetCurrentClusterName()
104107
var fetchers []TaskFetcher
@@ -113,6 +116,7 @@ func NewTaskFetchers(
113116
currentCluster,
114117
config,
115118
remoteFrontendClient,
119+
metricsClient,
116120
)
117121
fetchers = append(fetchers, fetcher)
118122
}
@@ -160,12 +164,14 @@ func newReplicationTaskFetcher(
160164
currentCluster string,
161165
config *config.Config,
162166
sourceFrontend admin.Client,
167+
metricsClient metrics.Client,
163168
) TaskFetcher {
164169
ctx, cancel := context.WithCancel(context.Background())
165170
fetcher := &taskFetcherImpl{
166171
status: common.DaemonStatusInitialized,
167172
config: config,
168173
logger: logger.WithTags(tag.ClusterName(sourceCluster)),
174+
metricsScope: metricsClient.Scope(metrics.ReplicationTaskFetcherScope, metrics.TargetClusterTag(sourceCluster)),
169175
remotePeer: sourceFrontend,
170176
currentCluster: currentCluster,
171177
sourceCluster: sourceCluster,
@@ -210,7 +216,13 @@ func (f *taskFetcherImpl) Stop() {
210216

211217
// fetchTasks collects getReplicationTasks requests from shards and sends out an aggregated request to the source frontend.
212218
func (f *taskFetcherImpl) fetchTasks() {
219+
startTime := f.timeSource.Now()
213220
defer f.wg.Done()
221+
defer func() {
222+
totalLatency := f.timeSource.Now().Sub(startTime)
223+
f.metricsScope.ExponentialHistogram(metrics.ExponentialReplicationTaskFetchLatency, totalLatency)
224+
}()
225+
214226
timer := f.timeSource.NewTimer(backoff.JitDuration(
215227
f.config.ReplicationTaskFetcherAggregationInterval(),
216228
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
@@ -273,6 +285,12 @@ func (f *taskFetcherImpl) fetchAndDistributeTasks(requestByShard map[int32]*requ
273285
return err
274286
}
275287

288+
totalTasks := 0
289+
for _, messages := range messagesByShard {
290+
totalTasks += len(messages.ReplicationTasks)
291+
}
292+
f.metricsScope.UpdateGauge(metrics.ReplicationTasksFetchedSize, float64(totalTasks))
293+
276294
f.logger.Debug("Successfully fetched replication tasks.", tag.Counter(len(messagesByShard)))
277295
for shardID, tasks := range messagesByShard {
278296
request := requestByShard[shardID]

service/history/replication/task_fetcher_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func (s *taskFetcherSuite) SetupTest() {
8484
"active",
8585
s.config,
8686
s.frontendClient,
87+
metrics.NewNoopMetricsClient(),
8788
).(*taskFetcherImpl)
8889
}
8990

@@ -269,7 +270,7 @@ func TestTaskFetchers(t *testing.T) {
269270
cfg := config.NewForTest()
270271

271272
mockBean.EXPECT().GetRemoteAdminClient(cluster.TestAlternativeClusterName).Return(mockAdminClient, nil)
272-
fetchers, err := NewTaskFetchers(logger, cfg, cluster.TestActiveClusterMetadata, mockBean)
273+
fetchers, err := NewTaskFetchers(logger, cfg, cluster.TestActiveClusterMetadata, mockBean, metrics.NewNoopMetricsClient())
273274
assert.NoError(t, err)
274275
assert.NotNil(t, fetchers)
275276
assert.Len(t, fetchers.GetFetchers(), len(cluster.TestActiveClusterMetadata.GetRemoteClusterInfo()))

0 commit comments

Comments
 (0)