@@ -147,6 +147,7 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
147147 return nil , err
148148 }
149149 t .scope .RecordTimer (metrics .ReplicationTasksFetched , time .Duration (len (taskInfos )))
150+ t .scope .IntExponentialHistogram (metrics .ExponentialReplicationTasksFetched , len (taskInfos ))
150151
151152 // Happy path assumption - we will push all tasks to replication tasks.
152153 msgs := & types.ReplicationMessages {
@@ -167,7 +168,10 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
167168 oldestUnprocessedTaskTimestamp = t .timeSource .Now ().UnixNano ()
168169 }
169170
170- t .scope .RecordTimer (metrics .ReplicationTasksLagRaw , time .Duration (t .ackLevels .UpdateIfNeededAndGetQueueMaxReadLevel (persistence .HistoryTaskCategoryReplication , pollingCluster ).GetTaskID ()- oldestUnprocessedTaskID ))
171+ maxReadLevel := t .ackLevels .UpdateIfNeededAndGetQueueMaxReadLevel (persistence .HistoryTaskCategoryReplication , pollingCluster ).GetTaskID ()
172+ lagRaw := maxReadLevel - oldestUnprocessedTaskID
173+ t .scope .RecordTimer (metrics .ReplicationTasksLagRaw , time .Duration (lagRaw ))
174+ t .scope .IntExponentialHistogram (metrics .ExponentialReplicationTasksLagRaw , clampInt64ToInt (lagRaw ))
171175 t .scope .RecordHistogramDuration (metrics .ReplicationTasksDelay , time .Duration (oldestUnprocessedTaskTimestamp - t .timeSource .Now ().UnixNano ()))
172176
173177 // hydrate the tasks
@@ -199,9 +203,16 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
199203 return nil , err
200204 }
201205
202- t .scope .RecordTimer (metrics .ReplicationTasksLag , time .Duration (t .ackLevels .UpdateIfNeededAndGetQueueMaxReadLevel (persistence .HistoryTaskCategoryReplication , pollingCluster ).GetTaskID ()- msgs .LastRetrievedMessageID ))
206+ lag := maxReadLevel - msgs .LastRetrievedMessageID
207+ t .scope .RecordTimer (metrics .ReplicationTasksLag , time .Duration (lag ))
208+ t .scope .IntExponentialHistogram (metrics .ExponentialReplicationTasksLag , clampInt64ToInt (lag ))
209+
203210 t .scope .RecordTimer (metrics .ReplicationTasksReturned , time .Duration (len (msgs .ReplicationTasks )))
204- t .scope .RecordTimer (metrics .ReplicationTasksReturnedDiff , time .Duration (len (taskInfos )- len (msgs .ReplicationTasks )))
211+ t .scope .IntExponentialHistogram (metrics .ExponentialReplicationTasksReturned , len (msgs .ReplicationTasks ))
212+
213+ returnedDiff := len (taskInfos ) - len (msgs .ReplicationTasks )
214+ t .scope .RecordTimer (metrics .ReplicationTasksReturnedDiff , time .Duration (returnedDiff ))
215+ t .scope .IntExponentialHistogram (metrics .ExponentialReplicationTasksReturnedDiff , returnedDiff )
205216
206217 t .ackLevel (pollingCluster , lastReadTaskID )
207218
@@ -221,6 +232,17 @@ func (t *TaskAckManager) getTasks(ctx context.Context, pollingCluster string, la
221232 }, nil
222233}
223234
235+ func clampInt64ToInt (v int64 ) int {
236+ if v <= 0 {
237+ return 0
238+ }
239+ maxInt := int64 (int (^ uint (0 ) >> 1 ))
240+ if v > maxInt {
241+ return int (maxInt )
242+ }
243+ return int (v )
244+ }
245+
224246// ackLevel updates the ack level for the given cluster
225247func (t * TaskAckManager ) ackLevel (pollingCluster string , lastReadTaskID int64 ) {
226248 if err := t .ackLevels .UpdateQueueClusterAckLevel (persistence .HistoryTaskCategoryReplication , pollingCluster , persistence .NewImmediateTaskKey (lastReadTaskID )); err != nil {
0 commit comments