diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go
index 3e0bb994eb..ca40976452 100644
--- a/cmd/alertmanager/main.go
+++ b/cmd/alertmanager/main.go
@@ -143,6 +143,7 @@ func run() int {
         maxSilenceSizeBytes         = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
         alertGCInterval             = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
         dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration()
+        dispatchStartDelay          = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with the value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration()

         webConfig   = webflag.AddFlags(kingpin.CommandLine, ":9093")
         externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
@@ -415,7 +416,7 @@ func run() int {
         prometheus.DefaultRegisterer,
         configLogger,
     )
-    configCoordinator.Subscribe(func(conf *config.Config) error {
+    configCoordinator.Subscribe(func(conf *config.Config, initial bool) error {
         tmpl, err = template.FromGlobs(conf.Templates)
         if err != nil {
             return fmt.Errorf("failed to parse templates: %w", err)
@@ -493,7 +494,17 @@ func run() int {
             silencer.Mutes(labels)
         })

-        disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
+        disp = dispatch.NewDispatcher(
+            alerts,
+            routes,
+            pipeline,
+            marker,
+            timeoutFunc,
+            *dispatchMaintenanceInterval,
+            nil,
+            logger,
+            dispMetrics,
+        )
         routes.Walk(func(r *dispatch.Route) {
             if r.RouteOpts.RepeatInterval > *retention {
                 configLogger.Warn(
@@ -520,7 +531,13 @@ func run() int {
             }
         })

-        go disp.Run()
+        dispatchDelay := time.Duration(0)
+        if initial {
+            // Only apply the start delay on the initial start, not on a reload.
+            // This ensures immediate dispatch after a reload and an optional delay after the initial start.
+            dispatchDelay = *dispatchStartDelay
+        }
+        go disp.Run(dispatchDelay)
         go inhibitor.Run()

         return nil
diff --git a/config/coordinator.go b/config/coordinator.go
index e9f3e21d8f..f563a442d9 100644
--- a/config/coordinator.go
+++ b/config/coordinator.go
@@ -30,9 +30,10 @@ type Coordinator struct {
     logger         *slog.Logger

     // Protects config and subscribers
-    mutex       sync.Mutex
-    config      *Config
-    subscribers []func(*Config) error
+    mutex         sync.Mutex
+    config        *Config
+    subscribers   []func(*Config, bool) error
+    initialReload bool

     configHashMetric    prometheus.Gauge
     configSuccessMetric prometheus.Gauge
@@ -46,6 +47,7 @@ func NewCoordinator(configFilePath string, r prometheus.Registerer, l *slog.Logg
     c := &Coordinator{
         configFilePath: configFilePath,
         logger:         l,
+        initialReload:  true,
     }

     c.registerMetrics(r)
@@ -73,7 +75,7 @@ func (c *Coordinator) registerMetrics(r prometheus.Registerer) {
 }

 // Subscribe subscribes the given Subscribers to configuration changes.
-func (c *Coordinator) Subscribe(ss ...func(*Config) error) {
+func (c *Coordinator) Subscribe(ss ...func(*Config, bool) error) {
     c.mutex.Lock()
     defer c.mutex.Unlock()

@@ -82,11 +84,13 @@ func (c *Coordinator) Subscribe(ss ...func(*Config) error) {

 func (c *Coordinator) notifySubscribers() error {
     for _, s := range c.subscribers {
-        if err := s(c.config); err != nil {
+        if err := s(c.config, c.initialReload); err != nil {
             return err
         }
     }
+    // Set initialReload to false after the first notification.
+    c.initialReload = false
     return nil
 }
diff --git a/config/coordinator_test.go b/config/coordinator_test.go
index 4ddebb9528..35c84a390d 100644
--- a/config/coordinator_test.go
+++ b/config/coordinator_test.go
@@ -49,7 +49,7 @@ func TestCoordinatorRegistersMetrics(t *testing.T) {
 func TestCoordinatorNotifiesSubscribers(t *testing.T) {
     callBackCalled := false
     c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())
-    c.Subscribe(func(*Config) error {
+    c.Subscribe(func(*Config, bool) error {
         callBackCalled = true
         return nil
     })
@@ -68,7 +68,7 @@ func TestCoordinatorFailReloadWhenSubscriberFails(t *testing.T) {
     errMessage := "something happened"
     c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())
-    c.Subscribe(func(*Config) error {
+    c.Subscribe(func(*Config, bool) error {
         return errors.New(errMessage)
     })
diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index d1d7a39eb0..2a016e8792 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -25,6 +25,7 @@ import (
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/prometheus/common/model"
+    "go.uber.org/atomic"

     "github.com/prometheus/alertmanager/notify"
     "github.com/prometheus/alertmanager/provider"
@@ -32,6 +33,18 @@ import (
     "github.com/prometheus/alertmanager/types"
 )

+const (
+    DispatcherStateUnknown = iota
+    DispatcherStateWaitingToStart
+    DispatcherStateRunning
+)
+
+var state = map[int]string{
+    DispatcherStateUnknown:        "unknown",
+    DispatcherStateWaitingToStart: "waiting_to_start",
+    DispatcherStateRunning:        "running",
+}
+
 // DispatcherMetrics represents metrics associated to a dispatcher.
 type DispatcherMetrics struct {
     aggrGroups         prometheus.Gauge
@@ -90,6 +103,9 @@ type Dispatcher struct {
     cancel func()

     logger *slog.Logger
+
+    startTimer *time.Timer
+    state      int
 }

 // Limits describes limits used by Dispatcher.
@@ -102,39 +118,44 @@ type Limits interface {

 // NewDispatcher returns a new Dispatcher.
 func NewDispatcher(
-    ap provider.Alerts,
-    r *Route,
-    s notify.Stage,
-    mk types.GroupMarker,
-    to func(time.Duration) time.Duration,
-    mi time.Duration,
-    lim Limits,
-    l *slog.Logger,
-    m *DispatcherMetrics,
+    alerts provider.Alerts,
+    route *Route,
+    stage notify.Stage,
+    marker types.GroupMarker,
+    timeout func(time.Duration) time.Duration,
+    maintenanceInterval time.Duration,
+    limits Limits,
+    logger *slog.Logger,
+    metrics *DispatcherMetrics,
 ) *Dispatcher {
-    if lim == nil {
-        lim = nilLimits{}
+    if limits == nil {
+        limits = nilLimits{}
     }

     disp := &Dispatcher{
-        alerts:              ap,
-        stage:               s,
-        route:               r,
-        marker:              mk,
-        timeout:             to,
-        maintenanceInterval: mi,
-        logger:              l.With("component", "dispatcher"),
-        metrics:             m,
-        limits:              lim,
+        alerts:              alerts,
+        stage:               stage,
+        route:               route,
+        marker:              marker,
+        timeout:             timeout,
+        maintenanceInterval: maintenanceInterval,
+        logger:              logger.With("component", "dispatcher"),
+        metrics:             metrics,
+        limits:              limits,
+        state:               DispatcherStateUnknown,
     }
     return disp
 }

 // Run starts dispatching alerts incoming via the updates channel.
-func (d *Dispatcher) Run() {
+func (d *Dispatcher) Run(dispatchDelay time.Duration) {
     d.done = make(chan struct{})

     d.mtx.Lock()
+    d.logger.Debug("preparing to start", "dispatchDelay", dispatchDelay)
+    d.startTimer = time.NewTimer(dispatchDelay)
+    d.state = DispatcherStateWaitingToStart
+    d.logger.Debug("setting state", "state", state[d.state])
     d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
     d.aggrGroupsNum = 0
     d.metrics.aggrGroups.Set(0)
@@ -176,6 +197,18 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
             }
             d.metrics.processingDuration.Observe(time.Since(now).Seconds())

+        case <-d.startTimer.C:
+            if d.state == DispatcherStateWaitingToStart {
+                d.state = DispatcherStateRunning
+                d.logger.Debug("started", "state", state[d.state])
+                d.logger.Debug("Starting all existing aggregation groups")
+                for _, groups := range d.aggrGroupsPerRoute {
+                    for _, ag := range groups {
+                        d.runAG(ag)
+                    }
+                }
+            }
+
         case <-maintenance.C:
             d.doMaintenance()
         case <-d.ctx.Done():
@@ -311,6 +344,7 @@ type notifyFunc func(context.Context, ...*types.Alert) bool
 // processAlert determines in which aggregation group the alert falls
 // and inserts it.
 func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
+    now := time.Now()
     groupLabels := getGroupLabels(alert, route)

     fp := groupLabels.Fingerprint()
@@ -347,6 +381,34 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
     // alert is already there.
     ag.insert(alert)

+    if alert.StartsAt.Add(ag.opts.GroupWait).Before(now) {
+        ag.logger.Debug(
+            "Alert is old enough for immediate flush, resetting timer to zero",
+            "alert", alert.Name(),
+            "fingerprint", alert.Fingerprint(),
+            "startsAt", alert.StartsAt,
+        )
+        ag.resetTimer(0)
+    }
+    // Check dispatcher and alert state to determine if we should run the AG now.
+    switch d.state {
+    case DispatcherStateWaitingToStart:
+        d.logger.Debug("Dispatcher still waiting to start")
+    case DispatcherStateRunning:
+        d.runAG(ag)
+    default:
+        s, ok := state[d.state]
+        if !ok {
+            s = "unknown"
+        }
+        d.logger.Warn("unknown state detected", "state", s)
+    }
+}
+
+func (d *Dispatcher) runAG(ag *aggrGroup) {
+    if ag.running.Load() {
+        return
+    }
     go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
         _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
         if err != nil {
@@ -392,13 +454,18 @@ type aggrGroup struct {
     done    chan struct{}
     next    *time.Timer
     timeout func(time.Duration) time.Duration
-
-    mtx        sync.RWMutex
-    hasFlushed bool
+    running atomic.Bool
 }

 // newAggrGroup returns a new aggregation group.
-func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
+func newAggrGroup(
+    ctx context.Context,
+    labels model.LabelSet,
+    r *Route,
+    to func(time.Duration) time.Duration,
+    marker types.AlertMarker,
+    logger *slog.Logger,
+) *aggrGroup {
     if to == nil {
         to = func(d time.Duration) time.Duration { return d }
     }
@@ -436,6 +503,7 @@ func (ag *aggrGroup) String() string {
 }

 func (ag *aggrGroup) run(nf notifyFunc) {
+    ag.running.Store(true)
     defer close(ag.done)
     defer ag.next.Stop()

@@ -462,10 +530,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
             ctx = notify.WithRouteID(ctx, ag.routeID)

             // Wait the configured interval before calling flush again.
-            ag.mtx.Lock()
-            ag.next.Reset(ag.opts.GroupInterval)
-            ag.hasFlushed = true
-            ag.mtx.Unlock()
+            ag.resetTimer(ag.opts.GroupInterval)

             ag.flush(func(alerts ...*types.Alert) bool {
                 return nf(ctx, alerts...)
@@ -486,19 +551,16 @@ func (ag *aggrGroup) stop() {
     <-ag.done
 }

+// resetTimer resets the timer for the AG.
+func (ag *aggrGroup) resetTimer(t time.Duration) {
+    ag.next.Reset(t)
+}
+
 // insert inserts the alert into the aggregation group.
 func (ag *aggrGroup) insert(alert *types.Alert) {
     if err := ag.alerts.Set(alert); err != nil {
         ag.logger.Error("error on set alert", "err", err)
     }
-
-    // Immediately trigger a flush if the wait duration for this
-    // alert is already over.
-    ag.mtx.Lock()
-    defer ag.mtx.Unlock()
-    if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
-        ag.next.Reset(0)
-    }
 }

 func (ag *aggrGroup) empty() bool {
diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index fe4df6d2ef..e654dbd05e 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -191,8 +191,6 @@ func TestAggrGroup(t *testing.T) {

     ag.stop()

-    // Add an alert that started more than group_interval in the past. We expect
-    // immediate flushing.
     // Finally, set all alerts to be resolved. After successful notify the aggregation group
     // should empty itself.
     ag = newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
@@ -201,18 +199,12 @@ func TestAggrGroup(t *testing.T) {
     ag.insert(a1)
     ag.insert(a2)

-    // a2 lies way in the past so the initial group_wait should be skipped.
-    select {
-    case <-time.After(opts.GroupWait / 2):
-        t.Fatalf("expected immediate alert but received none")
-
-    case batch := <-alertsCh:
-        exp := removeEndsAt(types.AlertSlice{a1, a2})
-        sort.Sort(batch)
+    batch := <-alertsCh
+    exp := removeEndsAt(types.AlertSlice{a1, a2})
+    sort.Sort(batch)

-        if !reflect.DeepEqual(batch, exp) {
-            t.Fatalf("expected alerts %v but got %v", exp, batch)
-        }
+    if !reflect.DeepEqual(batch, exp) {
+        t.Fatalf("expected alerts %v but got %v", exp, batch)
     }

     for i := 0; i < 3; i++ {
@@ -243,7 +235,7 @@ func TestAggrGroup(t *testing.T) {
     a1r := *a1
     a1r.EndsAt = time.Now()
     ag.insert(&a1r)
-    exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)
+    exp = append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)
     select {
     case <-time.After(2 * opts.GroupInterval):
@@ -403,7 +395,7 @@ route:
     timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
     recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
     dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
-    go dispatcher.Run()
+    go dispatcher.Run(time.Duration(0))
     defer dispatcher.Stop()

     // Create alerts. the dispatcher will automatically create the groups.
@@ -556,7 +548,7 @@ route:
     lim := limits{groups: 6}
     m := NewDispatcherMetrics(true, reg)
     dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, lim, logger, m)
-    go dispatcher.Run()
+    go dispatcher.Run(time.Duration(0))
     defer dispatcher.Stop()

     // Create alerts. the dispatcher will automatically create the groups.
@@ -675,7 +667,7 @@ func TestDispatcherRace(t *testing.T) {
     timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
     dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
-    go dispatcher.Run()
+    go dispatcher.Run(time.Duration(0))
     dispatcher.Stop()
 }

@@ -704,7 +696,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
     timeout := func(d time.Duration) time.Duration { return d }
     recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
     dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
-    go dispatcher.Run()
+    go dispatcher.Run(time.Duration(0))
     defer dispatcher.Stop()

     // Push all alerts.
@@ -973,3 +965,95 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
         require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when alert is modified during flush")
     })
 }
+
+func TestDispatchOnStartup(t *testing.T) {
+    logger := promslog.NewNopLogger()
+    reg := prometheus.NewRegistry()
+    marker := types.NewMarker(reg)
+    alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer alerts.Close()
+
+    // Set up a route with GroupBy to separate alerts into different aggregation groups.
+    route := &Route{
+        RouteOpts: RouteOpts{
+            Receiver:       "default",
+            GroupBy:        map[model.LabelName]struct{}{"instance": {}},
+            GroupWait:      1 * time.Second,
+            GroupInterval:  3 * time.Minute,
+            RepeatInterval: 1 * time.Hour,
+        },
+    }
+
+    recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+    timeout := func(d time.Duration) time.Duration { return d }
+
+    // Delay the dispatcher start by 2 seconds.
+    now := time.Now()
+    startDelay := 2 * time.Second
+    dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
+    go dispatcher.Run(startDelay)
+    defer dispatcher.Stop()
+
+    // Create 2 similar alerts with start times in the past.
+    alert1 := &types.Alert{
+        Alert: model.Alert{
+            Labels:       model.LabelSet{"alertname": "TestAlert1", "instance": "1"},
+            Annotations:  model.LabelSet{"foo": "bar"},
+            StartsAt:     now.Add(-1 * time.Hour),
+            EndsAt:       now.Add(time.Hour),
+            GeneratorURL: "http://example.com/prometheus",
+        },
+        UpdatedAt: now,
+        Timeout:   false,
+    }
+
+    alert2 := &types.Alert{
+        Alert: model.Alert{
+            Labels:       model.LabelSet{"alertname": "TestAlert2", "instance": "2"},
+            Annotations:  model.LabelSet{"foo": "bar"},
+            StartsAt:     now.Add(-1 * time.Hour),
+            EndsAt:       now.Add(time.Hour),
+            GeneratorURL: "http://example.com/prometheus",
+        },
+        UpdatedAt: now,
+        Timeout:   false,
+    }
+
+    // Send alert1.
+    require.NoError(t, alerts.Put(alert1))
+
+    var recordedAlerts []*types.Alert
+    // Expect a recorded alert after startDelay + GroupWait, which is in the future.
+    require.Eventually(t, func() bool {
+        recordedAlerts = recorder.Alerts()
+        return len(recordedAlerts) == 1
+    }, startDelay+route.RouteOpts.GroupWait, 500*time.Millisecond)
+
+    require.Equal(t, alert1.Fingerprint(), recordedAlerts[0].Fingerprint(), "expected alert1 to be dispatched after GroupWait")
+
+    // Send alert2.
+    require.NoError(t, alerts.Put(alert2))
+
+    // Expect a recorded alert after GroupInterval.
+    require.Eventually(t, func() bool {
+        recordedAlerts = recorder.Alerts()
+        return len(recordedAlerts) == 2
+    }, route.RouteOpts.GroupInterval, 100*time.Millisecond)
+
+    // Sort alerts by fingerprint for deterministic ordering.
+    sort.Slice(recordedAlerts, func(i, j int) bool {
+        return recordedAlerts[i].Fingerprint() < recordedAlerts[j].Fingerprint()
+    })
+    require.Equal(t, alert2.Fingerprint(), recordedAlerts[1].Fingerprint(), "expected alert2 to be dispatched after GroupInterval")
+
+    // Verify both alerts are present.
+    fingerprints := make(map[model.Fingerprint]bool)
+    for _, a := range recordedAlerts {
+        fingerprints[a.Fingerprint()] = true
+    }
+    require.True(t, fingerprints[alert1.Fingerprint()], "expected alert1 to be present")
+    require.True(t, fingerprints[alert2.Fingerprint()], "expected alert2 to be present")
+}
diff --git a/test/with_api_v2/acceptance.go b/test/with_api_v2/acceptance.go
index 4a1f824b7a..60878d1248 100644
--- a/test/with_api_v2/acceptance.go
+++ b/test/with_api_v2/acceptance.go
@@ -164,7 +164,7 @@ func (t *AcceptanceTest) Collector(name string) *Collector {

 // Run starts all Alertmanagers and runs queries against them. It then checks
 // whether all expected notifications have arrived at the expected receiver.
-func (t *AcceptanceTest) Run() {
+func (t *AcceptanceTest) Run(additionalArgs ...string) {
     errc := make(chan error)

     for _, am := range t.amc.ams {
@@ -173,7 +173,7 @@ func (t *AcceptanceTest) Run() {
         t.Cleanup(am.cleanup)
     }

-    err := t.amc.Start()
+    err := t.amc.Start(additionalArgs...)
     if err != nil {
         t.Log(err)
         t.Fail()
@@ -263,14 +263,14 @@ type AlertmanagerCluster struct {
 }

 // Start the Alertmanager cluster and wait until it is ready to receive.
-func (amc *AlertmanagerCluster) Start() error {
-    var peerFlags []string
+func (amc *AlertmanagerCluster) Start(additionalArgs ...string) error {
+    args := additionalArgs
     for _, am := range amc.ams {
-        peerFlags = append(peerFlags, "--cluster.peer="+am.clusterAddr)
+        args = append(args, "--cluster.peer="+am.clusterAddr)
     }

     for _, am := range amc.ams {
-        err := am.Start(peerFlags)
+        err := am.Start(args)
         if err != nil {
             return fmt.Errorf("failed to start alertmanager cluster: %w", err)
         }
diff --git a/test/with_api_v2/acceptance/send_test.go b/test/with_api_v2/acceptance/send_test.go
index fa190f44c1..1678299320 100644
--- a/test/with_api_v2/acceptance/send_test.go
+++ b/test/with_api_v2/acceptance/send_test.go
@@ -419,6 +419,68 @@ receivers:
     }
 }

+func TestColdStart(t *testing.T) {
+    t.Parallel()
+
+    // This integration test ensures that the first alert isn't notified before
+    // the configured start delay (mirroring the Prometheus resend delay) has
+    // elapsed after the Alertmanager process has started.
+    conf := `
+route:
+  receiver: "default"
+  group_by: []
+  group_wait: 1s
+  group_interval: 6s
+  repeat_interval: 10m
+
+receivers:
+- name: "default"
+  webhook_configs:
+  - url: 'http://%s'
+`
+
+    at := NewAcceptanceTest(t, &AcceptanceOpts{
+        Tolerance: 150 * time.Millisecond,
+    })
+
+    co := at.Collector("webhook")
+    wh := NewWebhook(t, co)
+
+    amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
+
+    amc.Push(At(1), Alert("alertname", "test1").Active(-100))
+    amc.Push(At(2), Alert("alertname", "test2"))
+
+    // Alerts are dispatched 5 seconds after the Alertmanager process has started.
+    // start delay: 5s
+    // first alert received at: 1s
+    // first alert dispatched at: 5s - 1s = 4s
+    co.Want(Between(4, 5),
+        Alert("alertname", "test1").Active(1),
+        Alert("alertname", "test2").Active(4),
+    )
+
+    // Reload the Alertmanager process.
+    at.Do(At(5), amc.Reload)
+
+    amc.Push(At(6), Alert("alertname", "test3").Active(-100))
+    amc.Push(At(7), Alert("alertname", "test4"))
+
+    // Group interval is applied on top of the start delay.
+    // start delay: 5s
+    // group interval: 6s
+    // alerts dispatched at: 5s + 6s = 11s
+    co.Want(Between(11, 11.5),
+        Alert("alertname", "test1").Active(1),
+        Alert("alertname", "test2").Active(4),
+        Alert("alertname", "test3").Active(6),
+        Alert("alertname", "test4").Active(7),
+    )
+
+    at.Run("--dispatch.start-delay", "5s")
+
+    t.Log(co.Check())
+}
+
 func TestReload(t *testing.T) {
     t.Parallel()
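For reference, the core mechanism this patch adds to the dispatcher is a start-delay gate: work created before the gate timer fires is only registered, and is started when the timer fires, while work created afterwards starts immediately. The standalone Go sketch below (illustrative only, not Alertmanager code; all names in it are made up) shows that pattern in isolation, under the assumption of a single goroutine selecting over the gate timer and an incoming work channel.

package main

import (
    "fmt"
    "time"
)

// Illustrative sketch of the start-delay gating pattern: items that arrive
// before the start timer fires are parked, then started once the timer fires;
// items that arrive after the timer has fired start immediately.
func main() {
    const startDelay = 2 * time.Second

    startTimer := time.NewTimer(startDelay)
    waiting := true
    var parked []string

    work := make(chan string)
    go func() {
        work <- "group-a"           // arrives before the delay elapses: parked
        time.Sleep(3 * time.Second) // sleep past the start delay
        work <- "group-b"           // arrives after the delay: runs immediately
        close(work)
    }()

    for {
        select {
        case <-startTimer.C:
            // The delay has elapsed: start everything that was parked so far.
            waiting = false
            for _, w := range parked {
                fmt.Println("starting parked", w)
            }
            parked = nil
        case w, ok := <-work:
            if !ok {
                return
            }
            if waiting {
                parked = append(parked, w)
                continue
            }
            fmt.Println("starting", w)
        }
    }
}

In the actual change, the same roles are played by Dispatcher.startTimer, the DispatcherStateWaitingToStart/DispatcherStateRunning states, and runAG, which is invoked either when the timer fires or directly from processAlert once the dispatcher is running.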