Skip to content

Commit 7c77ce0

Browse files
authored
Merge pull request #37 from hellofresh/patch/projector-metrics
OWMS-2318 projector metrics
2 parents 90fe9e4 + 1a242c8 commit 7c77ce0

File tree

13 files changed

+445
-8
lines changed

13 files changed

+445
-8
lines changed

Gopkg.lock

Lines changed: 68 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

driver/sql/metrics.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package sql
2+
3+
type (
4+
// Metrics a structured metrics interface
5+
Metrics interface {
6+
// ReceivedNotification sends the metric to keep count of notifications received by goengine
7+
ReceivedNotification(isNotification bool)
8+
// QueueNotification is called when a notification is queued.
9+
// It saves start time for an event on aggregate when it's queued
10+
QueueNotification(notification *ProjectionNotification)
11+
// StartNotificationProcessing is called when a notification processing is started
12+
// It saves start time for an event on aggregate when it's picked to be processed by background processor
13+
StartNotificationProcessing(notification *ProjectionNotification)
14+
// FinishNotificationProcessing is called when a notification processing is finished
15+
// It actually sends metrics calculating duration for which a notification spends in queue and then processed by background processor
16+
FinishNotificationProcessing(notification *ProjectionNotification, success bool)
17+
}
18+
)

driver/sql/metrics_nop.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package sql
2+
3+
// NopMetrics is default Metrics handler in case nil is passed
4+
var NopMetrics Metrics = &nopMetrics{}
5+
6+
type nopMetrics struct{}
7+
8+
func (nm *nopMetrics) ReceivedNotification(isNotification bool) {}
9+
func (nm *nopMetrics) QueueNotification(notification *ProjectionNotification) {}
10+
func (nm *nopMetrics) StartNotificationProcessing(notification *ProjectionNotification) {}
11+
func (nm *nopMetrics) FinishNotificationProcessing(notification *ProjectionNotification, success bool) {
12+
}

driver/sql/postgres/projector_aggregate_bench_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ func setup(
181181
return driverSQL.ProjectionFail
182182
},
183183
goengine.NopLogger,
184+
driverSQL.NopMetrics,
184185
)
185186
require.NoError(b, err, "failed to create aggregate projector")
186187

driver/sql/projection_notification_processor.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ type (
2020
queueProcessors int
2121
queueBuffer int
2222

23-
logger goengine.Logger
23+
logger goengine.Logger
24+
metrics Metrics
2425
}
2526

2627
// ProcessHandler is a func used to trigger a notification but with the addition of providing a Trigger func so
@@ -29,7 +30,7 @@ type (
2930
)
3031

3132
// newBackgroundProcessor create a new projectionNotificationProcessor
32-
func newBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Logger) (*projectionNotificationProcessor, error) {
33+
func newBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Logger, metrics Metrics) (*projectionNotificationProcessor, error) {
3334
if queueProcessors <= 0 {
3435
return nil, errors.New("queueProcessors must be greater then zero")
3536
}
@@ -39,11 +40,15 @@ func newBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Lo
3940
if logger == nil {
4041
logger = goengine.NopLogger
4142
}
43+
if metrics == nil {
44+
metrics = NopMetrics
45+
}
4246

4347
return &projectionNotificationProcessor{
4448
queueProcessors: queueProcessors,
4549
queueBuffer: queueBuffer,
4650
logger: logger,
51+
metrics: metrics,
4752
}, nil
4853
}
4954

@@ -104,6 +109,8 @@ func (b *projectionNotificationProcessor) Queue(ctx context.Context, notificatio
104109
return errors.New("goengine: unable to queue notification because the processor was stopped")
105110
}
106111

112+
b.metrics.QueueNotification(notification)
113+
107114
b.queue <- notification
108115
return nil
109116
}
@@ -117,11 +124,17 @@ func (b *projectionNotificationProcessor) startProcessor(ctx context.Context, ha
117124
return
118125
case notification := <-b.queue:
119126
// Execute the notification
127+
b.metrics.StartNotificationProcessing(notification)
120128
if err := handler(ctx, notification, b.Queue); err != nil {
121129
b.logger.Error("the ProcessHandler produced an error", func(e goengine.LoggerEntry) {
122130
e.Error(err)
123131
e.Any("notification", notification)
124132
})
133+
134+
b.metrics.FinishNotificationProcessing(notification, false)
135+
136+
} else {
137+
b.metrics.FinishNotificationProcessing(notification, true)
125138
}
126139
}
127140
}

driver/sql/projector_aggregate.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func NewAggregateProjector(
3535
projectorStorage AggregateProjectorStorage,
3636
projectionErrorHandler ProjectionErrorCallback,
3737
logger goengine.Logger,
38+
metrics Metrics,
3839
) (*AggregateProjector, error) {
3940
switch {
4041
case db == nil:
@@ -58,7 +59,7 @@ func NewAggregateProjector(
5859
e.String("projection", projection.Name())
5960
})
6061

61-
processor, err := newBackgroundProcessor(10, 32, logger)
62+
processor, err := newBackgroundProcessor(10, 32, logger, metrics)
6263
if err != nil {
6364
return nil, err
6465
}

extension/pq/listener.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ type Listener struct {
2222
minReconnectInterval time.Duration
2323
maxReconnectInterval time.Duration
2424

25-
logger goengine.Logger
25+
logger goengine.Logger
26+
metrics sql.Metrics
2627
}
2728

2829
// NewListener returns a new notification listener
@@ -32,6 +33,7 @@ func NewListener(
3233
minReconnectInterval time.Duration,
3334
maxReconnectInterval time.Duration,
3435
logger goengine.Logger,
36+
metrics sql.Metrics,
3537
) (*Listener, error) {
3638
switch {
3739
case strings.TrimSpace(dbDSN) == "":
@@ -48,12 +50,17 @@ func NewListener(
4850
logger = goengine.NopLogger
4951
}
5052

53+
if metrics == nil {
54+
metrics = sql.NopMetrics
55+
}
56+
5157
return &Listener{
5258
dbDSN: dbDSN,
5359
dbChannel: dbChannel,
5460
minReconnectInterval: minReconnectInterval,
5561
maxReconnectInterval: maxReconnectInterval,
5662
logger: logger,
63+
metrics: metrics,
5764
}, nil
5865
}
5966

@@ -83,13 +90,15 @@ func (s *Listener) Listen(ctx context.Context, exec sql.ProjectionTrigger) error
8390

8491
// Execute an initial run of the projection.
8592
// This is done after db listen is started to avoid losing a set of messages while the Listener creates a db connection.
93+
s.metrics.ReceivedNotification(false)
8694
if err := exec(ctx, nil); err != nil {
8795
return err
8896
}
8997

9098
for {
9199
select {
92100
case n := <-listener.Notify:
101+
s.metrics.ReceivedNotification(n != nil)
93102
// Unmarshal the notification
94103
notification := s.unmarshalNotification(n)
95104

0 commit comments

Comments
 (0)