diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go
index d29f0576cd..4c37a09b13 100644
--- a/cmd/alertmanager/main.go
+++ b/cmd/alertmanager/main.go
@@ -491,7 +491,7 @@ func run() int {
 			silencer.Mutes(labels)
 		})

-		disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
+		newDisp := dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
 		routes.Walk(func(r *dispatch.Route) {
 			if r.RouteOpts.RepeatInterval > *retention {
 				configLogger.Warn(
@@ -518,8 +518,18 @@ func run() int {
 			}
 		})

-		go disp.Run()
+		// First, start the inhibitor so the inhibition cache can populate.
+		// Wait for it to load alerts before starting the dispatcher, so we
+		// don't accidentally notify for an alert that will be inhibited.
 		go inhibitor.Run()
+		inhibitor.WaitForLoading()
+
+		// Next, start the dispatcher and wait for it to finish loading before
+		// swapping the disp pointer. This ensures that the API doesn't see the
+		// new dispatcher before it has populated its aggrGroups.
+		go newDisp.Run()
+		newDisp.WaitForLoading()
+		disp = newDisp

 		return nil
 	})
diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index 9eff1816ec..974c6d3cb2 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -81,6 +81,7 @@ type Dispatcher struct {
 	timeout func(time.Duration) time.Duration

 	mtx                sync.RWMutex
+	loadingFinished    sync.WaitGroup
 	aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
 	aggrGroupsNum      int

@@ -127,6 +128,7 @@ func NewDispatcher(
 		metrics:            m,
 		limits:             lim,
 	}
+	disp.loadingFinished.Add(1)

 	return disp
 }
@@ -141,7 +143,13 @@ func (d *Dispatcher) Run() {
 	d.ctx, d.cancel = context.WithCancel(context.Background())
 	d.mtx.Unlock()

-	d.run(d.alerts.Subscribe("dispatcher"))
+	initialAlerts, it := d.alerts.SlurpAndSubscribe("dispatcher")
+	for _, alert := range initialAlerts {
+		d.ingestAlert(alert)
+	}
+	d.loadingFinished.Done()
+
+	d.run(it)
 	close(d.done)
 }

@@ -170,11 +178,7 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 				continue
 			}

-			now := time.Now()
-			for _, r := range d.route.Match(alert.Labels) {
-				d.processAlert(alert, r)
-			}
-			d.metrics.processingDuration.Observe(time.Since(now).Seconds())
+			d.ingestAlert(alert)

 		case <-maintenance.C:
 			d.doMaintenance()
@@ -200,6 +204,18 @@ func (d *Dispatcher) doMaintenance() {
 	}
 }

+func (d *Dispatcher) ingestAlert(alert *types.Alert) {
+	now := time.Now()
+	for _, r := range d.route.Match(alert.Labels) {
+		d.processAlert(alert, r)
+	}
+	d.metrics.processingDuration.Observe(time.Since(now).Seconds())
+}
+
+func (d *Dispatcher) WaitForLoading() {
+	d.loadingFinished.Wait()
+}
+
 // AlertGroup represents how alerts exist within an aggrGroup.
 type AlertGroup struct {
 	Alerts   types.AlertSlice
@@ -222,6 +238,7 @@ func (ag AlertGroups) Len() int { return len(ag) }

 // Groups returns a slice of AlertGroups from the dispatcher's internal state.
 func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
+	d.WaitForLoading()
 	groups := AlertGroups{}

 	d.mtx.RLock()
diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go
index f0eae6af82..31eae39fd1 100644
--- a/inhibit/inhibit.go
+++ b/inhibit/inhibit.go
@@ -40,8 +40,9 @@ type Inhibitor struct {
 	logger  *slog.Logger
 	metrics *InhibitorMetrics

-	mtx    sync.RWMutex
-	cancel func()
+	mtx             sync.RWMutex
+	loadingFinished sync.WaitGroup
+	cancel          func()
 }

 // NewInhibitor returns a new Inhibitor.
@@ -53,6 +54,7 @@ func NewInhibitor(ap provider.Alerts, rs []config.InhibitRule, mk types.AlertMar
 		metrics: metrics,
 	}

+	ih.loadingFinished.Add(1)
 	ruleNames := make(map[string]struct{})
 	for i, cr := range rs {
 		if _, ok := ruleNames[cr.Name]; ok {
@@ -66,14 +68,19 @@ func NewInhibitor(ap provider.Alerts, rs []config.InhibitRule, mk types.AlertMar
 			ruleNames[cr.Name] = struct{}{}
 		}
 	}
-
 	return ih
 }

 func (ih *Inhibitor) run(ctx context.Context) {
-	it := ih.alerts.Subscribe("inhibitor")
+	initialAlerts, it := ih.alerts.SlurpAndSubscribe("inhibitor")
 	defer it.Close()

+	for _, a := range initialAlerts {
+		ih.processAlert(a)
+	}
+
+	ih.loadingFinished.Done()
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -83,33 +90,41 @@ func (ih *Inhibitor) run(ctx context.Context) {
 				ih.logger.Error("Error iterating alerts", "err", err)
 				continue
 			}
-			// Update the inhibition rules' cache.
-			cachedSum := 0
-			indexedSum := 0
-			for _, r := range ih.rules {
-				if r.SourceMatchers.Matches(a.Labels) {
-					if err := r.scache.Set(a); err != nil {
-						ih.logger.Error("error on set alert", "err", err)
-						continue
-					}
-					r.updateIndex(a)
-
-				}
-				cached := r.scache.Len()
-				indexed := r.sindex.Len()
-
-				if r.Name != "" {
-					r.metrics.sourceAlertsCacheItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(cached))
-					r.metrics.sourceAlertsIndexItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(indexed))
-				}
-
-				cachedSum += cached
-				indexedSum += indexed
+			ih.processAlert(a)
+		}
+	}
+}
+
+func (ih *Inhibitor) processAlert(a *types.Alert) {
+	// Update the inhibition rules' cache.
+	cachedSum := 0
+	indexedSum := 0
+	for _, r := range ih.rules {
+		if r.SourceMatchers.Matches(a.Labels) {
+			if err := r.scache.Set(a); err != nil {
+				ih.logger.Error("error on set alert", "err", err)
+				continue
 			}
-			ih.metrics.sourceAlertsCacheItems.Set(float64(cachedSum))
-			ih.metrics.sourceAlertsIndexItems.Set(float64(indexedSum))
+			r.updateIndex(a)
+
+		}
+		cached := r.scache.Len()
+		indexed := r.sindex.Len()
+
+		if r.Name != "" {
+			r.metrics.sourceAlertsCacheItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(cached))
+			r.metrics.sourceAlertsIndexItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(indexed))
 		}
+
+		cachedSum += cached
+		indexedSum += indexed
 	}
+	ih.metrics.sourceAlertsCacheItems.Set(float64(cachedSum))
+	ih.metrics.sourceAlertsIndexItems.Set(float64(indexedSum))
+}
+
+func (ih *Inhibitor) WaitForLoading() {
+	ih.loadingFinished.Wait()
 }

 // Run the Inhibitor's background processing.
diff --git a/inhibit/inhibit_test.go b/inhibit/inhibit_test.go
index 2d73a8eba2..6326cf1037 100644
--- a/inhibit/inhibit_test.go
+++ b/inhibit/inhibit_test.go
@@ -413,6 +413,27 @@ func (f *fakeAlerts) Subscribe(name string) provider.AlertIterator {
 	return provider.NewAlertIterator(ch, done, nil)
 }

+func (f *fakeAlerts) SlurpAndSubscribe(name string) ([]*types.Alert, provider.AlertIterator) {
+	ch := make(chan *types.Alert)
+	done := make(chan struct{})
+	go func() {
+		for _, a := range f.alerts {
+			ch <- a
+		}
+		// Send another (meaningless) alert to make sure that the inhibitor has
+		// processed everything.
+		ch <- &types.Alert{
+			Alert: model.Alert{
+				Labels:   model.LabelSet{},
+				StartsAt: time.Now(),
+			},
+		}
+		close(f.finished)
+		<-done
+	}()
+	return []*types.Alert{}, provider.NewAlertIterator(ch, done, nil)
+}
+
 func TestInhibit(t *testing.T) {
 	t.Parallel()

diff --git a/provider/mem/mem.go b/provider/mem/mem.go
index 4995ad2944..5c1512c986 100644
--- a/provider/mem/mem.go
+++ b/provider/mem/mem.go
@@ -195,6 +195,22 @@ func (a *Alerts) Subscribe(name string) provider.AlertIterator {
 	return provider.NewAlertIterator(ch, done, nil)
 }

+func (a *Alerts) SlurpAndSubscribe(name string) ([]*types.Alert, provider.AlertIterator) {
+	a.mtx.Lock()
+	defer a.mtx.Unlock()
+
+	var (
+		done   = make(chan struct{})
+		alerts = a.alerts.List()
+		ch     = make(chan *types.Alert, alertChannelLength)
+	)
+
+	a.listeners[a.next] = listeningAlerts{name: name, alerts: ch, done: done}
+	a.next++
+
+	return alerts, provider.NewAlertIterator(ch, done, nil)
+}
+
 // GetPending returns an iterator over all the alerts that have
 // pending notifications.
 func (a *Alerts) GetPending() provider.AlertIterator {
diff --git a/provider/provider.go b/provider/provider.go
index 0763e02d02..867e8149db 100644
--- a/provider/provider.go
+++ b/provider/provider.go
@@ -76,6 +76,18 @@ type Alerts interface {
 	// resolved and successfully notified about.
 	// They are not guaranteed to be in chronological order.
 	Subscribe(name string) AlertIterator
+
+	// SlurpAndSubscribe returns a list of all active alerts that are available
+	// in the provider before the call to SlurpAndSubscribe, and an iterator
+	// over all alerts that become available after the call to SlurpAndSubscribe.
+	// SlurpAndSubscribe can be used by clients that need to build in-memory state
+	// to know when they have processed the 'initial' batch of alerts in a provider
+	// after they reload their subscription.
+	// Implementing SlurpAndSubscribe is optional - providers may choose to
+	// return an empty list for the first return value and the result of Subscribe
+	// for the second return value.
+	SlurpAndSubscribe(name string) ([]*types.Alert, AlertIterator)
+
 	// GetPending returns an iterator over all alerts that have
 	// pending notifications.
 	GetPending() AlertIterator
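As the interface comment above notes, implementing SlurpAndSubscribe is optional: a provider may return an empty initial list and fall back to its Subscribe implementation. A minimal sketch of that fallback follows; the staticAlerts wrapper type is hypothetical and exists only to illustrate the contract added in this change, assuming the interface shape shown in the diff.

package example

import (
	"github.com/prometheus/alertmanager/provider"
	"github.com/prometheus/alertmanager/types"
)

// staticAlerts is a hypothetical provider wrapper that embeds an existing
// provider for everything except SlurpAndSubscribe.
type staticAlerts struct {
	provider.Alerts
}

// SlurpAndSubscribe opts out of the snapshot semantics: it returns no initial
// alerts and reuses Subscribe, which the interface comment explicitly allows.
// Callers that rely on the initial batch (e.g. to decide when loading has
// finished) simply see an empty slice and move straight to the iterator.
func (p *staticAlerts) SlurpAndSubscribe(name string) ([]*types.Alert, provider.AlertIterator) {
	return []*types.Alert{}, p.Alerts.Subscribe(name)
}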