@@ -10,7 +10,11 @@ import (
1010 "github.com/prometheus/client_golang/prometheus"
1111)
1212
13- const namespace = "goengine"
13+ const (
14+ namespace = "goengine"
15+ notificationQueueKeyPrefix = "q"
16+ notificationProcessingKeyPrefix = "p"
17+ )
1418
1519// Metrics is an object for exposing prometheus metrics
1620type Metrics struct {
@@ -78,24 +82,38 @@ func (m *Metrics) ReceivedNotification(isNotification bool) {
7882}
7983
8084// QueueNotification returns http handler for prometheus
81- func (m * Metrics ) QueueNotification (notification * sql.ProjectionNotification ) {
82- key := "q" + fmt .Sprintf ("%p" , notification )
83- m .notificationStartTimes .Store (key , time .Now ())
85+ func (m * Metrics ) QueueNotification (notification * sql.ProjectionNotification ) bool {
86+ return m .storeStartTime (notificationQueueKeyPrefix , notification )
8487}
8588
8689// StartNotificationProcessing is used to record start time of notification processing
87- func (m * Metrics ) StartNotificationProcessing (notification * sql.ProjectionNotification ) {
88- key := "p" + fmt .Sprintf ("%p" , notification )
89- m .notificationStartTimes .Store (key , time .Now ())
90+ func (m * Metrics ) StartNotificationProcessing (notification * sql.ProjectionNotification ) bool {
91+ return m .storeStartTime (notificationProcessingKeyPrefix , notification )
9092}
9193
9294// FinishNotificationProcessing is used to observe end time of notification queue and processing time
93- func (m * Metrics ) FinishNotificationProcessing (notification * sql.ProjectionNotification , success bool ) {
95+ func (m * Metrics ) FinishNotificationProcessing (notification * sql.ProjectionNotification , success bool ) bool {
9496 memAddress := fmt .Sprintf ("%p" , notification )
95- queueStartTime , _ := m .notificationStartTimes .Load ("q" + memAddress )
96- processingStartTime , _ := m .notificationStartTimes .Load ("p" + memAddress )
9797 labels := prometheus.Labels {"success" : strconv .FormatBool (success )}
9898
99- m .notificationQueueDuration .With (labels ).Observe (time .Since (queueStartTime .(time.Time )).Seconds ())
100- m .notificationProcessingDuration .With (labels ).Observe (time .Since (processingStartTime .(time.Time )).Seconds ())
99+ queueStartTime , queueOk := m .notificationStartTimes .Load (notificationQueueKeyPrefix + memAddress )
100+
101+ processingStartTime , processingOk := m .notificationStartTimes .Load (notificationProcessingKeyPrefix + memAddress )
102+
103+ if processingOk && queueOk {
104+ m .notificationProcessingDuration .With (labels ).Observe (time .Since (processingStartTime .(time.Time )).Seconds ())
105+ m .notificationQueueDuration .With (labels ).Observe (time .Since (queueStartTime .(time.Time )).Seconds ())
106+ return true
107+ }
108+
109+ return false
110+ }
111+
112+ // storeStartTime stores the start time against each notification only if it's not already existent
113+ func (m * Metrics ) storeStartTime (prefix string , notification * sql.ProjectionNotification ) bool {
114+ key := prefix + fmt .Sprintf ("%p" , notification )
115+
116+ _ , alreadyExists := m .notificationStartTimes .LoadOrStore (key , time .Now ())
117+
118+ return ! alreadyExists
101119}
0 commit comments