From 0d71a31bc4284229b95fbe94cb6ac908e89efd29 Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Tue, 10 Feb 2026 16:09:44 -0500 Subject: [PATCH 1/9] chore: add histograms for replication tasks lag, returned, and returned diff Dual-emit exponential histograms alongside existing timers for ReplicationTasksLag, ReplicationTasksReturned, and ReplicationTasksReturnedDiff to support histogram migration. ReplicationTasksLag uses the already-defined ExponentialReplicationTasksLag (duration-based Mid1ms24h). Returned and ReturnedDiff use new integer histograms (Mid1To16k) with _counts suffix. All six metric names added to HistogramMigrationMetrics allowlist. Signed-off-by: Diana Zawadzki --- common/metrics/config.go | 9 +++++++++ common/metrics/defs.go | 4 ++++ service/history/replication/task_ack_manager.go | 15 ++++++++++++--- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/common/metrics/config.go b/common/metrics/config.go index 3e11d6be14d..7676451218a 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -65,6 +65,15 @@ var HistogramMigrationMetrics = map[string]struct{}{ "replication_task_latency": {}, "replication_task_latency_ns": {}, + + // Replication tasks lag/returned/diff (replication task-ack path). + // Dual-emitted as timer + histogram. + "replication_tasks_lag": {}, + "replication_tasks_lag_ns": {}, + "replication_tasks_returned": {}, + "replication_tasks_returned_counts": {}, + "replication_tasks_returned_diff": {}, + "replication_tasks_returned_diff_counts": {}, } func (h HistogramMigration) EmitTimer(name string) bool { diff --git a/common/metrics/defs.go b/common/metrics/defs.go index f235c331f96..58d77cad1b1 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2740,7 +2740,9 @@ const ( ReplicationTasksDelay ReplicationTasksFetched ReplicationTasksReturned + ExponentialReplicationTasksReturned ReplicationTasksReturnedDiff + ExponentialReplicationTasksReturnedDiff ReplicationTasksAppliedLatency ExponentialReplicationTasksAppliedLatency ReplicationTasksBatchSize @@ -3567,7 +3569,9 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ ReplicationTasksDelay: {metricName: "replication_tasks_delay", metricType: Histogram, buckets: ReplicationTaskDelayBucket}, ReplicationTasksFetched: {metricName: "replication_tasks_fetched", metricType: Timer}, ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer}, + ExponentialReplicationTasksReturned: {metricName: "replication_tasks_returned_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer}, + ExponentialReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency", metricType: Timer}, ExponentialReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s}, ReplicationTasksBatchSize: {metricName: "replication_tasks_batch_size", metricType: Gauge}, diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index e335be6a816..e459a6e8d4a 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -194,9 +194,18 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la return nil, err } - t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID()-msgs.LastRetrievedMessageID)) - t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(len(msgs.ReplicationTasks))) - t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(len(taskInfos)-len(msgs.ReplicationTasks))) + // Keep timers (backwards compatible), dual-emit exponential histograms for migration. + replicationLag := time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - msgs.LastRetrievedMessageID) + t.scope.RecordTimer(metrics.ReplicationTasksLag, replicationLag) + t.scope.ExponentialHistogram(metrics.ExponentialReplicationTasksLag, replicationLag) + + tasksReturned := len(msgs.ReplicationTasks) + t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(tasksReturned)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturned, tasksReturned) + + tasksReturnedDiff := len(taskInfos) - len(msgs.ReplicationTasks) + t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(tasksReturnedDiff)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturnedDiff, tasksReturnedDiff) t.ackLevel(pollingCluster, lastReadTaskID) From abd27ad615fdf2166d95ac869844902ae5b960bb Mon Sep 17 00:00:00 2001 From: zawadzki Date: Wed, 25 Feb 2026 22:49:27 +0000 Subject: [PATCH 2/9] Use processing-latency histogram for task-ack path Address review feedback by removing task_latency_ns histogram and using ExponentialTaskProcessingLatency in task_ack_manager. Also remove stale histogram-migration allowlist entries for task_latency/task_latency_ns. Signed-off-by: zawadzki --- common/metrics/config.go | 2 -- service/history/replication/task_ack_manager.go | 4 ++++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/common/metrics/config.go b/common/metrics/config.go index 7676451218a..8fd247adb5c 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -68,8 +68,6 @@ var HistogramMigrationMetrics = map[string]struct{}{ // Replication tasks lag/returned/diff (replication task-ack path). // Dual-emitted as timer + histogram. - "replication_tasks_lag": {}, - "replication_tasks_lag_ns": {}, "replication_tasks_returned": {}, "replication_tasks_returned_counts": {}, "replication_tasks_returned_diff": {}, diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index e459a6e8d4a..e1e0bd5fe71 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -131,8 +131,12 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la lastReadTaskID = previousReadTaskID } + taskGeneratedStart := t.timeSource.Now() taskGeneratedTimer := t.scope.StartTimer(metrics.TaskLatency) defer taskGeneratedTimer.Stop() + defer func() { + t.scope.ExponentialHistogram(metrics.ExponentialTaskProcessingLatency, t.timeSource.Since(taskGeneratedStart)) + }() batchSize := t.dynamicTaskBatchSizer.value() t.scope.UpdateGauge(metrics.ReplicationTasksBatchSize, float64(batchSize)) From 6a32d1c689b90c6f0efea8b05736d06f2349ac9e Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Wed, 25 Feb 2026 16:28:09 -0800 Subject: [PATCH 3/9] fix(metrics): align task-ack processing timer with histogram Signed-off-by: Diana Zawadzki --- service/history/replication/task_ack_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index e1e0bd5fe71..97b190a119a 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -132,7 +132,7 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la } taskGeneratedStart := t.timeSource.Now() - taskGeneratedTimer := t.scope.StartTimer(metrics.TaskLatency) + taskGeneratedTimer := t.scope.StartTimer(metrics.TaskProcessingLatency) defer taskGeneratedTimer.Stop() defer func() { t.scope.ExponentialHistogram(metrics.ExponentialTaskProcessingLatency, t.timeSource.Since(taskGeneratedStart)) From db571b7b0f97c371da577c6b763ec10979dd8a12 Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Thu, 26 Feb 2026 12:32:44 -0800 Subject: [PATCH 4/9] chore(metrics): apply gofmt alignment in histogram migration map Signed-off-by: Diana Zawadzki --- common/metrics/config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/metrics/config.go b/common/metrics/config.go index 8fd247adb5c..1a60d767a62 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -68,9 +68,9 @@ var HistogramMigrationMetrics = map[string]struct{}{ // Replication tasks lag/returned/diff (replication task-ack path). // Dual-emitted as timer + histogram. - "replication_tasks_returned": {}, - "replication_tasks_returned_counts": {}, - "replication_tasks_returned_diff": {}, + "replication_tasks_returned": {}, + "replication_tasks_returned_counts": {}, + "replication_tasks_returned_diff": {}, "replication_tasks_returned_diff_counts": {}, } From becb9e13c0af17253d153545a8cb5fa0118ea372 Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Thu, 26 Feb 2026 13:22:50 -0800 Subject: [PATCH 5/9] fix(metrics): emit replication tasks lag via int histogram Signed-off-by: Diana Zawadzki --- service/history/replication/task_ack_manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index 97b190a119a..7a5173294e6 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -199,9 +199,9 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la } // Keep timers (backwards compatible), dual-emit exponential histograms for migration. - replicationLag := time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - msgs.LastRetrievedMessageID) - t.scope.RecordTimer(metrics.ReplicationTasksLag, replicationLag) - t.scope.ExponentialHistogram(metrics.ExponentialReplicationTasksLag, replicationLag) + replicationLag := int(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - msgs.LastRetrievedMessageID) + t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(replicationLag)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksLag, replicationLag) tasksReturned := len(msgs.ReplicationTasks) t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(tasksReturned)) From f1165c84a97e06f37f7ca97a8e731ca13c223159 Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Fri, 13 Feb 2026 14:24:56 -0500 Subject: [PATCH 6/9] chore(metrics): add dual-emission for replication fetched and lag raw Add integer histogram dual-emission for ReplicationTasksFetched and ReplicationTasksLagRaw, and include their names in HistogramMigrationMetrics for config consistency checks. Signed-off-by: Diana Zawadzki --- common/metrics/config.go | 5 +++++ common/metrics/defs.go | 4 ++++ service/history/replication/task_ack_manager.go | 8 ++++++-- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/common/metrics/config.go b/common/metrics/config.go index 1a60d767a62..1a3e0279aec 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -72,6 +72,11 @@ var HistogramMigrationMetrics = map[string]struct{}{ "replication_tasks_returned_counts": {}, "replication_tasks_returned_diff": {}, "replication_tasks_returned_diff_counts": {}, + + "replication_tasks_fetched": {}, + "replication_tasks_fetched_counts": {}, + "replication_tasks_lag_raw": {}, + "replication_tasks_lag_raw_counts": {}, } func (h HistogramMigration) EmitTimer(name string) bool { diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 58d77cad1b1..1a7ad004ba2 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2737,8 +2737,10 @@ const ( ReplicationTasksLag ExponentialReplicationTasksLag ReplicationTasksLagRaw + ExponentialReplicationTasksLagRaw ReplicationTasksDelay ReplicationTasksFetched + ExponentialReplicationTasksFetched ReplicationTasksReturned ExponentialReplicationTasksReturned ReplicationTasksReturnedDiff @@ -3566,8 +3568,10 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ ReplicationTasksLag: {metricName: "replication_tasks_lag", metricType: Timer}, ExponentialReplicationTasksLag: {metricName: "replication_tasks_lag_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw", metricType: Timer}, + ExponentialReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksDelay: {metricName: "replication_tasks_delay", metricType: Histogram, buckets: ReplicationTaskDelayBucket}, ReplicationTasksFetched: {metricName: "replication_tasks_fetched", metricType: Timer}, + ExponentialReplicationTasksFetched: {metricName: "replication_tasks_fetched_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer}, ExponentialReplicationTasksReturned: {metricName: "replication_tasks_returned_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer}, diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index 7a5173294e6..7b873f5fa81 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -145,7 +145,9 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la if err != nil { return nil, err } - t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(len(taskInfos))) + tasksFetched := len(taskInfos) + t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(tasksFetched)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksFetched, tasksFetched) // Happy path assumption - we will push all tasks to replication tasks. msgs := &types.ReplicationMessages{ @@ -166,7 +168,9 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la oldestUnprocessedTaskTimestamp = t.timeSource.Now().UnixNano() } - t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID()-oldestUnprocessedTaskID)) + lagRaw := int(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - oldestUnprocessedTaskID) + t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(lagRaw)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksLagRaw, lagRaw) t.scope.RecordHistogramDuration(metrics.ReplicationTasksDelay, time.Duration(oldestUnprocessedTaskTimestamp-t.timeSource.Now().UnixNano())) // hydrate the tasks From a23691c782b2ccb294f2e90465ec46acfd16f6c4 Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Thu, 26 Feb 2026 12:07:49 -0800 Subject: [PATCH 7/9] fix(metrics): dual-emit task latency histogram in replication ack manager Signed-off-by: Diana Zawadzki --- common/metrics/config.go | 2 ++ common/metrics/defs.go | 2 ++ service/history/replication/task_ack_manager.go | 6 +++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/common/metrics/config.go b/common/metrics/config.go index 1a3e0279aec..545ecf20cd9 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -45,6 +45,8 @@ var HistogramMigrationMetrics = map[string]struct{}{ "task_attempt_counts": {}, "task_attempt_per_domain": {}, "task_attempt_per_domain_counts": {}, + "task_latency": {}, + "task_latency_ns": {}, "task_latency_per_domain": {}, "task_latency_per_domain_ns": {}, "task_latency_processing": {}, diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 1a7ad004ba2..c85188c55f4 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2484,6 +2484,7 @@ const ( const ( TaskRequests = iota + NumCommonMetrics TaskLatency + ExponentialTaskLatency TaskFailures TaskDiscarded TaskAttemptTimer @@ -3328,6 +3329,7 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ History: { TaskRequests: {metricName: "task_requests", metricType: Counter}, TaskLatency: {metricName: "task_latency", metricType: Timer}, + ExponentialTaskLatency: {metricName: "task_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s}, TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer}, ExponentialTaskAttemptCounts: {metricName: "task_attempt_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, TaskFailures: {metricName: "task_errors", metricType: Counter}, diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index 7b873f5fa81..8b9378f4ed9 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -135,7 +135,11 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la taskGeneratedTimer := t.scope.StartTimer(metrics.TaskProcessingLatency) defer taskGeneratedTimer.Stop() defer func() { - t.scope.ExponentialHistogram(metrics.ExponentialTaskProcessingLatency, t.timeSource.Since(taskGeneratedStart)) + taskLatency := t.timeSource.Since(taskGeneratedStart) + // Preserve current task-ack processing latency metrics and also migrate legacy task latency. + t.scope.ExponentialHistogram(metrics.ExponentialTaskProcessingLatency, taskLatency) + t.scope.RecordTimer(metrics.TaskLatency, taskLatency) + t.scope.ExponentialHistogram(metrics.ExponentialTaskLatency, taskLatency) }() batchSize := t.dynamicTaskBatchSizer.value() From 9d367ec48809d8d417ca58e0866a96da9bb99261 Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Thu, 26 Feb 2026 15:03:31 -0800 Subject: [PATCH 8/9] fix(metrics): keep task-ack latency migration scoped to TaskLatency Signed-off-by: Diana Zawadzki --- service/history/replication/task_ack_manager.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index 8b9378f4ed9..37e4a1e3c32 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -131,15 +131,12 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la lastReadTaskID = previousReadTaskID } + // Keep timer (backwards compatible), dual-emit exponential histogram for migration. taskGeneratedStart := t.timeSource.Now() - taskGeneratedTimer := t.scope.StartTimer(metrics.TaskProcessingLatency) + taskGeneratedTimer := t.scope.StartTimer(metrics.TaskLatency) defer taskGeneratedTimer.Stop() defer func() { - taskLatency := t.timeSource.Since(taskGeneratedStart) - // Preserve current task-ack processing latency metrics and also migrate legacy task latency. - t.scope.ExponentialHistogram(metrics.ExponentialTaskProcessingLatency, taskLatency) - t.scope.RecordTimer(metrics.TaskLatency, taskLatency) - t.scope.ExponentialHistogram(metrics.ExponentialTaskLatency, taskLatency) + t.scope.ExponentialHistogram(metrics.ExponentialTaskLatency, t.timeSource.Since(taskGeneratedStart)) }() batchSize := t.dynamicTaskBatchSizer.value() From 5328b5d61a1e82a22b93688552bc10e737a14f04 Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Thu, 26 Feb 2026 15:09:20 -0800 Subject: [PATCH 9/9] chore(metrics): remove task-ack migration inline comments Signed-off-by: Diana Zawadzki --- common/metrics/config.go | 4 ---- service/history/replication/task_ack_manager.go | 2 -- 2 files changed, 6 deletions(-) diff --git a/common/metrics/config.go b/common/metrics/config.go index 545ecf20cd9..38a6438f34e 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -58,8 +58,6 @@ var HistogramMigrationMetrics = map[string]struct{}{ "task_latency_queue_per_domain": {}, "task_latency_queue_per_domain_ns": {}, - // Replication task processor histograms (PR #7685). - // Dual-emitted as timer + histogram. "replication_tasks_lag": {}, "replication_tasks_lag_counts": {}, "replication_tasks_applied_latency": {}, @@ -68,8 +66,6 @@ var HistogramMigrationMetrics = map[string]struct{}{ "replication_task_latency": {}, "replication_task_latency_ns": {}, - // Replication tasks lag/returned/diff (replication task-ack path). - // Dual-emitted as timer + histogram. "replication_tasks_returned": {}, "replication_tasks_returned_counts": {}, "replication_tasks_returned_diff": {}, diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index 37e4a1e3c32..e59676de2ec 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -131,7 +131,6 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la lastReadTaskID = previousReadTaskID } - // Keep timer (backwards compatible), dual-emit exponential histogram for migration. taskGeneratedStart := t.timeSource.Now() taskGeneratedTimer := t.scope.StartTimer(metrics.TaskLatency) defer taskGeneratedTimer.Stop() @@ -203,7 +202,6 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la return nil, err } - // Keep timers (backwards compatible), dual-emit exponential histograms for migration. replicationLag := int(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - msgs.LastRetrievedMessageID) t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(replicationLag)) t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksLag, replicationLag)