14 changes: 12 additions & 2 deletions cmd/alertmanager/main.go
@@ -491,7 +491,7 @@ func run() int {
silencer.Mutes(labels)
})

disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
newDisp := dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
routes.Walk(func(r *dispatch.Route) {
if r.RouteOpts.RepeatInterval > *retention {
configLogger.Warn(
@@ -518,8 +518,18 @@ func run() int {
}
})

go disp.Run()
// first, start the inhibitor so the inhibition cache can populate.
// wait for it to load alerts before starting the dispatcher so
// we don't accidentally notify for an alert that will be inhibited
go inhibitor.Run()
Collaborator:

I am wondering if this is better, or whether we should instead load everything outside of a goroutine and then call Run?

Contributor (author):

Ah yeah, that actually would simplify things a little. The reason I did it this way was to allow this all to happen concurrently (and without waiting) if we wanted. However, in practice it's all sequential.

Maybe I could change this so that both the Inhibitor and Dispatcher have Load() methods? I think Run should still call Load(), but maybe we can make this work such that Load is a no-op after being called once.

Contributor (author):

Maybe I could change this so that both the Inhibitor and Dispatcher have Load() methods? I think Run should still call Load(), but maybe we can make this work such that Load is a no-op after being called once.

I tried this, and it ended up pretty awkward. It might work, but not without a bigger refactoring. Basically, it becomes necessary to move the iterator into a struct field that's protected by the struct's global mutex. This is very awkward because we'd have to lock and unlock the mutex around reads of the iterator's channel.

The only easy fix would be splitting the SlurpAndSubscribe method into two, but that's not desirable because the entire benefit comes from being able to atomically separate alerts into "before the call" and "after the call" groups.

inhibitor.WaitForLoading()

// next, start the dispatcher and wait for it to load before swapping the disp pointer.
// This ensures that the API doesn't see the new dispatcher before it finishes populating
// the aggrGroups
go newDisp.Run()
Collaborator:

Same here...

newDisp.WaitForLoading()
disp = newDisp

return nil
})
29 changes: 23 additions & 6 deletions dispatch/dispatch.go
@@ -81,6 +81,7 @@ type Dispatcher struct {
timeout func(time.Duration) time.Duration

mtx sync.RWMutex
loadingFinished sync.WaitGroup
aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
aggrGroupsNum int

@@ -127,6 +128,7 @@ func NewDispatcher(
metrics: m,
limits: lim,
}
disp.loadingFinished.Add(1)
return disp
}

@@ -141,7 +143,13 @@ func (d *Dispatcher) Run() {
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()

d.run(d.alerts.Subscribe("dispatcher"))
initialAlerts, it := d.alerts.SlurpAndSubscribe("dispatcher")
for _, alert := range initialAlerts {
d.ingestAlert(alert)
}
d.loadingFinished.Done()

d.run(it)
close(d.done)
}

@@ -170,11 +178,7 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
continue
}

now := time.Now()
for _, r := range d.route.Match(alert.Labels) {
d.processAlert(alert, r)
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())
d.ingestAlert(alert)

case <-maintenance.C:
d.doMaintenance()
@@ -200,6 +204,18 @@ func (d *Dispatcher) doMaintenance() {
}
}

func (d *Dispatcher) ingestAlert(alert *types.Alert) {
now := time.Now()
for _, r := range d.route.Match(alert.Labels) {
d.processAlert(alert, r)
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())
}

func (d *Dispatcher) WaitForLoading() {
d.loadingFinished.Wait()
}

// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
Alerts types.AlertSlice
@@ -222,6 +238,7 @@ func (ag AlertGroups) Len() int { return len(ag) }

// Groups returns a slice of AlertGroups from the dispatcher's internal state.
func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
d.WaitForLoading()
groups := AlertGroups{}

d.mtx.RLock()
71 changes: 43 additions & 28 deletions inhibit/inhibit.go
@@ -40,8 +40,9 @@ type Inhibitor struct {
logger *slog.Logger
metrics *InhibitorMetrics

mtx sync.RWMutex
cancel func()
mtx sync.RWMutex
loadingFinished sync.WaitGroup
cancel func()
}

// NewInhibitor returns a new Inhibitor.
@@ -53,6 +54,7 @@ func NewInhibitor(ap provider.Alerts, rs []config.InhibitRule, mk types.AlertMar
metrics: metrics,
}

ih.loadingFinished.Add(1)
ruleNames := make(map[string]struct{})
for i, cr := range rs {
if _, ok := ruleNames[cr.Name]; ok {
@@ -66,14 +68,19 @@ func NewInhibitor(ap provider.Alerts, rs []config.InhibitRule, mk types.AlertMar
ruleNames[cr.Name] = struct{}{}
}
}

return ih
}

func (ih *Inhibitor) run(ctx context.Context) {
it := ih.alerts.Subscribe("inhibitor")
initialAlerts, it := ih.alerts.SlurpAndSubscribe("inhibitor")
defer it.Close()

for _, a := range initialAlerts {
ih.processAlert(a)
}

ih.loadingFinished.Done()

for {
select {
case <-ctx.Done():
@@ -83,33 +90,41 @@ func (ih *Inhibitor) run(ctx context.Context) {
ih.logger.Error("Error iterating alerts", "err", err)
continue
}
// Update the inhibition rules' cache.
cachedSum := 0
indexedSum := 0
for _, r := range ih.rules {
if r.SourceMatchers.Matches(a.Labels) {
if err := r.scache.Set(a); err != nil {
ih.logger.Error("error on set alert", "err", err)
continue
}
r.updateIndex(a)

}
cached := r.scache.Len()
indexed := r.sindex.Len()

if r.Name != "" {
r.metrics.sourceAlertsCacheItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(cached))
r.metrics.sourceAlertsIndexItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(indexed))
}

cachedSum += cached
indexedSum += indexed
ih.processAlert(a)
}
}
}

func (ih *Inhibitor) processAlert(a *types.Alert) {
// Update the inhibition rules' cache.
cachedSum := 0
indexedSum := 0
for _, r := range ih.rules {
if r.SourceMatchers.Matches(a.Labels) {
if err := r.scache.Set(a); err != nil {
ih.logger.Error("error on set alert", "err", err)
continue
}
ih.metrics.sourceAlertsCacheItems.Set(float64(cachedSum))
ih.metrics.sourceAlertsIndexItems.Set(float64(indexedSum))
r.updateIndex(a)

}
cached := r.scache.Len()
indexed := r.sindex.Len()

if r.Name != "" {
r.metrics.sourceAlertsCacheItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(cached))
r.metrics.sourceAlertsIndexItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(indexed))
}

cachedSum += cached
indexedSum += indexed
}
ih.metrics.sourceAlertsCacheItems.Set(float64(cachedSum))
ih.metrics.sourceAlertsIndexItems.Set(float64(indexedSum))
}

func (ih *Inhibitor) WaitForLoading() {
ih.loadingFinished.Wait()
}

// Run the Inhibitor's background processing.
21 changes: 21 additions & 0 deletions inhibit/inhibit_test.go
@@ -413,6 +413,27 @@ func (f *fakeAlerts) Subscribe(name string) provider.AlertIterator {
return provider.NewAlertIterator(ch, done, nil)
}

func (f *fakeAlerts) SlurpAndSubscribe(name string) ([]*types.Alert, provider.AlertIterator) {
ch := make(chan *types.Alert)
done := make(chan struct{})
go func() {
for _, a := range f.alerts {
ch <- a
}
// Send another (meaningless) alert to make sure that the inhibitor has
// processed everything.
ch <- &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{},
StartsAt: time.Now(),
},
}
close(f.finished)
<-done
}()
return []*types.Alert{}, provider.NewAlertIterator(ch, done, nil)
}

func TestInhibit(t *testing.T) {
t.Parallel()

16 changes: 16 additions & 0 deletions provider/mem/mem.go
@@ -195,6 +195,22 @@ func (a *Alerts) Subscribe(name string) provider.AlertIterator {
return provider.NewAlertIterator(ch, done, nil)
}

func (a *Alerts) SlurpAndSubscribe(name string) ([]*types.Alert, provider.AlertIterator) {
a.mtx.Lock()
defer a.mtx.Unlock()

var (
done = make(chan struct{})
alerts = a.alerts.List()
ch = make(chan *types.Alert, alertChannelLength)
)

a.listeners[a.next] = listeningAlerts{name: name, alerts: ch, done: done}
a.next++

return alerts, provider.NewAlertIterator(ch, done, nil)
}

// GetPending returns an iterator over all the alerts that have
// pending notifications.
func (a *Alerts) GetPending() provider.AlertIterator {
12 changes: 12 additions & 0 deletions provider/provider.go
@@ -76,6 +76,18 @@ type Alerts interface {
// resolved and successfully notified about.
// They are not guaranteed to be in chronological order.
Subscribe(name string) AlertIterator

// SlurpAndSubscribe returns a list of all active alerts which were available
// in the provider before the call to SlurpAndSubscribe, and an iterator
// over all alerts that become available after the call.
// SlurpAndSubscribe can be used by clients which need to build in-memory
// state to know when they've processed the 'initial' batch of alerts in a
// provider after they reload their subscription.
// Implementing SlurpAndSubscribe is optional - providers may choose to
// return an empty list for the first return value and the result of
// Subscribe for the second return value.
SlurpAndSubscribe(name string) ([]*types.Alert, AlertIterator)
Collaborator:

Ok the name is funny, but maybe SnapshotAndSubscribe?

Collaborator:

Alternative names: SubscribeWithHistory() or SubscribeWithReplay().
Also why not modify Subscribe()? On startup it has no alerts, so we can always send if anything is in memory.

Contributor (author):

Sure, happy to change the name. I think the origin of this name was "slurp up all the existing alerts" 😆

Also why not modify Subscribe()? On startup it has no alerts, so we can always send if anything is in memory.

I wanted to avoid every possible consumer needing to make a code change after this update. However, we already made a breaking change to Subscribe in the last few weeks, so maybe we can sneak this in there too.

I know there are a few projects out there which link alertmanager and depend on these interfaces...


// GetPending returns an iterator over all alerts that have
// pending notifications.
GetPending() AlertIterator