Skip to content

Commit 88b0396

Browse files
committed
Implement metrics functions
added maps for start times indexed by address of notification objects. some refactroing
1 parent 061e8c0 commit 88b0396

File tree

5 files changed

+42
-24
lines changed

5 files changed

+42
-24
lines changed

driver/sql/projection_notification_processor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,12 @@ func (b *projectionNotificationProcessor) startProcessor(ctx context.Context, ha
130130
e.Error(err)
131131
e.Any("notification", notification)
132132
})
133+
134+
b.metrics.FinishNotificationProcessing(notification, false, false)
135+
136+
} else {
137+
b.metrics.FinishNotificationProcessing(notification, true, false)
133138
}
134-
b.metrics.FinishNotificationProcessing(notification)
135139
}
136140
}
137141
}

extension/pq/listener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ func (s *Listener) Listen(ctx context.Context, exec sql.ProjectionTrigger) error
9898
for {
9999
select {
100100
case n := <-listener.Notify:
101+
s.metrics.ReceivedNotification(n != nil)
101102
// Unmarshal the notification
102103
notification := s.unmarshalNotification(n)
103-
s.metrics.ReceivedNotification(notification != nil)
104104

105105
// Execute the notification to be projected
106106
if err := exec(ctx, notification); err != nil {

extension/prometheus/prometheus.go

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
package prometheus
22

33
import (
4+
"fmt"
45
"github.com/hellofresh/goengine/driver/sql"
56
"github.com/prometheus/client_golang/prometheus"
7+
"strconv"
8+
"time"
69
)
710

811
const namespace = "goengine"
912

1013
type Metrics struct {
11-
notificationCounter *prometheus.CounterVec
12-
queueDuration *prometheus.HistogramVec
13-
notificationHandleDuration *prometheus.HistogramVec
14+
notificationCounter *prometheus.CounterVec
15+
notificationQueueDuration *prometheus.HistogramVec
16+
notificationProcessingDuration *prometheus.HistogramVec
17+
notificationStartTimes map[string]time.Time
1418
}
1519

1620
func NewMetrics() *Metrics {
@@ -22,10 +26,10 @@ func NewMetrics() *Metrics {
2226
Name: "notification_count",
2327
Help: "counter for number of notifications received",
2428
},
25-
[]string{"is_event"},
29+
[]string{"is_notification"},
2630
),
2731
// queueDuration is used to expose 'queue_duration_seconds' metrics
28-
queueDuration: prometheus.NewHistogramVec(
32+
notificationQueueDuration: prometheus.NewHistogramVec(
2933
prometheus.HistogramOpts{
3034
Namespace: namespace,
3135
Name: "queue_duration_seconds",
@@ -35,15 +39,15 @@ func NewMetrics() *Metrics {
3539
[]string{"retry", "success"},
3640
),
3741

38-
// notificationHandleDuration is used to expose 'notification_handle_duration_seconds' metrics
39-
notificationHandleDuration: prometheus.NewHistogramVec(
42+
// notificationProcessingDuration is used to expose 'notification_handle_duration_seconds' metrics
43+
notificationProcessingDuration: prometheus.NewHistogramVec(
4044
prometheus.HistogramOpts{
4145
Namespace: namespace,
42-
Name: "notification_handle_duration_seconds",
43-
Help: "histogram of event handled latencies",
46+
Name: "notification_processing_duration_seconds",
47+
Help: "histogram of notifications handled latencies",
4448
Buckets: []float64{0.1, 0.5, 0.9, 0.99}, //buckets for histogram
4549
},
46-
[]string{"retry", "success"},
50+
[]string{"success", "retry"},
4751
),
4852
}
4953
}
@@ -55,30 +59,39 @@ func (m *Metrics) RegisterMetrics(registry *prometheus.Registry) error {
5559
return err
5660
}
5761

58-
err = registry.Register(m.queueDuration)
62+
err = registry.Register(m.notificationQueueDuration)
5963
if err != nil {
6064
return err
6165
}
6266

63-
return registry.Register(m.notificationHandleDuration)
67+
return registry.Register(m.notificationProcessingDuration)
6468
}
6569

6670
// ReceivedNotification
6771
func (m *Metrics) ReceivedNotification(isNotification bool) {
68-
72+
labels := prometheus.Labels{"is_notification": strconv.FormatBool(isNotification)}
73+
m.notificationCounter.With(labels).Inc()
6974
}
7075

7176
// QueueNotification returns http handler for prometheus
7277
func (m *Metrics) QueueNotification(notification *sql.ProjectionNotification) {
73-
78+
key := "q" + fmt.Sprintf("%p", notification)
79+
m.notificationStartTimes[key] = time.Now()
7480
}
7581

7682
// StartNotificationProcessing is used to record start time of notification processing
7783
func (m *Metrics) StartNotificationProcessing(notification *sql.ProjectionNotification) {
78-
84+
key := "p" + fmt.Sprintf("%p", notification)
85+
m.notificationStartTimes[key] = time.Now()
7986
}
8087

8188
// FinishNotificationProcessing is used to observe end time of notification queue and processing time
82-
func (m *Metrics) FinishNotificationProcessing(notification *sql.ProjectionNotification) {
83-
89+
func (m *Metrics) FinishNotificationProcessing(notification *sql.ProjectionNotification, success bool, retry bool) {
90+
memAddress := fmt.Sprintf("%p", notification)
91+
queueStartTime := m.notificationStartTimes["q"+memAddress]
92+
processingStartTime := m.notificationStartTimes["p"+memAddress]
93+
labels := prometheus.Labels{"success": strconv.FormatBool(success), "retry": strconv.FormatBool(retry)}
94+
95+
m.notificationQueueDuration.With(labels).Observe(time.Since(queueStartTime).Seconds())
96+
m.notificationProcessingDuration.With(labels).Observe(time.Since(processingStartTime).Seconds())
8497
}

metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ type (
1010
ReceivedNotification(isNotification bool)
1111
QueueNotification(notification *sql.ProjectionNotification)
1212
StartNotificationProcessing(notification *sql.ProjectionNotification)
13-
FinishNotificationProcessing(notification *sql.ProjectionNotification)
13+
FinishNotificationProcessing(notification *sql.ProjectionNotification, success bool, retry bool)
1414
}
1515
)

metrics_nop.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ var NopMetrics Metrics = &nopMetrics{}
66

77
type nopMetrics struct{}
88

9-
func (nm *nopMetrics) ReceivedNotification(isNotification bool) {}
10-
func (nm *nopMetrics) QueueNotification(notification *sql.ProjectionNotification) {}
11-
func (nm *nopMetrics) StartNotificationProcessing(notification *sql.ProjectionNotification) {}
12-
func (nm *nopMetrics) FinishNotificationProcessing(notification *sql.ProjectionNotification) {}
9+
func (nm *nopMetrics) ReceivedNotification(isNotification bool) {}
10+
func (nm *nopMetrics) QueueNotification(notification *sql.ProjectionNotification) {}
11+
func (nm *nopMetrics) StartNotificationProcessing(notification *sql.ProjectionNotification) {}
12+
func (nm *nopMetrics) FinishNotificationProcessing(notification *sql.ProjectionNotification, success bool, retry bool) {
13+
}

0 commit comments

Comments
 (0)