23 changes: 20 additions & 3 deletions cmd/alertmanager/main.go
@@ -143,6 +143,7 @@ func run() int {
maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration()
DispatchStartDelay = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration()
Contributor:
The default in Prometheus is 1m, so should the default in AM also be 1m?

@siavashs (Contributor Author), Nov 13, 2025:
I initially set this to 1m, but it changes the default behaviour and lots of acceptance tests fail. We can avoid breaking the existing tests by setting it to 0 for all tests.
Adjusting the test timings is not really an option, since that would add +1m to each test.

But I thought the same thing could happen to users unexpectedly if they don't pay attention to the changelog and this new command-line flag.

I'm open to setting the default to 1m to sync it with Prometheus defaults.

Contributor:
I see, whatever you choose, you will choose wrong :D


webConfig = webflag.AddFlags(kingpin.CommandLine, ":9093")
externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
@@ -415,7 +416,7 @@ func run() int {
prometheus.DefaultRegisterer,
configLogger,
)
configCoordinator.Subscribe(func(conf *config.Config) error {
configCoordinator.Subscribe(func(conf *config.Config, initial bool) error {
Contributor:
Would it make sense to pass a delay here instead, of type time.Duration? Then we could pass 0 on reload and *DispatchStartDelay on the initial load.

tmpl, err = template.FromGlobs(conf.Templates)
if err != nil {
return fmt.Errorf("failed to parse templates: %w", err)
@@ -493,7 +494,17 @@ func run() int {
silencer.Mutes(labels)
})

disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
disp = dispatch.NewDispatcher(
alerts,
routes,
pipeline,
marker,
timeoutFunc,
*dispatchMaintenanceInterval,
nil,
logger,
dispMetrics,
)
routes.Walk(func(r *dispatch.Route) {
if r.RouteOpts.RepeatInterval > *retention {
configLogger.Warn(
@@ -520,7 +531,13 @@ func run() int {
}
})

go disp.Run()
dispatchDelay := time.Duration(0)
if initial {
// Only set minDispatchTime if we're in the initial start and not in a reload.
// This ensures immediate dispatch after a reload and optional delay after initial start.
dispatchDelay = *DispatchStartDelay
}
go disp.Run(dispatchDelay)
go inhibitor.Run()

return nil
14 changes: 9 additions & 5 deletions config/coordinator.go
@@ -30,9 +30,10 @@ type Coordinator struct {
logger *slog.Logger

// Protects config and subscribers
mutex sync.Mutex
config *Config
subscribers []func(*Config) error
mutex sync.Mutex
config *Config
subscribers []func(*Config, bool) error
initialReload bool
Contributor:
Here we could have an initialDelay, and pass it into the function, plus set it to 0 after the initial notification.
This would be set explicitly at New and not implied...

Contributor Author:
In that case we can bypass this and just do the reset to zero after the first time dispatch runs in main.
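For reference, a rough, self-contained sketch of the initialDelay variant suggested above (toy types and hypothetical names, not the actual config.Coordinator; the mutex, metrics and file loading are left out):

package main

import (
	"fmt"
	"time"
)

type Config struct{ original string }

// Coordinator here is a toy stand-in for config.Coordinator.
type Coordinator struct {
	subscribers  []func(*Config, time.Duration) error
	initialDelay time.Duration // set explicitly at New, zeroed after the first notification
	config       *Config
}

func NewCoordinator(initialDelay time.Duration) *Coordinator {
	return &Coordinator{initialDelay: initialDelay}
}

func (c *Coordinator) Subscribe(ss ...func(*Config, time.Duration) error) {
	c.subscribers = append(c.subscribers, ss...)
}

func (c *Coordinator) notifySubscribers() error {
	for _, s := range c.subscribers {
		if err := s(c.config, c.initialDelay); err != nil {
			return err
		}
	}
	// Subsequent reloads see a zero delay.
	c.initialDelay = 0
	return nil
}

func main() {
	c := NewCoordinator(time.Minute)
	c.config = &Config{original: "alertmanager.yml"}
	c.Subscribe(func(conf *Config, delay time.Duration) error {
		fmt.Printf("applying %q, dispatch delay %s\n", conf.original, delay)
		return nil
	})
	_ = c.notifySubscribers() // initial load: delay is 1m0s
	_ = c.notifySubscribers() // reload: delay is 0s
}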


configHashMetric prometheus.Gauge
configSuccessMetric prometheus.Gauge
@@ -46,6 +47,7 @@ func NewCoordinator(configFilePath string, r prometheus.Registerer, l *slog.Logg
c := &Coordinator{
configFilePath: configFilePath,
logger: l,
initialReload: true,
}

c.registerMetrics(r)
@@ -73,7 +75,7 @@ func (c *Coordinator) registerMetrics(r prometheus.Registerer) {
}

// Subscribe subscribes the given Subscribers to configuration changes.
func (c *Coordinator) Subscribe(ss ...func(*Config) error) {
func (c *Coordinator) Subscribe(ss ...func(*Config, bool) error) {
c.mutex.Lock()
defer c.mutex.Unlock()

@@ -82,11 +84,13 @@ func (c *Coordinator) Subscribe(ss ...func(*Config) error) {

func (c *Coordinator) notifySubscribers() error {
for _, s := range c.subscribers {
if err := s(c.config); err != nil {
if err := s(c.config, c.initialReload); err != nil {
return err
}
}

// Set initialReload to false after the first notification.
c.initialReload = false
return nil
}

4 changes: 2 additions & 2 deletions config/coordinator_test.go
@@ -49,7 +49,7 @@ func TestCoordinatorRegistersMetrics(t *testing.T) {
func TestCoordinatorNotifiesSubscribers(t *testing.T) {
callBackCalled := false
c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())
c.Subscribe(func(*Config) error {
c.Subscribe(func(*Config, bool) error {
callBackCalled = true
return nil
})
@@ -68,7 +68,7 @@ func TestCoordinatorFailReloadWhenSubscriberFails(t *testing.T) {
errMessage := "something happened"
c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())

c.Subscribe(func(*Config) error {
c.Subscribe(func(*Config, bool) error {
return errors.New(errMessage)
})

136 changes: 99 additions & 37 deletions dispatch/dispatch.go
@@ -25,13 +25,26 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/store"
"github.com/prometheus/alertmanager/types"
)

const (
DispatcherStateUnknown = iota
DispatcherStateWaitingToStart
DispatcherStateRunning
)

var state = map[int]string{
Contributor:
It's probably more idiomatic to implement Stringer if we need this.

e.g.

type DispatcherState int

const (
	DispatcherStateUnknown DispatcherState = iota
	DispatcherStateWaitingToStart
	DispatcherStateRunning
)

func (d DispatcherState) String() string {
	switch d {
	case DispatcherStateUnknown:
		return "unknown"
	case DispatcherStateWaitingToStart:
		return "waiting_to_start"
	case DispatcherStateRunning:
		return "running"
	default:
		return "unknown"
	}
}

DispatcherStateUnknown: "unknown",
DispatcherStateWaitingToStart: "waiting_to_start",
DispatcherStateRunning: "running",
}

// DispatcherMetrics represents metrics associated to a dispatcher.
type DispatcherMetrics struct {
aggrGroups prometheus.Gauge
@@ -90,6 +103,9 @@ type Dispatcher struct {
cancel func()

logger *slog.Logger

startTimer *time.Timer
state int
}

// Limits describes limits used by Dispatcher.
@@ -102,39 +118,44 @@ type Limits interface {

// NewDispatcher returns a new Dispatcher.
func NewDispatcher(
ap provider.Alerts,
r *Route,
s notify.Stage,
mk types.GroupMarker,
to func(time.Duration) time.Duration,
mi time.Duration,
lim Limits,
l *slog.Logger,
m *DispatcherMetrics,
alerts provider.Alerts,
route *Route,
stage notify.Stage,
marker types.GroupMarker,
timeout func(time.Duration) time.Duration,
maintenanceInterval time.Duration,
limits Limits,
logger *slog.Logger,
metrics *DispatcherMetrics,
) *Dispatcher {
if lim == nil {
lim = nilLimits{}
if limits == nil {
limits = nilLimits{}
}

disp := &Dispatcher{
alerts: ap,
stage: s,
route: r,
marker: mk,
timeout: to,
maintenanceInterval: mi,
logger: l.With("component", "dispatcher"),
metrics: m,
limits: lim,
alerts: alerts,
stage: stage,
route: route,
marker: marker,
timeout: timeout,
maintenanceInterval: maintenanceInterval,
logger: logger.With("component", "dispatcher"),
metrics: metrics,
limits: limits,
state: DispatcherStateUnknown,
}
return disp
}

// Run starts dispatching alerts incoming via the updates channel.
func (d *Dispatcher) Run() {
func (d *Dispatcher) Run(dispatchDelay time.Duration) {
Contributor:
tiny nitpick, take it or leave it: I think it'd be slightly better if this took a startAt time.Time, since that's a little closer to the meaning and a little more flexible for the caller, in my opinion.
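For illustration, a minimal, self-contained sketch of the startAt variant (toy type, not the real dispatcher); time.NewTimer fires immediately for zero or negative durations, so a start time in the past behaves like no delay:

package main

import (
	"fmt"
	"time"
)

// toyDispatcher stands in for dispatch.Dispatcher.
type toyDispatcher struct {
	startTimer *time.Timer
}

// Run takes an absolute start time; the caller stays free to compute it from a
// configured delay or from any other reference point.
func (d *toyDispatcher) Run(startAt time.Time) {
	d.startTimer = time.NewTimer(time.Until(startAt))
	<-d.startTimer.C
	fmt.Println("dispatching started")
}

func main() {
	d := &toyDispatcher{}
	// Equivalent of --dispatch.start-delay=2s.
	d.Run(time.Now().Add(2 * time.Second))
}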

d.done = make(chan struct{})

d.mtx.Lock()
d.logger.Debug("preparing to start", "dispatchDelay", dispatchDelay)
d.startTimer = time.NewTimer(dispatchDelay)
d.state = DispatcherStateWaitingToStart
d.logger.Debug("setting state", "state", state[d.state])
d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsNum = 0
d.metrics.aggrGroups.Set(0)
@@ -176,6 +197,18 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())

case <-d.startTimer.C:
if d.state == DispatcherStateWaitingToStart {
d.state = DispatcherStateRunning
d.logger.Debug("started", "state", state[d.state])
d.logger.Debug("Starting all existing aggregation groups")
for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
d.runAG(ag)
}
}
}

case <-maintenance.C:
d.doMaintenance()
case <-d.ctx.Done():
@@ -311,6 +344,7 @@ type notifyFunc func(context.Context, ...*types.Alert) bool
// processAlert determines in which aggregation group the alert falls
// and inserts it.
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
now := time.Now()
groupLabels := getGroupLabels(alert, route)

fp := groupLabels.Fingerprint()
@@ -347,6 +381,34 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// alert is already there.
ag.insert(alert)

if alert.StartsAt.Add(ag.opts.GroupWait).Before(now) {
ag.logger.Debug(
"Alert is old enough for immediate flush, resetting timer to zero",
"alert", alert.Name(),
"fingerprint", alert.Fingerprint(),
"startsAt", alert.StartsAt,
)
ag.resetTimer(0)
}
// Check dispatcher and alert state to determine if we should run the AG now.
switch d.state {
case DispatcherStateWaitingToStart:
d.logger.Debug("Dispatcher still waiting to start")
case DispatcherStateRunning:
d.runAG(ag)
default:
s, ok := state[d.state]
if !ok {
s = "unknown"
}
d.logger.Warn("unknown state detected", "state", s)
}
}

func (d *Dispatcher) runAG(ag *aggrGroup) {
if ag.running.Load() {
return
}
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
@@ -392,13 +454,18 @@ type aggrGroup struct {
done chan struct{}
next *time.Timer
timeout func(time.Duration) time.Duration

mtx sync.RWMutex
hasFlushed bool
running atomic.Bool
Contributor:
I know we use go.uber.org/atomic elsewhere, but does this buy us anything over sync/atomic's Bool (https://pkg.go.dev/sync/atomic#pkg-types)? Should we use that and start a switch?

Contributor Author:
There is currently a lint check which enforces this.
I guess we inherit this from Prometheus.
I need to check if this is still required.
cc @SuperQ

Member:
FYI, there's an open effort in Prometheus to switch to the new stdlib atomic types (those only got added in Go 1.19):

Member:
Yea, I'm not sure. This was introduced in Prometheus in 2020. prometheus/prometheus#7647

I guess we'll need to find out from the rest of the Prometheus devs if we still need this.

Member:
We can remove the check; the stdlib is what we want, as per prometheus/prometheus#14866.

Member:
Looks like we can migrate to the stdlib now: prometheus/prometheus#14866
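For what it's worth, a tiny sketch of what the swap could look like (assuming Go 1.19+): the stdlib sync/atomic Bool exposes the same Load/Store calls used for running in this PR, so only the import should need to change.

package main

import (
	"fmt"
	"sync/atomic" // instead of go.uber.org/atomic
)

type aggrGroup struct {
	running atomic.Bool // same Load/Store API as go.uber.org/atomic's Bool
}

func main() {
	ag := &aggrGroup{}
	if !ag.running.Load() {
		ag.running.Store(true)
	}
	fmt.Println("running:", ag.running.Load())
}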

}

// newAggrGroup returns a new aggregation group.
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
func newAggrGroup(
ctx context.Context,
labels model.LabelSet,
r *Route,
to func(time.Duration) time.Duration,
marker types.AlertMarker,
logger *slog.Logger,
) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
@@ -436,6 +503,7 @@ func (ag *aggrGroup) String() string {
}

func (ag *aggrGroup) run(nf notifyFunc) {
ag.running.Store(true)
defer close(ag.done)
defer ag.next.Stop()

@@ -462,10 +530,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithRouteID(ctx, ag.routeID)

// Wait the configured interval before calling flush again.
ag.mtx.Lock()
ag.next.Reset(ag.opts.GroupInterval)
ag.hasFlushed = true
ag.mtx.Unlock()
ag.resetTimer(ag.opts.GroupInterval)

ag.flush(func(alerts ...*types.Alert) bool {
return nf(ctx, alerts...)
@@ -486,19 +551,16 @@ func (ag *aggrGroup) stop() {
<-ag.done
}

// resetTimer resets the timer for the AG.
func (ag *aggrGroup) resetTimer(t time.Duration) {
ag.next.Reset(t)
}

// insert inserts the alert into the aggregation group.
func (ag *aggrGroup) insert(alert *types.Alert) {
if err := ag.alerts.Set(alert); err != nil {
ag.logger.Error("error on set alert", "err", err)
}

// Immediately trigger a flush if the wait duration for this
// alert is already over.
ag.mtx.Lock()
defer ag.mtx.Unlock()
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
Contributor:
Somewhat unrelated to this change, but I noticed it when reviewing the new code: I think there's a very minor logic bug here. If an alert's StartsAt is in the past, but not at least ag.opts.GroupWait in the past, I think we should check whether the next flush is before or after the flush that would be scheduled purely from the new alert. If it's after, we should reset the timer to that duration. I don't think we're keeping track of the next flush time outside of the timer, so that'd need to change too 🤔

E.g.

wantedFlush := time.Until(alert.StartsAt.Add(ag.opts.GroupWait))
if wantedFlush < 0 {
	wantedFlush = 0
}
// durationToNextFlush doesn't exist yet; the next flush time would have to be
// tracked outside of the timer.
actualFlush := ag.durationToNextFlush()
if wantedFlush < actualFlush {
	ag.next.Reset(wantedFlush)
}

I don't think we should change the behavior in this PR though. Perhaps as a follow up.

Contributor Author:
Good catch, we can add it here or as a follow-up.

ag.next.Reset(0)
}
}

func (ag *aggrGroup) empty() bool {