diff --git a/common/metrics/config.go b/common/metrics/config.go index 3e11d6be14d..38a6438f34e 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -45,6 +45,8 @@ var HistogramMigrationMetrics = map[string]struct{}{ "task_attempt_counts": {}, "task_attempt_per_domain": {}, "task_attempt_per_domain_counts": {}, + "task_latency": {}, + "task_latency_ns": {}, "task_latency_per_domain": {}, "task_latency_per_domain_ns": {}, "task_latency_processing": {}, @@ -56,8 +58,6 @@ var HistogramMigrationMetrics = map[string]struct{}{ "task_latency_queue_per_domain": {}, "task_latency_queue_per_domain_ns": {}, - // Replication task processor histograms (PR #7685). - // Dual-emitted as timer + histogram. "replication_tasks_lag": {}, "replication_tasks_lag_counts": {}, "replication_tasks_applied_latency": {}, @@ -65,6 +65,16 @@ var HistogramMigrationMetrics = map[string]struct{}{ "replication_task_latency": {}, "replication_task_latency_ns": {}, + + "replication_tasks_returned": {}, + "replication_tasks_returned_counts": {}, + "replication_tasks_returned_diff": {}, + "replication_tasks_returned_diff_counts": {}, + + "replication_tasks_fetched": {}, + "replication_tasks_fetched_counts": {}, + "replication_tasks_lag_raw": {}, + "replication_tasks_lag_raw_counts": {}, } func (h HistogramMigration) EmitTimer(name string) bool { diff --git a/common/metrics/defs.go b/common/metrics/defs.go index f235c331f96..c85188c55f4 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2484,6 +2484,7 @@ const ( const ( TaskRequests = iota + NumCommonMetrics TaskLatency + ExponentialTaskLatency TaskFailures TaskDiscarded TaskAttemptTimer @@ -2737,10 +2738,14 @@ const ( ReplicationTasksLag ExponentialReplicationTasksLag ReplicationTasksLagRaw + ExponentialReplicationTasksLagRaw ReplicationTasksDelay ReplicationTasksFetched + ExponentialReplicationTasksFetched ReplicationTasksReturned + ExponentialReplicationTasksReturned ReplicationTasksReturnedDiff + ExponentialReplicationTasksReturnedDiff ReplicationTasksAppliedLatency ExponentialReplicationTasksAppliedLatency ReplicationTasksBatchSize @@ -3324,6 +3329,7 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ History: { TaskRequests: {metricName: "task_requests", metricType: Counter}, TaskLatency: {metricName: "task_latency", metricType: Timer}, + ExponentialTaskLatency: {metricName: "task_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s}, TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer}, ExponentialTaskAttemptCounts: {metricName: "task_attempt_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, TaskFailures: {metricName: "task_errors", metricType: Counter}, @@ -3564,10 +3570,14 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ ReplicationTasksLag: {metricName: "replication_tasks_lag", metricType: Timer}, ExponentialReplicationTasksLag: {metricName: "replication_tasks_lag_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw", metricType: Timer}, + ExponentialReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksDelay: {metricName: "replication_tasks_delay", metricType: Histogram, buckets: ReplicationTaskDelayBucket}, ReplicationTasksFetched: {metricName: "replication_tasks_fetched", metricType: Timer}, + ExponentialReplicationTasksFetched: {metricName: "replication_tasks_fetched_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer}, + ExponentialReplicationTasksReturned: {metricName: "replication_tasks_returned_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer}, + ExponentialReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k}, ReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency", metricType: Timer}, ExponentialReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s}, ReplicationTasksBatchSize: {metricName: "replication_tasks_batch_size", metricType: Gauge}, diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index e335be6a816..e59676de2ec 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -131,8 +131,12 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la lastReadTaskID = previousReadTaskID } + taskGeneratedStart := t.timeSource.Now() taskGeneratedTimer := t.scope.StartTimer(metrics.TaskLatency) defer taskGeneratedTimer.Stop() + defer func() { + t.scope.ExponentialHistogram(metrics.ExponentialTaskLatency, t.timeSource.Since(taskGeneratedStart)) + }() batchSize := t.dynamicTaskBatchSizer.value() t.scope.UpdateGauge(metrics.ReplicationTasksBatchSize, float64(batchSize)) @@ -141,7 +145,9 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la if err != nil { return nil, err } - t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(len(taskInfos))) + tasksFetched := len(taskInfos) + t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(tasksFetched)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksFetched, tasksFetched) // Happy path assumption - we will push all tasks to replication tasks. msgs := &types.ReplicationMessages{ @@ -162,7 +168,9 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la oldestUnprocessedTaskTimestamp = t.timeSource.Now().UnixNano() } - t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID()-oldestUnprocessedTaskID)) + lagRaw := int(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - oldestUnprocessedTaskID) + t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(lagRaw)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksLagRaw, lagRaw) t.scope.RecordHistogramDuration(metrics.ReplicationTasksDelay, time.Duration(oldestUnprocessedTaskTimestamp-t.timeSource.Now().UnixNano())) // hydrate the tasks @@ -194,9 +202,17 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la return nil, err } - t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID()-msgs.LastRetrievedMessageID)) - t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(len(msgs.ReplicationTasks))) - t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(len(taskInfos)-len(msgs.ReplicationTasks))) + replicationLag := int(t.ackLevels.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, pollingCluster).GetTaskID() - msgs.LastRetrievedMessageID) + t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(replicationLag)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksLag, replicationLag) + + tasksReturned := len(msgs.ReplicationTasks) + t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(tasksReturned)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturned, tasksReturned) + + tasksReturnedDiff := len(taskInfos) - len(msgs.ReplicationTasks) + t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(tasksReturnedDiff)) + t.scope.IntExponentialHistogram(metrics.ExponentialReplicationTasksReturnedDiff, tasksReturnedDiff) t.ackLevel(pollingCluster, lastReadTaskID)