
Commit f7ff687

feat(provider): add subscriber channel metrics (#4630)
Add the `alertmanager_alerts_subscriber_channel_writes_total` metric to track the number of alerts written to subscriber channels. A drop in the rate of this metric may indicate a problem with the ingestion of alerts by subscribers (inhibitor and dispatcher).

Signed-off-by: Siavash Safi <[email protected]>
1 parent 5a36856 commit f7ff687
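
As a usage illustration (not code from this commit), the sketch below wires together the pieces the diff touches: the in-memory provider takes a Prometheus registry, each subscriber identifies itself by name when calling Subscribe, and every alert successfully written to that subscriber's channel increments the new counter with the name as the `subscriber` label. The constructor arguments mirror the test code further down; treat it as a minimal sketch under those assumptions.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/common/model"
    "github.com/prometheus/common/promslog"

    "github.com/prometheus/alertmanager/provider/mem"
    "github.com/prometheus/alertmanager/types"
)

func main() {
    reg := prometheus.NewRegistry()
    marker := types.NewMarker(reg)

    // The registry passed here is where
    // alertmanager_alerts_subscriber_channel_writes_total gets registered.
    alerts, err := mem.NewAlerts(context.Background(), marker, 30*time.Minute, nil, promslog.NewNopLogger(), reg)
    if err != nil {
        panic(err)
    }
    defer alerts.Close()

    // Subscribers now pass a name; Alertmanager itself uses "dispatcher"
    // and "inhibitor", which become values of the "subscriber" label.
    it := alerts.Subscribe("dispatcher")
    defer it.Close()

    now := time.Now()
    _ = alerts.Put(&types.Alert{
        Alert: model.Alert{
            Labels:   model.LabelSet{"alertname": "example"},
            StartsAt: now,
            EndsAt:   now.Add(time.Hour),
        },
        UpdatedAt: now,
    })

    // Each successful write to a subscriber channel bumps the counter.
    mfs, _ := reg.Gather()
    for _, mf := range mfs {
        if mf.GetName() == "alertmanager_alerts_subscriber_channel_writes_total" {
            for _, m := range mf.GetMetric() {
                fmt.Println(m.GetCounter().GetValue())
            }
        }
    }
}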

File tree: 7 files changed, +136 -30 lines changed


dispatch/dispatch.go

Lines changed: 1 addition & 1 deletion
@@ -144,7 +144,7 @@ func (d *Dispatcher) Run() {
     d.ctx, d.cancel = context.WithCancel(context.Background())
     d.mtx.Unlock()

-    d.run(d.alerts.Subscribe())
+    d.run(d.alerts.Subscribe("dispatcher"))
     close(d.done)
 }

dispatch/dispatch_test.go

Lines changed: 17 additions & 13 deletions
@@ -392,16 +392,17 @@ route:

     logger := promslog.NewNopLogger()
     route := NewRoute(conf.Route, nil)
-    marker := types.NewMarker(prometheus.NewRegistry())
-    alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
+    reg := prometheus.NewRegistry()
+    marker := types.NewMarker(reg)
+    alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
     if err != nil {
         t.Fatal(err)
     }
     defer alerts.Close()

     timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
     recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
-    dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
+    dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
     go dispatcher.Run()
     defer dispatcher.Stop()

@@ -542,8 +543,9 @@ route:

     logger := promslog.NewNopLogger()
     route := NewRoute(conf.Route, nil)
-    marker := types.NewMarker(prometheus.NewRegistry())
-    alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
+    reg := prometheus.NewRegistry()
+    marker := types.NewMarker(reg)
+    alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
     if err != nil {
         t.Fatal(err)
     }
@@ -552,7 +554,7 @@ route:
     timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
     recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
     lim := limits{groups: 6}
-    m := NewDispatcherMetrics(true, prometheus.NewRegistry())
+    m := NewDispatcherMetrics(true, reg)
     dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, lim, logger, m)
     go dispatcher.Run()
     defer dispatcher.Stop()
@@ -663,15 +665,16 @@ func newAlert(labels model.LabelSet) *types.Alert {

 func TestDispatcherRace(t *testing.T) {
     logger := promslog.NewNopLogger()
-    marker := types.NewMarker(prometheus.NewRegistry())
-    alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
+    reg := prometheus.NewRegistry()
+    marker := types.NewMarker(reg)
+    alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
     if err != nil {
         t.Fatal(err)
     }
     defer alerts.Close()

     timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
-    dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
+    dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
     go dispatcher.Run()
     dispatcher.Stop()
 }
@@ -680,8 +683,9 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
     const numAlerts = 5000

     logger := promslog.NewNopLogger()
-    marker := types.NewMarker(prometheus.NewRegistry())
-    alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
+    reg := prometheus.NewRegistry()
+    marker := types.NewMarker(reg)
+    alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
     if err != nil {
         t.Fatal(err)
     }
@@ -699,7 +703,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)

     timeout := func(d time.Duration) time.Duration { return d }
     recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
-    dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
+    dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
     go dispatcher.Run()
     defer dispatcher.Stop()

@@ -735,7 +739,7 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
     r := prometheus.NewRegistry()
     marker := types.NewMarker(r)

-    alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, promslog.NewNopLogger(), nil)
+    alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, promslog.NewNopLogger(), r)
     if err != nil {
         t.Fatal(err)
     }

inhibit/inhibit.go

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ func NewInhibitor(ap provider.Alerts, rs []config.InhibitRule, mk types.AlertMar
 }

 func (ih *Inhibitor) run(ctx context.Context) {
-    it := ih.alerts.Subscribe()
+    it := ih.alerts.Subscribe("inhibitor")
     defer it.Close()

     for {

inhibit/inhibit_test.go

Lines changed: 1 addition & 1 deletion
@@ -392,7 +392,7 @@ func newFakeAlerts(alerts []*types.Alert) *fakeAlerts {
 func (f *fakeAlerts) GetPending() provider.AlertIterator          { return nil }
 func (f *fakeAlerts) Get(model.Fingerprint) (*types.Alert, error) { return nil, nil }
 func (f *fakeAlerts) Put(...*types.Alert) error                   { return nil }
-func (f *fakeAlerts) Subscribe() provider.AlertIterator {
+func (f *fakeAlerts) Subscribe(name string) provider.AlertIterator {
     ch := make(chan *types.Alert)
     done := make(chan struct{})
     go func() {

provider/mem/mem.go

Lines changed: 19 additions & 6 deletions
@@ -20,6 +20,7 @@ import (
     "time"

     "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/prometheus/common/model"

     "github.com/prometheus/alertmanager/provider"
@@ -44,6 +45,8 @@ type Alerts struct {
     callback AlertStoreCallback

     logger *slog.Logger
+
+    subscriberChannelWrites *prometheus.CounterVec
 }

 type AlertStoreCallback interface {
@@ -61,13 +64,14 @@ type AlertStoreCallback interface {
 }

 type listeningAlerts struct {
+    name   string
     alerts chan *types.Alert
     done   chan struct{}
 }

 func (a *Alerts) registerMetrics(r prometheus.Registerer) {
     newMemAlertByStatus := func(s types.AlertState) prometheus.GaugeFunc {
-        return prometheus.NewGaugeFunc(
+        return promauto.With(r).NewGaugeFunc(
             prometheus.GaugeOpts{
                 Name: "alertmanager_alerts",
                 Help: "How many alerts by state.",
@@ -79,9 +83,17 @@ func (a *Alerts) registerMetrics(r prometheus.Registerer) {
         )
     }

-    r.MustRegister(newMemAlertByStatus(types.AlertStateActive))
-    r.MustRegister(newMemAlertByStatus(types.AlertStateSuppressed))
-    r.MustRegister(newMemAlertByStatus(types.AlertStateUnprocessed))
+    newMemAlertByStatus(types.AlertStateActive)
+    newMemAlertByStatus(types.AlertStateSuppressed)
+    newMemAlertByStatus(types.AlertStateUnprocessed)
+
+    a.subscriberChannelWrites = promauto.With(r).NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "alertmanager_alerts_subscriber_channel_writes_total",
+            Help: "Number of times alerts were written to subscriber channels",
+        },
+        []string{"subscriber"},
+    )
 }

 // NewAlerts returns a new alert provider.
@@ -164,7 +176,7 @@ func max(a, b int) int {
 // Subscribe returns an iterator over active alerts that have not been
 // resolved and successfully notified about.
 // They are not guaranteed to be in chronological order.
-func (a *Alerts) Subscribe() provider.AlertIterator {
+func (a *Alerts) Subscribe(name string) provider.AlertIterator {
     a.mtx.Lock()
     defer a.mtx.Unlock()
     var (
@@ -177,7 +189,7 @@ func (a *Alerts) Subscribe() provider.AlertIterator {
         ch <- a
     }

-    a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
+    a.listeners[a.next] = listeningAlerts{name: name, alerts: ch, done: done}
     a.next++

     return provider.NewAlertIterator(ch, done, nil)
@@ -252,6 +264,7 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
         for _, l := range a.listeners {
             select {
             case l.alerts <- alert:
+                a.subscriberChannelWrites.WithLabelValues(l.name).Inc()
             case <-l.done:
             }
         }

provider/mem/mem_test.go

Lines changed: 96 additions & 7 deletions
@@ -87,12 +87,12 @@ func init() {
 // a listener can not unsubscribe as the lock is hold by `alerts.Lock`.
 func TestAlertsSubscribePutStarvation(t *testing.T) {
     marker := types.NewMarker(prometheus.NewRegistry())
-    alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
+    alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry())
     if err != nil {
         t.Fatal(err)
     }

-    iterator := alerts.Subscribe()
+    iterator := alerts.Subscribe("test")

     alertsToInsert := []*types.Alert{}
     // Exhaust alert channel
@@ -142,7 +142,7 @@ func TestDeadLock(t *testing.T) {

     marker := types.NewMarker(prometheus.NewRegistry())
     // Run gc every 5 milliseconds to increase the possibility of a deadlock with Subscribe()
-    alerts, err := NewAlerts(context.Background(), marker, 5*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), nil)
+    alerts, err := NewAlerts(context.Background(), marker, 5*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry())
     if err != nil {
         t.Fatal(err)
     }
@@ -176,7 +176,7 @@ func TestDeadLock(t *testing.T) {
     for {
         select {
         case <-tick.C:
-            alerts.Subscribe()
+            alerts.Subscribe("test")
         case <-stopAfter:
             done <- true
             break
@@ -195,7 +195,7 @@ func TestDeadLock(t *testing.T) {

 func TestAlertsPut(t *testing.T) {
     marker := types.NewMarker(prometheus.NewRegistry())
-    alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
+    alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry())
     if err != nil {
         t.Fatal(err)
     }
@@ -223,7 +223,7 @@ func TestAlertsSubscribe(t *testing.T) {

     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    alerts, err := NewAlerts(ctx, marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
+    alerts, err := NewAlerts(ctx, marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry())
     if err != nil {
         t.Fatal(err)
     }
@@ -250,7 +250,7 @@ func TestAlertsSubscribe(t *testing.T) {
     go func(i int) {
         defer wg.Done()

-        it := alerts.Subscribe()
+        it := alerts.Subscribe("test")
         defer it.Close()

         received := make(map[model.Fingerprint]struct{})
@@ -621,3 +621,92 @@ func TestAlertsConcurrently(t *testing.T) {
     }, 2*expire, expire)
     require.Equal(t, int32(0), callback.alerts.Load())
 }
+
+func TestSubscriberChannelMetrics(t *testing.T) {
+    marker := types.NewMarker(prometheus.NewRegistry())
+    reg := prometheus.NewRegistry()
+    alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), reg)
+    require.NoError(t, err)
+
+    subscriberName := "test_subscriber"
+
+    // Subscribe to alerts
+    iterator := alerts.Subscribe(subscriberName)
+    defer iterator.Close()
+
+    // Consume alerts in the background
+    go func() {
+        for range iterator.Next() {
+            // Just drain the channel
+        }
+    }()
+
+    // Helper function to get counter value
+    getCounterValue := func(name, labelName, labelValue string) float64 {
+        metrics, err := reg.Gather()
+        require.NoError(t, err)
+        for _, mf := range metrics {
+            if mf.GetName() == name {
+                for _, m := range mf.GetMetric() {
+                    for _, label := range m.GetLabel() {
+                        if label.GetName() == labelName && label.GetValue() == labelValue {
+                            return m.GetCounter().GetValue()
+                        }
+                    }
+                }
+            }
+        }
+        return 0
+    }
+
+    // Initially, the counter should be 0
+    writeCount := getCounterValue("alertmanager_alerts_subscriber_channel_writes_total", "subscriber", subscriberName)
+    require.Equal(t, 0.0, writeCount, "subscriberChannelWrites should start at 0")
+
+    // Put some alerts
+    now := time.Now()
+    alertsToSend := []*types.Alert{
+        {
+            Alert: model.Alert{
+                Labels:       model.LabelSet{"test": "1"},
+                Annotations:  model.LabelSet{"foo": "bar"},
+                StartsAt:     now,
+                EndsAt:       now.Add(1 * time.Hour),
+                GeneratorURL: "http://example.com/prometheus",
+            },
+            UpdatedAt: now,
+            Timeout:   false,
+        },
+        {
+            Alert: model.Alert{
+                Labels:       model.LabelSet{"test": "2"},
+                Annotations:  model.LabelSet{"foo": "bar"},
+                StartsAt:     now,
+                EndsAt:       now.Add(1 * time.Hour),
+                GeneratorURL: "http://example.com/prometheus",
+            },
+            UpdatedAt: now,
+            Timeout:   false,
+        },
+        {
+            Alert: model.Alert{
+                Labels:       model.LabelSet{"test": "3"},
+                Annotations:  model.LabelSet{"foo": "bar"},
+                StartsAt:     now,
+                EndsAt:       now.Add(1 * time.Hour),
+                GeneratorURL: "http://example.com/prometheus",
+            },
+            UpdatedAt: now,
+            Timeout:   false,
+        },
+    }
+
+    err = alerts.Put(alertsToSend...)
+    require.NoError(t, err)
+
+    // Verify the counter incremented for each successful write
+    require.Eventually(t, func() bool {
+        writeCount := getCounterValue("alertmanager_alerts_subscriber_channel_writes_total", "subscriber", subscriberName)
+        return writeCount == float64(len(alertsToSend))
+    }, 1*time.Second, 10*time.Millisecond, "subscriberChannelWrites should equal the number of alerts sent")
+}

provider/provider.go

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ type Alerts interface {
     // Subscribe returns an iterator over active alerts that have not been
     // resolved and successfully notified about.
     // They are not guaranteed to be in chronological order.
-    Subscribe() AlertIterator
+    Subscribe(name string) AlertIterator
     // GetPending returns an iterator over all alerts that have
     // pending notifications.
     GetPending() AlertIterator
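
Because Subscribe is part of the exported provider.Alerts interface, any out-of-tree implementation has to adopt the new signature, exactly as the fakeAlerts test double above does. Below is a minimal sketch of such an implementation stub; the stubAlerts type is hypothetical and the interface is assumed to consist of the four methods exercised by fakeAlerts in this commit's diffs.

package myprovider

import (
    "github.com/prometheus/common/model"

    "github.com/prometheus/alertmanager/provider"
    "github.com/prometheus/alertmanager/types"
)

// stubAlerts is a hypothetical provider.Alerts implementation used only to
// show the signature change; it is not part of the commit.
type stubAlerts struct{}

var _ provider.Alerts = (*stubAlerts)(nil)

func (s *stubAlerts) Get(model.Fingerprint) (*types.Alert, error) { return nil, nil }
func (s *stubAlerts) Put(...*types.Alert) error                   { return nil }
func (s *stubAlerts) GetPending() provider.AlertIterator          { return nil }

// Subscribe now receives the subscriber's name; implementations that do not
// track per-subscriber metrics can simply ignore it.
func (s *stubAlerts) Subscribe(name string) provider.AlertIterator {
    ch := make(chan *types.Alert)
    done := make(chan struct{})
    return provider.NewAlertIterator(ch, done, nil)
}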
