Skip to content

Commit 11036d5

Browse files
committed
Initial interface and implementation of prometheus metrics handler
1 parent 90fe9e4 commit 11036d5

File tree

7 files changed

+200
-5
lines changed

7 files changed

+200
-5
lines changed

Gopkg.lock

Lines changed: 68 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

driver/sql/projection_notification_processor.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ type (
2020
queueProcessors int
2121
queueBuffer int
2222

23-
logger goengine.Logger
23+
logger goengine.Logger
24+
metrics goengine.Metrics
2425
}
2526

2627
// ProcessHandler is a func used to trigger a notification but with the addition of providing a Trigger func so
@@ -29,7 +30,7 @@ type (
2930
)
3031

3132
// newBackgroundProcessor create a new projectionNotificationProcessor
32-
func newBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Logger) (*projectionNotificationProcessor, error) {
33+
func newBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Logger, metrics goengine.Metrics) (*projectionNotificationProcessor, error) {
3334
if queueProcessors <= 0 {
3435
return nil, errors.New("queueProcessors must be greater then zero")
3536
}
@@ -39,11 +40,15 @@ func newBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Lo
3940
if logger == nil {
4041
logger = goengine.NopLogger
4142
}
43+
if metrics == nil {
44+
metrics = goengine.NopMetrics
45+
}
4246

4347
return &projectionNotificationProcessor{
4448
queueProcessors: queueProcessors,
4549
queueBuffer: queueBuffer,
4650
logger: logger,
51+
metrics: metrics,
4752
}, nil
4853
}
4954

@@ -104,6 +109,8 @@ func (b *projectionNotificationProcessor) Queue(ctx context.Context, notificatio
104109
return errors.New("goengine: unable to queue notification because the processor was stopped")
105110
}
106111

112+
b.metrics.QueueNotification(notification)
113+
107114
b.queue <- notification
108115
return nil
109116
}
@@ -117,12 +124,14 @@ func (b *projectionNotificationProcessor) startProcessor(ctx context.Context, ha
117124
return
118125
case notification := <-b.queue:
119126
// Execute the notification
127+
b.metrics.StartNotificationProcessing(notification)
120128
if err := handler(ctx, notification, b.Queue); err != nil {
121129
b.logger.Error("the ProcessHandler produced an error", func(e goengine.LoggerEntry) {
122130
e.Error(err)
123131
e.Any("notification", notification)
124132
})
125133
}
134+
b.metrics.FinishNotificationProcessing(notification)
126135
}
127136
}
128137
}

driver/sql/projector_aggregate.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func NewAggregateProjector(
3535
projectorStorage AggregateProjectorStorage,
3636
projectionErrorHandler ProjectionErrorCallback,
3737
logger goengine.Logger,
38+
metrics goengine.Metrics,
3839
) (*AggregateProjector, error) {
3940
switch {
4041
case db == nil:
@@ -58,7 +59,7 @@ func NewAggregateProjector(
5859
e.String("projection", projection.Name())
5960
})
6061

61-
processor, err := newBackgroundProcessor(10, 32, logger)
62+
processor, err := newBackgroundProcessor(10, 32, logger, metrics)
6263
if err != nil {
6364
return nil, err
6465
}

extension/prometheus/prometheus.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package prometheus
2+
3+
import (
4+
"github.com/hellofresh/goengine/driver/sql"
5+
"github.com/prometheus/client_golang/prometheus"
6+
)
7+
8+
const namespace = "goengine"
9+
10+
type Metrics struct {
11+
notificationCounter *prometheus.CounterVec
12+
queueDuration *prometheus.HistogramVec
13+
notificationHandleDuration *prometheus.HistogramVec
14+
}
15+
16+
func NewMetrics() *Metrics {
17+
return &Metrics{
18+
// notificationCounter is used to expose 'notification_count' metric
19+
notificationCounter: prometheus.NewCounterVec(
20+
prometheus.CounterOpts{
21+
Namespace: namespace,
22+
Name: "notification_count",
23+
Help: "counter for number of notifications received",
24+
},
25+
[]string{"is_event"},
26+
),
27+
// queueDuration is used to expose 'queue_duration_seconds' metrics
28+
queueDuration: prometheus.NewHistogramVec(
29+
prometheus.HistogramOpts{
30+
Namespace: namespace,
31+
Name: "queue_duration_seconds",
32+
Help: "histogram of queue latencies",
33+
Buckets: []float64{0.1, 0.5, 0.9, 0.99}, //buckets for histogram
34+
},
35+
[]string{"retry", "success"},
36+
),
37+
38+
// notificationHandleDuration is used to expose 'notification_handle_duration_seconds' metrics
39+
notificationHandleDuration: prometheus.NewHistogramVec(
40+
prometheus.HistogramOpts{
41+
Namespace: namespace,
42+
Name: "notification_handle_duration_seconds",
43+
Help: "histogram of event handled latencies",
44+
Buckets: []float64{0.1, 0.5, 0.9, 0.99}, //buckets for histogram
45+
},
46+
[]string{"retry", "success"},
47+
),
48+
}
49+
}
50+
51+
// RegisterMetrics returns http handler for prometheus
52+
func (m *Metrics) RegisterMetrics(registry *prometheus.Registry) error {
53+
err := registry.Register(m.notificationCounter)
54+
if err != nil {
55+
return err
56+
}
57+
58+
err = registry.Register(m.queueDuration)
59+
if err != nil {
60+
return err
61+
}
62+
63+
return registry.Register(m.notificationHandleDuration)
64+
}
65+
66+
// ReceivedNotification
67+
func (m *Metrics) ReceivedNotification(isNotification bool) {
68+
69+
}
70+
71+
// QueueNotification returns http handler for prometheus
72+
func (m *Metrics) QueueNotification(notification *sql.ProjectionNotification) {
73+
74+
}
75+
76+
// StartNotificationProcessing is used to record start time of notification processing
77+
func (m *Metrics) StartNotificationProcessing(notification *sql.ProjectionNotification) {
78+
79+
}
80+
81+
// FinishNotificationProcessing is used to observe end time of notification queue and processing time
82+
func (m *Metrics) FinishNotificationProcessing(notification *sql.ProjectionNotification) {
83+
84+
}

metrics.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package goengine
2+
3+
import (
4+
"github.com/hellofresh/goengine/driver/sql"
5+
)
6+
7+
type (
8+
// Metrics a structured metrics interface
9+
Metrics interface {
10+
ReceivedNotification(isNotification bool)
11+
QueueNotification(notification *sql.ProjectionNotification)
12+
StartNotificationProcessing(notification *sql.ProjectionNotification)
13+
FinishNotificationProcessing(notification *sql.ProjectionNotification)
14+
}
15+
)

metrics_nop.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package goengine
2+
3+
import "github.com/hellofresh/goengine/driver/sql"
4+
5+
var NopMetrics Metrics = &nopMetrics{}
6+
7+
type nopMetrics struct{}
8+
9+
func (nm *nopMetrics) ReceivedNotification(isNotification bool) {}
10+
func (nm *nopMetrics) QueueNotification(notification *sql.ProjectionNotification) {}
11+
func (nm *nopMetrics) StartNotificationProcessing(notification *sql.ProjectionNotification) {}
12+
func (nm *nopMetrics) FinishNotificationProcessing(notification *sql.ProjectionNotification) {}

strategy/json/sql/postgres/manager.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@ type SingleStreamManager struct {
1717
persistenceStrategy driverSQL.PersistenceStrategy
1818
messageFactory driverSQL.MessageFactory
1919

20-
logger goengine.Logger
20+
logger goengine.Logger
21+
metrics goengine.Metrics
2122
}
2223

2324
// NewSingleStreamManager return a new instance of the SingleStreamManager
24-
func NewSingleStreamManager(db *sql.DB, logger goengine.Logger) (*SingleStreamManager, error) {
25+
func NewSingleStreamManager(db *sql.DB, logger goengine.Logger, metrics goengine.Metrics) (*SingleStreamManager, error) {
2526
if db == nil {
2627
return nil, goengine.InvalidArgumentError("db")
2728
}
2829
if logger == nil {
2930
logger = goengine.NopLogger
3031
}
32+
if metrics == nil {
33+
metrics = goengine.NopMetrics
34+
}
3135

3236
payloadTransformer := json.NewPayloadTransformer()
3337

@@ -49,6 +53,7 @@ func NewSingleStreamManager(db *sql.DB, logger goengine.Logger) (*SingleStreamMa
4953
persistenceStrategy: persistenceStrategy,
5054
messageFactory: messageFactory,
5155
logger: logger,
56+
metrics: metrics,
5257
}, nil
5358
}
5459

@@ -145,5 +150,6 @@ func (m *SingleStreamManager) NewAggregateProjector(
145150
projectorStorage,
146151
projectionErrorHandler,
147152
m.logger,
153+
m.metrics,
148154
)
149155
}

0 commit comments

Comments
 (0)