Skip to content

Commit 5135034

Browse files
committed
Add unit tests and make sure metrics are only sent once for one notification
1 parent f914ebc commit 5135034

File tree

3 files changed

+97
-194
lines changed

3 files changed

+97
-194
lines changed

extension/prometheus/prometheus.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ type Metrics struct {
2828
}
2929

3030
// NewMetrics instantiate and return an object of Metrics
31-
func NewMetrics() *Metrics {
31+
func NewMetrics(logger goengine.Logger) *Metrics {
32+
if logger == nil {
33+
logger = goengine.NopLogger
34+
}
3235
return &Metrics{
3336
// notificationCounter is used to expose 'notification_count' metric
3437
notificationCounter: prometheus.NewCounterVec(
@@ -60,15 +63,10 @@ func NewMetrics() *Metrics {
6063
},
6164
[]string{"success"},
6265
),
63-
logger: goengine.NopLogger,
66+
logger: logger,
6467
}
6568
}
6669

67-
// SetLogger sets the logger for metrics
68-
func (m *Metrics) SetLogger(logger goengine.Logger) {
69-
m.logger = logger
70-
}
71-
7270
// RegisterMetrics returns http handler for prometheus
7371
func (m *Metrics) RegisterMetrics(registry *prometheus.Registry) error {
7472
err := registry.Register(m.notificationCounter)
@@ -116,6 +114,7 @@ func (m *Metrics) FinishNotificationProcessing(notification *sql.ProjectionNotif
116114

117115
if queueStartTime, ok := m.notificationStartTimes.Load(notificationQueueKeyPrefix + memAddress); ok {
118116
m.notificationQueueDuration.With(labels).Observe(time.Since(queueStartTime.(time.Time)).Seconds())
117+
m.notificationStartTimes.Delete(notificationQueueKeyPrefix + memAddress)
119118

120119
} else {
121120

@@ -126,6 +125,7 @@ func (m *Metrics) FinishNotificationProcessing(notification *sql.ProjectionNotif
126125

127126
if processingStartTime, ok := m.notificationStartTimes.Load(notificationProcessingKeyPrefix + memAddress); ok {
128127
m.notificationProcessingDuration.With(labels).Observe(time.Since(processingStartTime.(time.Time)).Seconds())
128+
m.notificationStartTimes.Delete(notificationProcessingKeyPrefix + memAddress)
129129
} else {
130130
m.logger.Warn("notification processing start time not found", func(e goengine.LoggerEntry) {
131131
e.Any("notification", notification)

extension/prometheus/prometheus_test.go

Lines changed: 89 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ package prometheus_test
55
import (
66
"testing"
77

8-
"github.com/hellofresh/goengine/.vendor-new/github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/assert"
99

10-
"github.com/hellofresh/goengine/.vendor-new/github.com/stretchr/testify/require"
10+
"github.com/stretchr/testify/require"
1111

1212
"github.com/hellofresh/goengine/driver/sql"
1313
goenginePrometheus "github.com/hellofresh/goengine/extension/prometheus"
@@ -22,21 +22,105 @@ func TestMetrics_QueueAndFinishNotification(t *testing.T) {
2222

2323
registry := prometheus.NewPedanticRegistry()
2424

25-
metrics := goenginePrometheus.NewMetrics()
25+
metrics := goenginePrometheus.NewMetrics(nil)
2626
require.NoError(t, metrics.RegisterMetrics(registry))
2727

2828
metrics.QueueNotification(notification)
2929
metrics.FinishNotificationProcessing(notification, true)
3030

3131
assertMetricsWhereCalled(t, registry, map[string]uint64{
32-
"goengine_queue_duration_seconds": 1,
32+
"goengine_queue_duration_seconds": 1,
33+
"goengine_notification_processing_duration_seconds": 0,
34+
})
35+
36+
}
37+
38+
func TestMetrics_ProcessAndFinishNotification(t *testing.T) {
39+
notification := &sql.ProjectionNotification{
40+
No: 1,
41+
AggregateID: "C56A4180-65AA-42EC-A945-5FD21DEC0538",
42+
}
43+
44+
registry := prometheus.NewPedanticRegistry()
45+
46+
metrics := goenginePrometheus.NewMetrics(nil)
47+
require.NoError(t, metrics.RegisterMetrics(registry))
48+
49+
metrics.StartNotificationProcessing(notification)
50+
metrics.FinishNotificationProcessing(notification, true)
51+
52+
assertMetricsWhereCalled(t, registry, map[string]uint64{
53+
"goengine_notification_processing_duration_seconds": 1,
54+
"goengine_queue_duration_seconds": 0,
55+
})
56+
}
57+
58+
func TestMetrics_QueueProcessAndFinishNotification(t *testing.T) {
59+
notification := &sql.ProjectionNotification{
60+
No: 1,
61+
AggregateID: "C56A4180-65AA-42EC-A945-5FD21DEC0538",
62+
}
63+
64+
registry := prometheus.NewPedanticRegistry()
65+
66+
metrics := goenginePrometheus.NewMetrics(nil)
67+
require.NoError(t, metrics.RegisterMetrics(registry))
68+
69+
metrics.QueueNotification(notification)
70+
metrics.StartNotificationProcessing(notification)
71+
metrics.FinishNotificationProcessing(notification, true)
72+
73+
assertMetricsWhereCalled(t, registry, map[string]uint64{
74+
"goengine_notification_processing_duration_seconds": 1,
75+
"goengine_queue_duration_seconds": 1,
76+
})
77+
}
78+
79+
func TestMetrics_FinishNotificationWithoutStart(t *testing.T) {
80+
notification := &sql.ProjectionNotification{
81+
No: 1,
82+
AggregateID: "C56A4180-65AA-42EC-A945-5FD21DEC0538",
83+
}
84+
85+
registry := prometheus.NewPedanticRegistry()
86+
87+
metrics := goenginePrometheus.NewMetrics(nil)
88+
require.NoError(t, metrics.RegisterMetrics(registry))
89+
90+
metrics.FinishNotificationProcessing(notification, true)
91+
92+
assertMetricsWhereCalled(t, registry, map[string]uint64{
93+
"goengine_notification_processing_duration_seconds": 0,
94+
"goengine_queue_duration_seconds": 0,
95+
})
96+
}
97+
98+
func TestMetrics_QueueProcessAndFinishNotificationTwice(t *testing.T) {
99+
notification := &sql.ProjectionNotification{
100+
No: 1,
101+
AggregateID: "C56A4180-65AA-42EC-A945-5FD21DEC0538",
102+
}
103+
104+
registry := prometheus.NewPedanticRegistry()
105+
106+
metrics := goenginePrometheus.NewMetrics(nil)
107+
require.NoError(t, metrics.RegisterMetrics(registry))
108+
109+
metrics.QueueNotification(notification)
110+
metrics.StartNotificationProcessing(notification)
111+
metrics.FinishNotificationProcessing(notification, true)
112+
metrics.FinishNotificationProcessing(notification, true)
113+
114+
assertMetricsWhereCalled(t, registry, map[string]uint64{
115+
"goengine_notification_processing_duration_seconds": 1,
116+
"goengine_queue_duration_seconds": 1,
33117
})
34118
}
35119

36120
func TestMetrics_ReceivedNotification(t *testing.T) {
37121
registry := prometheus.NewPedanticRegistry()
38122

39-
metrics := goenginePrometheus.NewMetrics()
123+
metrics := goenginePrometheus.NewMetrics(nil)
40124
require.NoError(t, metrics.RegisterMetrics(registry))
41125

42126
metrics.ReceivedNotification(true)
@@ -51,7 +135,6 @@ func assertMetricsWhereCalled(t *testing.T, g prometheus.Gatherer, metricsCounts
51135
got, err := g.Gather()
52136
require.NoError(t, err)
53137

54-
require.Len(t, got, len(metricsCounts))
55138
for _, m := range got {
56139
expectedCount, ok := metricsCounts[m.GetName()]
57140
if !assert.Truef(t, ok, "Unknown metric %s", m.GetName()) {
@@ -71,183 +154,3 @@ func assertMetricsWhereCalled(t *testing.T, g prometheus.Gatherer, metricsCounts
71154
assert.Equal(t, expectedCount, calls)
72155
}
73156
}
74-
75-
//// MockObserver is the mock object for Observer
76-
//type MockObserver struct {
77-
// observation float64
78-
//}
79-
//
80-
//func (o *MockObserver) Observe(value float64) {
81-
// o.observation = value
82-
//}
83-
//
84-
//// MockedMetricObject is the mock object for ObserverVec
85-
//type MockedMetricObject struct {
86-
// observer *MockObserver
87-
//}
88-
//
89-
//func (m *MockedMetricObject) GetMetricWith(labels prometheus.Labels) (prometheus.Observer, error) {
90-
// return &MockObserver{}, nil
91-
//}
92-
//
93-
//func (m *MockedMetricObject) GetMetricWithLabelValues(lvs ...string) (prometheus.Observer, error) {
94-
// return &MockObserver{}, nil
95-
//}
96-
//
97-
//func (m *MockedMetricObject) With(labels prometheus.Labels) prometheus.Observer {
98-
// m.observer = &MockObserver{
99-
// observation: 0.0,
100-
// }
101-
// return m.observer
102-
//}
103-
//
104-
//func (m *MockedMetricObject) WithLabelValues(lvs ...string) prometheus.Observer {
105-
// return &MockObserver{}
106-
//}
107-
//
108-
//func (m *MockedMetricObject) CurryWith(labels prometheus.Labels) (prometheus.ObserverVec, error) {
109-
// return m, nil
110-
//}
111-
//
112-
//func (m *MockedMetricObject) MustCurryWith(labels prometheus.Labels) prometheus.ObserverVec {
113-
// return m
114-
//}
115-
//
116-
//func (m *MockedMetricObject) Describe(ch chan<- *prometheus.Desc) {}
117-
//
118-
//func (m *MockedMetricObject) Collect(ch chan<- prometheus.Metric) {}
119-
//
120-
//func TestMetrics_NotificationStartTime(t *testing.T) {
121-
// metrics := NewMetrics()
122-
//
123-
// metrics.QueueNotification(&testSQLProjection)
124-
// metrics.StartNotificationProcessing(&testSQLProjection)
125-
//
126-
// memAddress := fmt.Sprintf("%p", &testSQLProjection)
127-
// queueStartTime, ok := metrics.notificationStartTimes.Load("q" + memAddress)
128-
// assert.True(t, ok)
129-
// assert.IsType(t, time.Time{}, queueStartTime)
130-
//
131-
// processingStartTime, _ := metrics.notificationStartTimes.Load("p" + memAddress)
132-
// assert.True(t, ok)
133-
// assert.IsType(t, time.Time{}, processingStartTime)
134-
//
135-
//}
136-
//
137-
//func TestMetrics_FinishNotificationProcessingSuccess(t *testing.T) {
138-
//
139-
// mockQueueMetric := new(MockedMetricObject)
140-
// mockProcessMetric := new(MockedMetricObject)
141-
// testMetrics := newMetricsWith(mockQueueMetric, mockProcessMetric)
142-
//
143-
// testMetrics.QueueNotification(&testSQLProjection)
144-
// testMetrics.StartNotificationProcessing(&testSQLProjection)
145-
// testMetrics.FinishNotificationProcessing(&testSQLProjection, true)
146-
//
147-
// assert.NotEqual(t, mockQueueMetric.observer.observation, 0.0)
148-
//
149-
//}
150-
//
151-
//func TestMetrics_FinishNotificationProcessingFailureForQueue(t *testing.T) {
152-
//
153-
// mockQueueMetric := new(MockedMetricObject)
154-
// mockProcessMetric := new(MockedMetricObject)
155-
// testMetrics := newMetricsWith(mockQueueMetric, mockProcessMetric)
156-
//
157-
// testMetrics.StartNotificationProcessing(&testSQLProjection)
158-
// testMetrics.FinishNotificationProcessing(&testSQLProjection, true)
159-
//
160-
// assert.Nil(t, mockQueueMetric.observer)
161-
//
162-
//}
163-
//
164-
//func TestMetrics_FinishNotificationProcessingFailureForProcessing(t *testing.T) {
165-
//
166-
// mockQueueMetric := new(MockedMetricObject)
167-
// mockProcessMetric := new(MockedMetricObject)
168-
// testMetrics := newMetricsWith(mockQueueMetric, mockProcessMetric)
169-
//
170-
// testMetrics.QueueNotification(&testSQLProjection)
171-
// testMetrics.FinishNotificationProcessing(&testSQLProjection, true)
172-
//
173-
// assert.Nil(t, mockProcessMetric.observer)
174-
//}
175-
//
176-
//func TestMetrics_CollectAndCompareHistogramMetrics(t *testing.T) {
177-
//
178-
// registry := prometheus.NewPedanticRegistry()
179-
// metrics := goenginePrometheus.NewMetrics()
180-
// metrics.RegisterMetrics(registry)
181-
//
182-
// metrics.SetLogger(logrus.StandardLogger())
183-
//
184-
// inputs := []struct {
185-
// name string
186-
// collector prometheus.ObserverVec
187-
// metadata string
188-
// expect string
189-
// observation float64
190-
// }{
191-
// {
192-
// name: "Testing Queue Duration Metric Collector",
193-
// collector: metrics.notificationQueueDuration,
194-
// metadata: `
195-
// # HELP goengine_queue_duration_seconds histogram of queue latencies
196-
// # TYPE goengine_queue_duration_seconds histogram
197-
// `,
198-
// expect: `
199-
// goengine_queue_duration_seconds_bucket{success="true",le="0.1"} 0
200-
// goengine_queue_duration_seconds_bucket{success="true",le="0.5"} 0
201-
// goengine_queue_duration_seconds_bucket{success="true",le="0.9"} 0
202-
// goengine_queue_duration_seconds_bucket{success="true",le="0.99"} 1.0
203-
// goengine_queue_duration_seconds_bucket{success="true",le="+Inf"} 1.0
204-
// goengine_queue_duration_seconds_sum{success="true"} 0.99
205-
// goengine_queue_duration_seconds_count{success="true"} 1.0
206-
//
207-
// `,
208-
// observation: 0.99,
209-
// },
210-
// {
211-
// name: "Testing Notification Processing Duration Metric Collector",
212-
// collector: metrics.notificationProcessingDuration,
213-
// metadata: `
214-
// # HELP goengine_notification_processing_duration_seconds histogram of notifications handled latencies
215-
// # TYPE goengine_notification_processing_duration_seconds histogram
216-
// `,
217-
// expect: `
218-
// goengine_notification_processing_duration_seconds_bucket{success="true",le="0.1"} 0
219-
// goengine_notification_processing_duration_seconds_bucket{success="true",le="0.5"} 0
220-
// goengine_notification_processing_duration_seconds_bucket{success="true",le="0.9"} 1.0
221-
// goengine_notification_processing_duration_seconds_bucket{success="true",le="0.99"} 1.0
222-
// goengine_notification_processing_duration_seconds_bucket{success="true",le="+Inf"} 1.0
223-
// goengine_notification_processing_duration_seconds_sum{success="true"} 0.54
224-
// goengine_notification_processing_duration_seconds_count{success="true"} 1.0
225-
//
226-
// `,
227-
// observation: 0.54,
228-
// },
229-
// }
230-
//
231-
// labels := prometheus.Labels{"success": "true"}
232-
// for _, input := range inputs {
233-
// input.collector.With(labels).Observe(input.observation)
234-
// t.Run(input.name, func(t *testing.T) {
235-
// if err := testutil.CollectAndCompare(input.collector, strings.NewReader(input.metadata+input.expect)); err != nil {
236-
// t.Errorf("unexpected collecting result:\n%s", err)
237-
// }
238-
// })
239-
//
240-
// }
241-
//}
242-
243-
//// NewMetrics instantiate and return an object of Metrics
244-
//func newMetricsWith(queueDuration prometheus.ObserverVec, processDuration prometheus.ObserverVec) *Metrics {
245-
// return &Metrics{
246-
// // queueDuration is used to expose 'queue_duration_seconds' metrics
247-
// notificationQueueDuration: queueDuration,
248-
//
249-
// // notificationProcessingDuration is used to expose 'notification_handle_duration_seconds' metrics
250-
// notificationProcessingDuration: processDuration,
251-
// logger: goengine.NopLogger,
252-
// }
253-
//}

test/internal/suite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (s *Suite) SetupTest() {
3030

3131
s.LoggerHook = test.NewLocal(s.Logger)
3232

33-
s.Metrics = prometheus.NewMetrics()
33+
s.Metrics = prometheus.NewMetrics(nil)
3434
}
3535

3636
// TearDownTest cleanup suite variables

0 commit comments

Comments
 (0)