
Commit fb35209

Add new behavior to avoid races on config reload
1 parent 8acfc6e · commit fb35209

6 files changed, +128 -36 lines changed


cmd/alertmanager/main.go

Lines changed: 12 additions & 2 deletions

@@ -491,7 +491,7 @@ func run() int {
 		silencer.Mutes(labels)
 	})

-	disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
+	newDisp := dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
 	routes.Walk(func(r *dispatch.Route) {
 		if r.RouteOpts.RepeatInterval > *retention {
 			configLogger.Warn(
@@ -518,8 +518,18 @@ func run() int {
 		}
 	})

-	go disp.Run()
+	// First, start the inhibitor so the inhibition cache can populate.
+	// Wait for it to load alerts before starting the dispatcher so we
+	// don't accidentally notify for an alert that will be inhibited.
 	go inhibitor.Run()
+	inhibitor.WaitForLoading()
+
+	// Next, start the dispatcher and wait for it to load before swapping the disp pointer.
+	// This ensures that the API doesn't see the new dispatcher before it finishes populating
+	// the aggrGroups.
+	go newDisp.Run()
+	newDisp.WaitForLoading()
+	disp = newDisp

 	return nil
 })
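The ordering in this hunk is the heart of the race fix: start a component, block until it reports that the pre-existing alerts have been ingested, and only then publish it through the shared disp pointer. A minimal, self-contained sketch of that latch-and-swap pattern follows; the loader type and its methods are illustrative stand-ins modeled on the commit, not Alertmanager's API.

package main

import (
	"fmt"
	"sync"
)

// loader is a simplified stand-in for a component like the Dispatcher: the
// WaitGroup is armed with Add(1) at construction and released with Done()
// once the initial batch of alerts has been ingested.
type loader struct {
	loadingFinished sync.WaitGroup
}

func newLoader() *loader {
	l := &loader{}
	l.loadingFinished.Add(1) // armed until the initial load completes
	return l
}

// Run ingests the pre-existing items, signals readiness, and would then
// normally keep consuming a live subscription.
func (l *loader) Run(initial []string) {
	for _, a := range initial {
		fmt.Println("ingesting", a)
	}
	l.loadingFinished.Done() // callers blocked in WaitForLoading resume here
}

func (l *loader) WaitForLoading() { l.loadingFinished.Wait() }

func main() {
	var disp *loader // the shared pointer that API readers see, as in main.go

	newDisp := newLoader()
	go newDisp.Run([]string{"alert-a", "alert-b"})
	newDisp.WaitForLoading() // block until the initial batch is ingested
	disp = newDisp           // swap only now; readers never see a half-built dispatcher
	_ = disp
}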

dispatch/dispatch.go

Lines changed: 24 additions & 6 deletions

@@ -81,6 +81,7 @@ type Dispatcher struct {
 	timeout func(time.Duration) time.Duration

 	mtx                sync.RWMutex
+	loadingFinished    sync.WaitGroup
 	aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
 	aggrGroupsNum      int

@@ -127,6 +128,7 @@ func NewDispatcher(
 		metrics: m,
 		limits:  lim,
 	}
+	disp.loadingFinished.Add(1)
 	return disp
 }

@@ -141,7 +143,13 @@ func (d *Dispatcher) Run() {
 	d.ctx, d.cancel = context.WithCancel(context.Background())
 	d.mtx.Unlock()

-	d.run(d.alerts.Subscribe("dispatcher"))
+	initialAlerts, it := d.alerts.SlurpAndSubscribe("dispatcher")
+	for _, alert := range initialAlerts {
+		d.ingestAlert(alert)
+	}
+	d.loadingFinished.Done()
+
+	d.run(it)
 	close(d.done)
 }

@@ -170,11 +178,7 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 				continue
 			}

-			now := time.Now()
-			for _, r := range d.route.Match(alert.Labels) {
-				d.processAlert(alert, r)
-			}
-			d.metrics.processingDuration.Observe(time.Since(now).Seconds())
+			d.ingestAlert(alert)

 		case <-maintenance.C:
 			d.doMaintenance()
@@ -200,6 +204,19 @@ func (d *Dispatcher) doMaintenance() {
 	}
 }

+func (d *Dispatcher) ingestAlert(alert *types.Alert) {
+	now := time.Now()
+	for _, r := range d.route.Match(alert.Labels) {
+		d.processAlert(alert, r)
+	}
+	d.metrics.processingDuration.Observe(time.Since(now).Seconds())
+}
+
+func (d *Dispatcher) WaitForLoading() {
+	d.loadingFinished.Wait()
+}
+
 // AlertGroup represents how alerts exist within an aggrGroup.
 type AlertGroup struct {
 	Alerts types.AlertSlice

@@ -222,6 +239,7 @@ func (ag AlertGroups) Len() int { return len(ag) }

 // Groups returns a slice of AlertGroups from the dispatcher's internal state.
 func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
+	d.WaitForLoading()
 	groups := AlertGroups{}

 	d.mtx.RLock()
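Note how the sync.WaitGroup is used here: Add(1) in the constructor plus a single Done() at the end of the initial load turns it into a one-shot readiness latch. Wait() blocks only until the first load completes and returns immediately ever after, so the d.WaitForLoading() call added to Groups is effectively free once the dispatcher is warm. The other common Go idiom for a broadcast-once signal is a closed channel; an equivalent sketch for comparison (illustrative, not part of this commit):

// readyLatch is a channel-based equivalent of the WaitGroup latch above:
// closing a channel releases all current and future waiters at once.
type readyLatch struct {
	ready chan struct{}
}

func newReadyLatch() *readyLatch {
	return &readyLatch{ready: make(chan struct{})}
}

// markLoaded must be called exactly once, mirroring the single Done().
func (l *readyLatch) markLoaded() { close(l.ready) }

// waitForLoading blocks until markLoaded is called, then never blocks again.
func (l *readyLatch) waitForLoading() { <-l.ready }

Either way there is a caveat the reload path has to respect: if Run is never started, every caller of WaitForLoading blocks forever, which is why main.go launches the goroutine before waiting.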

inhibit/inhibit.go

Lines changed: 44 additions & 28 deletions

@@ -40,8 +40,9 @@ type Inhibitor struct {
 	logger  *slog.Logger
 	metrics *InhibitorMetrics

-	mtx    sync.RWMutex
-	cancel func()
+	mtx             sync.RWMutex
+	loadingFinished sync.WaitGroup
+	cancel          func()
 }

 // NewInhibitor returns a new Inhibitor.
@@ -53,6 +54,7 @@ func NewInhibitor(ap provider.Alerts, rs []config.InhibitRule, mk types.AlertMar
 		metrics: metrics,
 	}

+	ih.loadingFinished.Add(1)
 	ruleNames := make(map[string]struct{})
 	for i, cr := range rs {
 		if _, ok := ruleNames[cr.Name]; ok {
@@ -66,14 +68,19 @@ func NewInhibitor(ap provider.Alerts, rs []config.InhibitRule, mk types.AlertMar
 			ruleNames[cr.Name] = struct{}{}
 		}
 	}
-
 	return ih
 }

 func (ih *Inhibitor) run(ctx context.Context) {
-	it := ih.alerts.Subscribe("inhibitor")
+	initialAlerts, it := ih.alerts.SlurpAndSubscribe("inhibitor")
 	defer it.Close()

+	for _, a := range initialAlerts {
+		ih.processAlert(a)
+	}
+
+	ih.loadingFinished.Done()
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -83,33 +90,42 @@ func (ih *Inhibitor) run(ctx context.Context) {
 				ih.logger.Error("Error iterating alerts", "err", err)
 				continue
 			}
-			// Update the inhibition rules' cache.
-			cachedSum := 0
-			indexedSum := 0
-			for _, r := range ih.rules {
-				if r.SourceMatchers.Matches(a.Labels) {
-					if err := r.scache.Set(a); err != nil {
-						ih.logger.Error("error on set alert", "err", err)
-						continue
-					}
-					r.updateIndex(a)
-				}
-				cached := r.scache.Len()
-				indexed := r.sindex.Len()
-
-				if r.Name != "" {
-					r.metrics.sourceAlertsCacheItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(cached))
-					r.metrics.sourceAlertsIndexItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(indexed))
-				}
-
-				cachedSum += cached
-				indexedSum += indexed
+			ih.processAlert(a)
+		}
+	}
+}
+
+func (ih *Inhibitor) processAlert(a *types.Alert) {
+	// Update the inhibition rules' cache.
+	cachedSum := 0
+	indexedSum := 0
+	for _, r := range ih.rules {
+		if r.SourceMatchers.Matches(a.Labels) {
+			if err := r.scache.Set(a); err != nil {
+				ih.logger.Error("error on set alert", "err", err)
+				continue
 			}
-			ih.metrics.sourceAlertsCacheItems.Set(float64(cachedSum))
-			ih.metrics.sourceAlertsIndexItems.Set(float64(indexedSum))
+			r.updateIndex(a)
+		}
+		cached := r.scache.Len()
+		indexed := r.sindex.Len()
+
+		if r.Name != "" {
+			r.metrics.sourceAlertsCacheItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(cached))
+			r.metrics.sourceAlertsIndexItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(indexed))
 		}
+
+		cachedSum += cached
+		indexedSum += indexed
 	}
+	ih.metrics.sourceAlertsCacheItems.Set(float64(cachedSum))
+	ih.metrics.sourceAlertsIndexItems.Set(float64(indexedSum))
+}
+
+func (ih *Inhibitor) WaitForLoading() {
+	ih.loadingFinished.Wait()
 }

 // Run the Inhibitor's background processing.
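Extracting processAlert means the replayed snapshot and the live subscription go through exactly the same code path, so the two cannot drift apart. The resulting consumer shape, now shared by the Inhibitor and the Dispatcher, is roughly the following (a sketch; process stands in for ih.processAlert or d.ingestAlert, and error handling is elided):

// consume sketches the load-then-follow loop both components now use.
func consume(alerts provider.Alerts, loadingFinished *sync.WaitGroup, process func(*types.Alert)) {
	initialAlerts, it := alerts.SlurpAndSubscribe("consumer")
	defer it.Close()

	// Replay the snapshot through the same path as live alerts.
	for _, a := range initialAlerts {
		process(a)
	}
	loadingFinished.Done() // callers blocked in WaitForLoading resume here

	// Then follow the live feed.
	for a := range it.Next() {
		process(a)
	}
}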

inhibit/inhibit_test.go

Lines changed: 20 additions & 0 deletions

@@ -412,6 +412,26 @@ func (f *fakeAlerts) Subscribe(name string) provider.AlertIterator {
 	}()
 	return provider.NewAlertIterator(ch, done, nil)
 }
+func (f *fakeAlerts) SlurpAndSubscribe(name string) ([]*types.Alert, provider.AlertIterator) {
+	ch := make(chan *types.Alert)
+	done := make(chan struct{})
+	go func() {
+		for _, a := range f.alerts {
+			ch <- a
+		}
+		// Send another (meaningless) alert to make sure that the inhibitor has
+		// processed everything.
+		ch <- &types.Alert{
+			Alert: model.Alert{
+				Labels:   model.LabelSet{},
+				StartsAt: time.Now(),
+			},
+		}
+		close(f.finished)
+		<-done
+	}()
+	return []*types.Alert{}, provider.NewAlertIterator(ch, done, nil)
+}

 func TestInhibit(t *testing.T) {
 	t.Parallel()

provider/mem/mem.go

Lines changed: 16 additions & 0 deletions

@@ -195,6 +195,22 @@ func (a *Alerts) Subscribe(name string) provider.AlertIterator {
 	return provider.NewAlertIterator(ch, done, nil)
 }

+func (a *Alerts) SlurpAndSubscribe(name string) ([]*types.Alert, provider.AlertIterator) {
+	a.mtx.Lock()
+	defer a.mtx.Unlock()
+
+	var (
+		done   = make(chan struct{})
+		alerts = a.alerts.List()
+		ch     = make(chan *types.Alert, alertChannelLength)
+	)
+
+	a.listeners[a.next] = listeningAlerts{name: name, alerts: ch, done: done}
+	a.next++
+
+	return alerts, provider.NewAlertIterator(ch, done, nil)
+}
+
 // GetPending returns an iterator over all the alerts that have
 // pending notifications.
 func (a *Alerts) GetPending() provider.AlertIterator {
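What makes this method safe is that the snapshot (a.alerts.List()) and the listener registration happen inside the same a.mtx critical section: a concurrent Put lands either before the snapshot (and shows up in the returned slice) or after the registration (and shows up on the channel), never in between. A pared-down, runnable model of that invariant; the store type here is illustrative, not mem.go's actual structure:

package main

import "sync"

// store is a pared-down model of mem.Alerts: stored alerts plus listeners.
type store struct {
	mtx       sync.Mutex
	alerts    []string
	listeners []chan string
}

// slurpAndSubscribe mirrors mem.go: one critical section covers both the
// snapshot and the listener registration, so a concurrent put either shows
// up in the returned slice or on the channel, never in neither.
func (s *store) slurpAndSubscribe() ([]string, <-chan string) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	snapshot := append([]string(nil), s.alerts...)
	ch := make(chan string, 16)
	s.listeners = append(s.listeners, ch)
	return snapshot, ch
}

// put delivers an alert to storage and to all registered listeners.
func (s *store) put(a string) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	s.alerts = append(s.alerts, a)
	for _, ch := range s.listeners {
		ch <- a
	}
}

func main() {
	s := &store{}
	s.put("before") // lands in the snapshot
	snapshot, ch := s.slurpAndSubscribe()
	s.put("after") // lands on the channel
	_ = snapshot
	<-ch
}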

provider/provider.go

Lines changed: 12 additions & 0 deletions

@@ -76,6 +76,18 @@ type Alerts interface {
 	// resolved and successfully notified about.
 	// They are not guaranteed to be in chronological order.
 	Subscribe(name string) AlertIterator
+
+	// SlurpAndSubscribe returns a list of all active alerts that were
+	// available in the provider before the call to SlurpAndSubscribe, and
+	// an iterator over all alerts available after the call.
+	// SlurpAndSubscribe can be used by clients that build in-memory state
+	// and need to know when they've processed the 'initial' batch of
+	// alerts in a provider after they reload their subscription.
+	// Implementing SlurpAndSubscribe is optional - providers may choose to
+	// return an empty list for the first return value and the result of
+	// Subscribe for the second return value.
+	SlurpAndSubscribe(name string) ([]*types.Alert, AlertIterator)
+
 	// GetPending returns an iterator over all alerts that have
 	// pending notifications.
 	GetPending() AlertIterator
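Since implementing the snapshot is optional, a provider without an atomic way to list-and-subscribe can satisfy the interface with the trivial fallback the comment describes. A sketch of what that might look like (myProvider is a hypothetical implementation, not part of this commit):

// SlurpAndSubscribe falls back to the plain subscription, returning an
// empty initial batch as the interface explicitly allows. Consumers then
// signal "loaded" immediately and rely solely on the iterator, i.e. they
// behave exactly as they did before this commit.
func (p *myProvider) SlurpAndSubscribe(name string) ([]*types.Alert, provider.AlertIterator) {
	return []*types.Alert{}, p.Subscribe(name)
}

The fakeAlerts test double above takes the same route: it returns an empty slice and pushes everything through the iterator instead.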
