Skip to content

Commit 624556c

Browse files
fix(metrics): add replication task latency histogram (#7684)
**What changed?** Added an exponential histogram for replication task latency in TaskAckManager alongside the existing timer (task_latency_ns) **Why?** Replication task latency was tracked only via timers, which limits distribution analysis. Adding a histogram retains the existing timer while enabling bucketed/percentile insights in M3/Grafana without changing processing logic. **How did you test it?** Verified metric emission in uMonitor. **Potential risks** Low risk: additional metrics emission on the replication ack path; minimal overhead. **Release notes** Replication now emits task_latency_ns histogram for replication task latency. CadenceCDNC-17610 **Documentation Changes** N/A --------- Signed-off-by: Diana Zawadzki <dzawa@live.de> Signed-off-by: zawadzki <zawadzki@uber.com> Co-authored-by: zawadzki <zawadzki@uber.com>
1 parent 1d9e012 commit 624556c

File tree

3 files changed

+43
-5
lines changed

3 files changed

+43
-5
lines changed

common/metrics/config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ var HistogramMigrationMetrics = map[string]struct{}{
4545
"task_attempt_counts": {},
4646
"task_attempt_per_domain": {},
4747
"task_attempt_per_domain_counts": {},
48+
"task_latency": {},
49+
"task_latency_ns": {},
4850
"task_latency_per_domain": {},
4951
"task_latency_per_domain_ns": {},
5052
"task_latency_processing": {},
@@ -68,6 +70,16 @@ var HistogramMigrationMetrics = map[string]struct{}{
6870

6971
"replication_task_latency": {},
7072
"replication_task_latency_ns": {},
73+
74+
"replication_tasks_returned": {},
75+
"replication_tasks_returned_counts": {},
76+
"replication_tasks_returned_diff": {},
77+
"replication_tasks_returned_diff_counts": {},
78+
79+
"replication_tasks_fetched": {},
80+
"replication_tasks_fetched_counts": {},
81+
"replication_tasks_lag_raw": {},
82+
"replication_tasks_lag_raw_counts": {},
7183
}
7284

7385
func (h HistogramMigration) EmitTimer(name string) bool {

common/metrics/defs.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2484,6 +2484,7 @@ const (
24842484
const (
24852485
TaskRequests = iota + NumCommonMetrics
24862486
TaskLatency
2487+
ExponentialTaskLatency
24872488
TaskFailures
24882489
TaskDiscarded
24892490
TaskAttemptTimer
@@ -2739,10 +2740,14 @@ const (
27392740
ReplicationTasksLag
27402741
ExponentialReplicationTasksLag
27412742
ReplicationTasksLagRaw
2743+
ExponentialReplicationTasksLagRaw
27422744
ReplicationTasksDelay
27432745
ReplicationTasksFetched
2746+
ExponentialReplicationTasksFetched
27442747
ReplicationTasksReturned
2748+
ExponentialReplicationTasksReturned
27452749
ReplicationTasksReturnedDiff
2750+
ExponentialReplicationTasksReturnedDiff
27462751
ReplicationTasksAppliedLatency
27472752
ExponentialReplicationTasksAppliedLatency
27482753
ReplicationTasksBatchSize
@@ -3326,6 +3331,7 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
33263331
History: {
33273332
TaskRequests: {metricName: "task_requests", metricType: Counter},
33283333
TaskLatency: {metricName: "task_latency", metricType: Timer},
3334+
ExponentialTaskLatency: {metricName: "task_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s},
33293335
TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer},
33303336
ExponentialTaskAttemptCounts: {metricName: "task_attempt_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
33313337
TaskFailures: {metricName: "task_errors", metricType: Counter},
@@ -3568,10 +3574,14 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
35683574
ReplicationTasksLag: {metricName: "replication_tasks_lag", metricType: Timer},
35693575
ExponentialReplicationTasksLag: {metricName: "replication_tasks_lag_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
35703576
ReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw", metricType: Timer},
3577+
ExponentialReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
35713578
ReplicationTasksDelay: {metricName: "replication_tasks_delay", metricType: Histogram, buckets: ReplicationTaskDelayBucket},
35723579
ReplicationTasksFetched: {metricName: "replication_tasks_fetched", metricType: Timer},
3580+
ExponentialReplicationTasksFetched: {metricName: "replication_tasks_fetched_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
35733581
ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer},
3582+
ExponentialReplicationTasksReturned: {metricName: "replication_tasks_returned_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
35743583
ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer},
3584+
ExponentialReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
35753585
ReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency", metricType: Timer},
35763586
ExponentialReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s},
35773587
ReplicationTasksBatchSize: {metricName: "replication_tasks_batch_size", metricType: Gauge},

service/history/replication/task_ack_manager.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,12 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
131131
lastReadTaskID = previousReadTaskID
132132
}
133133

134+
taskGeneratedStart := t.timeSource.Now()
134135
taskGeneratedTimer := t.scope.StartTimer(metrics.TaskLatency)
135136
defer taskGeneratedTimer.Stop()
137+
defer func() {
138+
t.scope.ExponentialHistogram(metrics.ExponentialTaskLatency, t.timeSource.Since(taskGeneratedStart))
139+
}()
136140

137141
batchSize := t.dynamicTaskBatchSizer.value()
138142
t.scope.UpdateGauge(metrics.ReplicationTasksBatchSize, float64(batchSize))
@@ -141,7 +145,9 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
141145
if err != nil {
142146
return nil, err
143147
}
144-
t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(len(taskInfos)))
148+
tasksFetched := len(taskInfos)
149+
t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(tasksFetched))
150+
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksFetched, tasksFetched)
145151

146152
// Happy path assumption - we will push all tasks to replication tasks.
147153
msgs := &types.ReplicationMessages{
@@ -162,7 +168,9 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
162168
oldestUnprocessedTaskTimestamp = t.timeSource.Now().UnixNano()
163169
}
164170

165-
t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID()-oldestUnprocessedTaskID))
171+
lagRaw := int(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - oldestUnprocessedTaskID)
172+
t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(lagRaw))
173+
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksLagRaw, lagRaw)
166174
t.scope.RecordHistogramDuration(metrics.ReplicationTasksDelay, time.Duration(oldestUnprocessedTaskTimestamp-t.timeSource.Now().UnixNano()))
167175

168176
// hydrate the tasks
@@ -194,9 +202,17 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
194202
return nil, err
195203
}
196204

197-
t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID()-msgs.LastRetrievedMessageID))
198-
t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(len(msgs.ReplicationTasks)))
199-
t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(len(taskInfos)-len(msgs.ReplicationTasks)))
205+
replicationLag := int(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - msgs.LastRetrievedMessageID)
206+
t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(replicationLag))
207+
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksLag, replicationLag)
208+
209+
tasksReturned := len(msgs.ReplicationTasks)
210+
t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(tasksReturned))
211+
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturned, tasksReturned)
212+
213+
tasksReturnedDiff := len(taskInfos) - len(msgs.ReplicationTasks)
214+
t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(tasksReturnedDiff))
215+
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturnedDiff, tasksReturnedDiff)
200216

201217
t.ackLevel(pollingCluster, lastReadTaskID)
202218

0 commit comments

Comments
 (0)