forked from luno/reflex
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.go
More file actions
160 lines (132 loc) · 4.15 KB
/
consumer.go
File metadata and controls
160 lines (132 loc) · 4.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package reflex
import (
"context"
"time"
"github.com/luno/fate"
"github.com/prometheus/client_golang/prometheus"
"github.com/luno/reflex/internal/metrics"
"github.com/luno/reflex/internal/tracing"
)
// Defaults applied by NewConsumer before any ConsumerOption is run.
const (
	// defaultLagAlert is the default lag threshold above which the lag alert
	// gauge is raised.
	defaultLagAlert = 30 * time.Minute

	// defaultActivityTTL is the default TTL passed when registering the
	// consumer activity gauge.
	defaultActivityTTL = 24 * time.Hour
)
// consumer is the instrumented consumer built by NewConsumer. It wraps a
// user supplied function and records lag, latency, error and activity
// metrics around every call to Consume.
type consumer struct {
// fn is the wrapped function; Consume calls it for every event that
// passes the type filter.
fn func(context.Context, fate.Fate, *Event) error
// name is returned by Name and used to build the metric labels.
name string
// lagAlert is the lag threshold for the alert gauge; values <= 0 mean the
// alert gauge is always set to 0.
lagAlert time.Duration
// activityTTL is passed to ConsumerActivityGauge.Register.
activityTTL time.Duration
// ageHist observes the event lag in seconds.
ageHist prometheus.Observer
// lagGauge is set to the event lag in seconds; it is created by Reset.
lagGauge prometheus.Gauge
// lagAlertGauge is set to 1 when the lag exceeds lagAlert, otherwise 0.
lagAlertGauge prometheus.Gauge
// errorCounter is incremented when fn returns an error for which
// IsExpected reports false.
errorCounter prometheus.Counter
// latencyHist observes the seconds elapsed from the start of Consume
// until fn has returned.
latencyHist prometheus.Observer
// filterIncludeTypes, when non-empty, restricts fn to events whose type
// matches one of the listed types.
filterIncludeTypes []EventType
// activityKey is the key returned by ConsumerActivityGauge.Register and
// passed to SetActive on every Consume.
activityKey string
}
// ConsumerOption changes the behaviour of a consumer. Options are passed to
// NewConsumer, which applies them in order on top of the defaults.
type ConsumerOption func(*consumer)
// WithConsumerLagAlert returns an option that overrides the lag threshold
// above which the consumer lag alert gauge is set to 1.
func WithConsumerLagAlert(d time.Duration) ConsumerOption {
	setLagAlert := func(target *consumer) {
		target.lagAlert = d
	}
	return setLagAlert
}
// WithoutConsumerLag returns an option that disables the consumer lag alert.
// It stores a negative threshold, so Consume always sets the alert gauge to 0.
func WithoutConsumerLag() ConsumerOption {
	disableLagAlert := func(target *consumer) {
		target.lagAlert = -1
	}
	return disableLagAlert
}
// WithConsumerLagAlertGauge returns an option that replaces the gauge used to
// report the consumer lag alert. This is handy when the alert needs custom
// metadata attached as labels.
func WithConsumerLagAlertGauge(g prometheus.Gauge) ConsumerOption {
	useGauge := func(target *consumer) {
		target.lagAlertGauge = g
	}
	return useGauge
}
// WithConsumerActivityTTL returns an option that sets the TTL of the consumer
// activity metric; i.e. if no events are consumed within `ttl` the consumer
// is considered inactive.
func WithConsumerActivityTTL(ttl time.Duration) ConsumerOption {
	setTTL := func(target *consumer) {
		target.activityTTL = ttl
	}
	return setTTL
}
// WithoutConsumerActivityTTL returns an option that disables the TTL of the
// consumer activity metric. It stores a TTL of -1, which NewConsumer passes on
// when registering the activity gauge.
// NOTE(review): presumably a negative TTL means "never expires" — confirm
// against the metrics package.
func WithoutConsumerActivityTTL() ConsumerOption {
	disableTTL := func(target *consumer) {
		target.activityTTL = -1
	}
	return disableTTL
}
// WithFilterIncludeTypes returns an option that restricts the consumer to the
// given EventTypes. Events of any other type are not handed to the consumer
// function; a skipped metric is incremented for them instead. With no types
// configured, every event is consumed.
func WithFilterIncludeTypes(evts ...EventType) ConsumerOption {
	setFilter := func(target *consumer) {
		target.filterIncludeTypes = evts
	}
	return setFilter
}
// NewConsumer builds an instrumented consumer of events.
//
// name labels every metric of the consumer and fn is the function invoked for
// each consumed event. The consumer starts with the default lag alert
// threshold and activity TTL and the package metrics for name; opts are then
// applied in order and may override any of these.
func NewConsumer(name string, fn func(context.Context, fate.Fate, *Event) error,
	opts ...ConsumerOption) Consumer {
	labels := metrics.Labels(name)

	cons := new(consumer)
	cons.fn = fn
	cons.name = name
	cons.lagAlert = defaultLagAlert
	cons.activityTTL = defaultActivityTTL
	cons.ageHist = metrics.ConsumerAge.With(labels)
	cons.lagAlertGauge = metrics.ConsumerLagAlert.With(labels)
	cons.errorCounter = metrics.ConsumerErrors.With(labels)
	cons.latencyHist = metrics.ConsumerLatency.With(labels)

	for i := range opts {
		opts[i](cons)
	}

	// Register only after the options ran so that a custom activity TTL is
	// the one that gets registered.
	cons.activityKey = metrics.ConsumerActivityGauge.Register(labels, cons.activityTTL)

	// Reset creates the lag gauge; the error is discarded because Reset
	// always returns nil.
	_ = cons.Reset()

	return cons
}
// Name returns the name the consumer was created with.
func (c *consumer) Name() string {
return c.name
}
// Consume records metrics for event and, unless the event is filtered out,
// hands it to the wrapped consumer function.
//
// For every event the consumer is marked active, the lag (now minus the event
// timestamp) is written to the lag gauge and age histogram, and the lag alert
// gauge is set to 1 if the lag exceeds a positive alert threshold, else 0.
// Events excluded by the type filter only increment the skipped counter and
// yield a nil error. Otherwise the error of the wrapped function is returned;
// errors for which IsExpected reports false increment the error counter, and
// the time since the start of Consume is observed as latency.
func (c *consumer) Consume(ctx context.Context, ft fate.Fate,
	event *Event) error {
	start := time.Now()
	metrics.ConsumerActivityGauge.SetActive(c.activityKey)

	lag := start.Sub(event.Timestamp)
	lagSecs := lag.Seconds()
	c.lagGauge.Set(lagSecs)
	c.ageHist.Observe(lagSecs)

	var alert float64
	if c.lagAlert > 0 && lag > c.lagAlert {
		alert = 1
	}
	c.lagAlertGauge.Set(alert)

	if len(event.Trace) > 0 {
		// Load any trace information into the context so the consumer can
		// log with the trace id and manually configure a trace.
		ctx = tracing.Inject(ctx, event.Trace)
	}

	wanted := len(c.filterIncludeTypes) == 0 ||
		IsAnyType(event.Type, c.filterIncludeTypes...)
	if !wanted {
		metrics.ConsumerSkippedEvents.WithLabelValues(c.name).Inc()
		return nil
	}

	err := c.fn(ctx, ft, event)
	if err != nil && !IsExpected(err) {
		c.errorCounter.Inc()
	}
	c.latencyHist.Observe(time.Since(start).Seconds())

	return err
}
// Reset prepares the consumer for Consume by (re)creating the lag gauge for
// the consumer's name. It always returns nil.
func (c *consumer) Reset() error {
	labels := metrics.Labels(c.name)
	c.lagGauge = metrics.ConsumerLag.With(labels)
	return nil
}
// Stop discards the consumer's lag metric by deleting the lag gauge
// registered under the consumer's name. It always returns nil.
func (c *consumer) Stop() error {
	labels := metrics.Labels(c.name)
	metrics.ConsumerLag.Delete(labels)
	return nil
}