Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ var HistogramMigrationMetrics = map[string]struct{}{

"replication_task_latency": {},
"replication_task_latency_ns": {},

// Replication tasks lag/returned/diff (replication task-ack path).
// Dual-emitted as timer + histogram.
"replication_tasks_returned": {},
"replication_tasks_returned_counts": {},
"replication_tasks_returned_diff": {},
"replication_tasks_returned_diff_counts": {},
}

func (h HistogramMigration) EmitTimer(name string) bool {
Expand Down
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2740,7 +2740,9 @@ const (
ReplicationTasksDelay
ReplicationTasksFetched
ReplicationTasksReturned
ExponentialReplicationTasksReturned
ReplicationTasksReturnedDiff
ExponentialReplicationTasksReturnedDiff
ReplicationTasksAppliedLatency
ExponentialReplicationTasksAppliedLatency
ReplicationTasksBatchSize
Expand Down Expand Up @@ -3567,7 +3569,9 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
ReplicationTasksDelay: {metricName: "replication_tasks_delay", metricType: Histogram, buckets: ReplicationTaskDelayBucket},
ReplicationTasksFetched: {metricName: "replication_tasks_fetched", metricType: Timer},
ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer},
ExponentialReplicationTasksReturned: {metricName: "replication_tasks_returned_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer},
ExponentialReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
ReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency", metricType: Timer},
ExponentialReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s},
ReplicationTasksBatchSize: {metricName: "replication_tasks_batch_size", metricType: Gauge},
Expand Down
21 changes: 17 additions & 4 deletions service/history/replication/task_ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,12 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
lastReadTaskID = previousReadTaskID
}

taskGeneratedTimer := t.scope.StartTimer(metrics.TaskLatency)
taskGeneratedStart := t.timeSource.Now()
taskGeneratedTimer := t.scope.StartTimer(metrics.TaskProcessingLatency)
defer taskGeneratedTimer.Stop()
defer func() {
t.scope.ExponentialHistogram(metrics.ExponentialTaskProcessingLatency, t.timeSource.Since(taskGeneratedStart))
}()

batchSize := t.dynamicTaskBatchSizer.value()
t.scope.UpdateGauge(metrics.ReplicationTasksBatchSize, float64(batchSize))
Expand Down Expand Up @@ -194,9 +198,18 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
return nil, err
}

t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID()-msgs.LastRetrievedMessageID))
t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(len(msgs.ReplicationTasks)))
t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(len(taskInfos)-len(msgs.ReplicationTasks)))
// Keep timers (backwards compatible), dual-emit exponential histograms for migration.
replicationLag := time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - msgs.LastRetrievedMessageID)
t.scope.RecordTimer(metrics.ReplicationTasksLag, replicationLag)
t.scope.ExponentialHistogram(metrics.ExponentialReplicationTasksLag, replicationLag)

tasksReturned := len(msgs.ReplicationTasks)
t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(tasksReturned))
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturned, tasksReturned)

tasksReturnedDiff := len(taskInfos) - len(msgs.ReplicationTasks)
t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(tasksReturnedDiff))
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturnedDiff, tasksReturnedDiff)

t.ackLevel(pollingCluster, lastReadTaskID)

Expand Down
Loading