Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions common/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var HistogramMigrationMetrics = map[string]struct{}{
"task_attempt_counts": {},
"task_attempt_per_domain": {},
"task_attempt_per_domain_counts": {},
"task_latency": {},
"task_latency_ns": {},
"task_latency_per_domain": {},
"task_latency_per_domain_ns": {},
"task_latency_processing": {},
Expand All @@ -65,6 +67,18 @@ var HistogramMigrationMetrics = map[string]struct{}{

"replication_task_latency": {},
"replication_task_latency_ns": {},

// Replication tasks lag/returned/diff (replication task-ack path).
// Dual-emitted as timer + histogram.
"replication_tasks_returned": {},
"replication_tasks_returned_counts": {},
"replication_tasks_returned_diff": {},
"replication_tasks_returned_diff_counts": {},

"replication_tasks_fetched": {},
"replication_tasks_fetched_counts": {},
"replication_tasks_lag_raw": {},
"replication_tasks_lag_raw_counts": {},
}

func (h HistogramMigration) EmitTimer(name string) bool {
Expand Down
10 changes: 10 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2484,6 +2484,7 @@ const (
const (
TaskRequests = iota + NumCommonMetrics
TaskLatency
ExponentialTaskLatency
TaskFailures
TaskDiscarded
TaskAttemptTimer
Expand Down Expand Up @@ -2737,10 +2738,14 @@ const (
ReplicationTasksLag
ExponentialReplicationTasksLag
ReplicationTasksLagRaw
ExponentialReplicationTasksLagRaw
ReplicationTasksDelay
ReplicationTasksFetched
ExponentialReplicationTasksFetched
ReplicationTasksReturned
ExponentialReplicationTasksReturned
ReplicationTasksReturnedDiff
ExponentialReplicationTasksReturnedDiff
ReplicationTasksAppliedLatency
ExponentialReplicationTasksAppliedLatency
ReplicationTasksBatchSize
Expand Down Expand Up @@ -3324,6 +3329,7 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
TaskLatency: {metricName: "task_latency", metricType: Timer},
ExponentialTaskLatency: {metricName: "task_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s},
TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer},
ExponentialTaskAttemptCounts: {metricName: "task_attempt_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
TaskFailures: {metricName: "task_errors", metricType: Counter},
Expand Down Expand Up @@ -3564,10 +3570,14 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
ReplicationTasksLag: {metricName: "replication_tasks_lag", metricType: Timer},
ExponentialReplicationTasksLag: {metricName: "replication_tasks_lag_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
ReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw", metricType: Timer},
ExponentialReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
ReplicationTasksDelay: {metricName: "replication_tasks_delay", metricType: Histogram, buckets: ReplicationTaskDelayBucket},
ReplicationTasksFetched: {metricName: "replication_tasks_fetched", metricType: Timer},
ExponentialReplicationTasksFetched: {metricName: "replication_tasks_fetched_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer},
ExponentialReplicationTasksReturned: {metricName: "replication_tasks_returned_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer},
ExponentialReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
ReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency", metricType: Timer},
ExponentialReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s},
ReplicationTasksBatchSize: {metricName: "replication_tasks_batch_size", metricType: Gauge},
Expand Down
33 changes: 27 additions & 6 deletions service/history/replication/task_ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,16 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
lastReadTaskID = previousReadTaskID
}

taskGeneratedTimer := t.scope.StartTimer(metrics.TaskLatency)
taskGeneratedStart := t.timeSource.Now()
taskGeneratedTimer := t.scope.StartTimer(metrics.TaskProcessingLatency)
defer taskGeneratedTimer.Stop()
defer func() {
taskLatency := t.timeSource.Since(taskGeneratedStart)
// Preserve current task-ack processing latency metrics and also migrate legacy task latency.
t.scope.ExponentialHistogram(metrics.ExponentialTaskProcessingLatency, taskLatency)
t.scope.RecordTimer(metrics.TaskLatency, taskLatency)
t.scope.ExponentialHistogram(metrics.ExponentialTaskLatency, taskLatency)
}()

batchSize := t.dynamicTaskBatchSizer.value()
t.scope.UpdateGauge(metrics.ReplicationTasksBatchSize, float64(batchSize))
Expand All @@ -141,7 +149,9 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
if err != nil {
return nil, err
}
t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(len(taskInfos)))
tasksFetched := len(taskInfos)
t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(tasksFetched))
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksFetched, tasksFetched)

// Happy path assumption - we will push all tasks to replication tasks.
msgs := &types.ReplicationMessages{
Expand All @@ -162,7 +172,9 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
oldestUnprocessedTaskTimestamp = t.timeSource.Now().UnixNano()
}

t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID()-oldestUnprocessedTaskID))
lagRaw := int(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - oldestUnprocessedTaskID)
t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(lagRaw))
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksLagRaw, lagRaw)
t.scope.RecordHistogramDuration(metrics.ReplicationTasksDelay, time.Duration(oldestUnprocessedTaskTimestamp-t.timeSource.Now().UnixNano()))

// hydrate the tasks
Expand Down Expand Up @@ -194,9 +206,18 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
return nil, err
}

t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID()-msgs.LastRetrievedMessageID))
t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(len(msgs.ReplicationTasks)))
t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(len(taskInfos)-len(msgs.ReplicationTasks)))
// Keep timers (backwards compatible), dual-emit exponential histograms for migration.
replicationLag := int(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - msgs.LastRetrievedMessageID)
t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(replicationLag))
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksLag, replicationLag)

tasksReturned := len(msgs.ReplicationTasks)
t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(tasksReturned))
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturned, tasksReturned)

tasksReturnedDiff := len(taskInfos) - len(msgs.ReplicationTasks)
t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(tasksReturnedDiff))
t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturnedDiff, tasksReturnedDiff)

t.ackLevel(pollingCluster, lastReadTaskID)

Expand Down
Loading