Skip to content

Commit 1586249

Browse files
committed
Logging in case of failures
1 parent bdf51ef commit 1586249

File tree

3 files changed

+40
-21
lines changed

3 files changed

+40
-21
lines changed

driver/sql/metrics.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ type (
44
// Metrics a structured metrics interface
55
Metrics interface {
66
ReceivedNotification(isNotification bool)
7-
QueueNotification(notification *ProjectionNotification) bool
8-
StartNotificationProcessing(notification *ProjectionNotification) bool
9-
FinishNotificationProcessing(notification *ProjectionNotification, success bool) bool
7+
QueueNotification(notification *ProjectionNotification)
8+
StartNotificationProcessing(notification *ProjectionNotification)
9+
FinishNotificationProcessing(notification *ProjectionNotification, success bool)
1010
}
1111
)

driver/sql/metrics_nop.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ var NopMetrics Metrics = &nopMetrics{}
66
type nopMetrics struct{}
77

88
func (nm *nopMetrics) ReceivedNotification(isNotification bool) {}
9-
func (nm *nopMetrics) QueueNotification(notification *ProjectionNotification) bool {
10-
return true
9+
func (nm *nopMetrics) QueueNotification(notification *ProjectionNotification) {
1110
}
12-
func (nm *nopMetrics) StartNotificationProcessing(notification *ProjectionNotification) bool {
13-
return true
11+
func (nm *nopMetrics) StartNotificationProcessing(notification *ProjectionNotification) {
12+
1413
}
15-
func (nm *nopMetrics) FinishNotificationProcessing(notification *ProjectionNotification, success bool) bool {
16-
return true
14+
func (nm *nopMetrics) FinishNotificationProcessing(notification *ProjectionNotification, success bool) {
1715
}

extension/prometheus/prometheus.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package prometheus
22

33
import (
44
"fmt"
5+
"github.com/hellofresh/goengine"
56
"strconv"
67
"sync"
78
"time"
@@ -22,6 +23,7 @@ type Metrics struct {
2223
notificationQueueDuration *prometheus.HistogramVec
2324
notificationProcessingDuration *prometheus.HistogramVec
2425
notificationStartTimes sync.Map
26+
logger goengine.Logger
2527
}
2628

2729
// NewMetrics instantiate and return an object of Metrics
@@ -60,6 +62,10 @@ func NewMetrics() *Metrics {
6062
}
6163
}
6264

65+
func (m *Metrics) SetLogger(logger goengine.Logger) {
66+
m.logger = logger
67+
}
68+
6369
// RegisterMetrics returns http handler for prometheus
6470
func (m *Metrics) RegisterMetrics(registry *prometheus.Registry) error {
6571
err := registry.Register(m.notificationCounter)
@@ -82,38 +88,53 @@ func (m *Metrics) ReceivedNotification(isNotification bool) {
8288
}
8389

8490
// QueueNotification returns http handler for prometheus
85-
func (m *Metrics) QueueNotification(notification *sql.ProjectionNotification) bool {
86-
return m.storeStartTime(notificationQueueKeyPrefix, notification)
91+
func (m *Metrics) QueueNotification(notification *sql.ProjectionNotification) {
92+
if !m.storeStartTime(notificationQueueKeyPrefix, notification) {
93+
m.logger.Warn("notification already queued", func(e goengine.LoggerEntry) {
94+
e.Any("notification", notification)
95+
})
96+
}
97+
8798
}
8899

89100
// StartNotificationProcessing is used to record start time of notification processing
90-
func (m *Metrics) StartNotificationProcessing(notification *sql.ProjectionNotification) bool {
91-
return m.storeStartTime(notificationProcessingKeyPrefix, notification)
101+
func (m *Metrics) StartNotificationProcessing(notification *sql.ProjectionNotification) {
102+
if !m.storeStartTime(notificationProcessingKeyPrefix, notification) {
103+
m.logger.Warn("notification processing already started", func(e goengine.LoggerEntry) {
104+
e.Any("notification", notification)
105+
})
106+
}
92107
}
93108

94109
// FinishNotificationProcessing is used to observe end time of notification queue and processing time
95-
func (m *Metrics) FinishNotificationProcessing(notification *sql.ProjectionNotification, success bool) bool {
110+
func (m *Metrics) FinishNotificationProcessing(notification *sql.ProjectionNotification, success bool) {
96111
memAddress := fmt.Sprintf("%p", notification)
97112
labels := prometheus.Labels{"success": strconv.FormatBool(success)}
98113

99-
queueStartTime, queueOk := m.notificationStartTimes.Load(notificationQueueKeyPrefix + memAddress)
114+
if queueStartTime, ok := m.notificationStartTimes.Load(notificationQueueKeyPrefix + memAddress); ok {
115+
m.notificationQueueDuration.With(labels).Observe(time.Since(queueStartTime.(time.Time)).Seconds())
116+
117+
} else {
100118

101-
processingStartTime, processingOk := m.notificationStartTimes.Load(notificationProcessingKeyPrefix + memAddress)
119+
m.logger.Warn("notification queue start time not found", func(e goengine.LoggerEntry) {
120+
e.Any("notification", notification)
121+
})
122+
}
102123

103-
if processingOk && queueOk {
124+
if processingStartTime, ok := m.notificationStartTimes.Load(notificationProcessingKeyPrefix + memAddress); ok {
104125
m.notificationProcessingDuration.With(labels).Observe(time.Since(processingStartTime.(time.Time)).Seconds())
105-
m.notificationQueueDuration.With(labels).Observe(time.Since(queueStartTime.(time.Time)).Seconds())
106-
return true
126+
} else {
127+
m.logger.Warn("notification processing start time not found", func(e goengine.LoggerEntry) {
128+
e.Any("notification", notification)
129+
})
107130
}
108131

109-
return false
110132
}
111133

112134
// storeStartTime stores the start time against each notification only if it's not already existent
113135
func (m *Metrics) storeStartTime(prefix string, notification *sql.ProjectionNotification) bool {
114136
key := prefix + fmt.Sprintf("%p", notification)
115137

116138
_, alreadyExists := m.notificationStartTimes.LoadOrStore(key, time.Now())
117-
118139
return !alreadyExists
119140
}

0 commit comments

Comments
 (0)