
Commit 885824c

alxric and siavashs authored and committed
feat(dispatch): add start delay
This change adds a new command-line flag `--dispatch.start-delay`, which corresponds to the `--rules.alert.resend-delay` flag in Prometheus. That Prometheus flag controls the minimum amount of time Prometheus waits before resending an alert to Alertmanager. By adding this value to Alertmanager's start time, we delay the aggregation groups' first flush until we are confident that all alerts have been resent by the Prometheus instances. This should help avoid race conditions in inhibitions after a (re)start.

Other improvements:
- remove the hasFlushed flag from aggrGroup
- remove mutex locking from aggrGroup

Signed-off-by: Alexander Rickardsson <[email protected]>
Signed-off-by: Siavash Safi <[email protected]>
1 parent bbab475 commit 885824c
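The commit message implies the two flags are meant to be kept in sync across Prometheus and Alertmanager. As an illustration only (the `1m` value below is an arbitrary example, not something this commit prescribes), the pairing would look like:

```
prometheus   --rules.alert.resend-delay=1m ...
alertmanager --dispatch.start-delay=1m ...
```

With matching values, Alertmanager holds back the first flush of its aggregation groups for one resend interval after start-up, giving inhibiting alerts a chance to arrive before inhibited ones are dispatched.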

File tree: 7 files changed (+302, −71 lines)


cmd/alertmanager/main.go

Lines changed: 20 additions & 3 deletions
```diff
@@ -143,6 +143,7 @@ func run() int {
         maxSilenceSizeBytes         = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
         alertGCInterval             = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
         dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration()
+        DispatchStartDelay          = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration()
 
         webConfig   = webflag.AddFlags(kingpin.CommandLine, ":9093")
         externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
@@ -415,7 +416,7 @@ func run() int {
         prometheus.DefaultRegisterer,
         configLogger,
     )
-    configCoordinator.Subscribe(func(conf *config.Config) error {
+    configCoordinator.Subscribe(func(conf *config.Config, initial bool) error {
         tmpl, err = template.FromGlobs(conf.Templates)
         if err != nil {
             return fmt.Errorf("failed to parse templates: %w", err)
@@ -493,7 +494,17 @@ func run() int {
             silencer.Mutes(labels)
         })
 
-        disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
+        disp = dispatch.NewDispatcher(
+            alerts,
+            routes,
+            pipeline,
+            marker,
+            timeoutFunc,
+            *dispatchMaintenanceInterval,
+            nil,
+            logger,
+            dispMetrics,
+        )
         routes.Walk(func(r *dispatch.Route) {
             if r.RouteOpts.RepeatInterval > *retention {
                 configLogger.Warn(
@@ -520,7 +531,13 @@ func run() int {
             }
         })
 
-        go disp.Run()
+        dispatchDelay := time.Duration(0)
+        if initial {
+            // Only set minDispatchTime if we're in the initial start and not in a reload.
+            // This ensures immediate dispatch after a reload and optional delay after initial start.
+            dispatchDelay = *DispatchStartDelay
+        }
+        go disp.Run(dispatchDelay)
         go inhibitor.Run()
 
         return nil
```
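For clarity, the decision encoded in the last hunk above can be read as a tiny pure function. The sketch below is illustrative only (`startDelay` is a hypothetical helper, not part of the change); it restates the rule that the configured delay applies to the initial start and never to reloads:

```go
package main

import (
	"fmt"
	"time"
)

// startDelay returns the delay to pass to Dispatcher.Run: the configured
// --dispatch.start-delay on the initial start, zero on config reloads.
func startDelay(initial bool, configured time.Duration) time.Duration {
	if initial {
		return configured
	}
	return 0
}

func main() {
	fmt.Println(startDelay(true, time.Minute))  // 1m0s on first start
	fmt.Println(startDelay(false, time.Minute)) // 0s on reload
}
```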

config/coordinator.go

Lines changed: 9 additions & 5 deletions
```diff
@@ -30,9 +30,10 @@ type Coordinator struct {
     logger *slog.Logger
 
     // Protects config and subscribers
-    mutex       sync.Mutex
-    config      *Config
-    subscribers []func(*Config) error
+    mutex         sync.Mutex
+    config        *Config
+    subscribers   []func(*Config, bool) error
+    initialReload bool
 
     configHashMetric    prometheus.Gauge
     configSuccessMetric prometheus.Gauge
@@ -46,6 +47,7 @@ func NewCoordinator(configFilePath string, r prometheus.Registerer, l *slog.Logger) *Coordinator {
     c := &Coordinator{
         configFilePath: configFilePath,
         logger:         l,
+        initialReload:  true,
     }
 
     c.registerMetrics(r)
@@ -73,7 +75,7 @@ func (c *Coordinator) registerMetrics(r prometheus.Registerer) {
 }
 
 // Subscribe subscribes the given Subscribers to configuration changes.
-func (c *Coordinator) Subscribe(ss ...func(*Config) error) {
+func (c *Coordinator) Subscribe(ss ...func(*Config, bool) error) {
     c.mutex.Lock()
     defer c.mutex.Unlock()
 
@@ -82,11 +84,13 @@ func (c *Coordinator) Subscribe(ss ...func(*Config) error) {
 
 func (c *Coordinator) notifySubscribers() error {
     for _, s := range c.subscribers {
-        if err := s(c.config); err != nil {
+        if err := s(c.config, c.initialReload); err != nil {
             return err
         }
     }
 
+    // Set initialReload to false after the first notification.
+    c.initialReload = false
     return nil
 }
 
```
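A subscriber written against the new signature looks like the sketch below. This is an illustrative consumer, not code from the change; it assumes the Coordinator's existing `Reload` method and uses a placeholder config path:

```go
package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/alertmanager/config"
)

func main() {
	// "alertmanager.yml" is a placeholder path for this sketch.
	c := config.NewCoordinator("alertmanager.yml", prometheus.NewRegistry(), promslog.NewNopLogger())

	c.Subscribe(func(conf *config.Config, initial bool) error {
		if initial {
			// Runs exactly once, on the first successful load after start-up.
			// This is where main.go applies --dispatch.start-delay.
		}
		// Anything here runs on every load, initial or reload.
		return nil
	})

	if err := c.Reload(); err != nil {
		log.Fatal(err)
	}
}
```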

config/coordinator_test.go

Lines changed: 2 additions & 2 deletions
```diff
@@ -49,7 +49,7 @@ func TestCoordinatorRegistersMetrics(t *testing.T) {
 func TestCoordinatorNotifiesSubscribers(t *testing.T) {
     callBackCalled := false
     c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())
-    c.Subscribe(func(*Config) error {
+    c.Subscribe(func(*Config, bool) error {
         callBackCalled = true
         return nil
     })
@@ -68,7 +68,7 @@ func TestCoordinatorFailReloadWhenSubscriberFails(t *testing.T) {
     errMessage := "something happened"
     c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())
 
-    c.Subscribe(func(*Config) error {
+    c.Subscribe(func(*Config, bool) error {
         return errors.New(errMessage)
     })
 
```

dispatch/dispatch.go

Lines changed: 101 additions & 37 deletions
```diff
@@ -25,13 +25,28 @@ import (
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/prometheus/common/model"
+    "go.uber.org/atomic"
 
     "github.com/prometheus/alertmanager/notify"
     "github.com/prometheus/alertmanager/provider"
     "github.com/prometheus/alertmanager/store"
     "github.com/prometheus/alertmanager/types"
 )
 
+const (
+    DispatcherStateUnknown = iota
+    DispatcherStateWaitingToStart
+    DispatcherStateRunning
+)
+
+var (
+    state = map[int]string{
+        DispatcherStateUnknown:        "unknown",
+        DispatcherStateWaitingToStart: "waiting_to_start",
+        DispatcherStateRunning:        "running",
+    }
+)
+
 // DispatcherMetrics represents metrics associated to a dispatcher.
 type DispatcherMetrics struct {
     aggrGroups prometheus.Gauge
@@ -90,6 +105,9 @@ type Dispatcher struct {
     cancel func()
 
     logger *slog.Logger
+
+    startTimer *time.Timer
+    state      int
 }
 
 // Limits describes limits used by Dispatcher.
@@ -102,39 +120,44 @@ type Limits interface {
 
 // NewDispatcher returns a new Dispatcher.
 func NewDispatcher(
-    ap provider.Alerts,
-    r *Route,
-    s notify.Stage,
-    mk types.GroupMarker,
-    to func(time.Duration) time.Duration,
-    mi time.Duration,
-    lim Limits,
-    l *slog.Logger,
-    m *DispatcherMetrics,
+    alerts provider.Alerts,
+    route *Route,
+    stage notify.Stage,
+    marker types.GroupMarker,
+    timeout func(time.Duration) time.Duration,
+    maintenanceInterval time.Duration,
+    limits Limits,
+    logger *slog.Logger,
+    metrics *DispatcherMetrics,
 ) *Dispatcher {
-    if lim == nil {
-        lim = nilLimits{}
+    if limits == nil {
+        limits = nilLimits{}
     }
 
     disp := &Dispatcher{
-        alerts:              ap,
-        stage:               s,
-        route:               r,
-        marker:              mk,
-        timeout:             to,
-        maintenanceInterval: mi,
-        logger:              l.With("component", "dispatcher"),
-        metrics:             m,
-        limits:              lim,
+        alerts:              alerts,
+        stage:               stage,
+        route:               route,
+        marker:              marker,
+        timeout:             timeout,
+        maintenanceInterval: maintenanceInterval,
+        logger:              logger.With("component", "dispatcher"),
+        metrics:             metrics,
+        limits:              limits,
+        state:               DispatcherStateUnknown,
     }
     return disp
 }
 
 // Run starts dispatching alerts incoming via the updates channel.
-func (d *Dispatcher) Run() {
+func (d *Dispatcher) Run(dispatchDelay time.Duration) {
     d.done = make(chan struct{})
 
     d.mtx.Lock()
+    d.logger.Debug("preparing to start", "dispatchDelay", dispatchDelay)
+    d.startTimer = time.NewTimer(dispatchDelay)
+    d.state = DispatcherStateWaitingToStart
+    d.logger.Debug("setting state", "state", state[d.state])
     d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
     d.aggrGroupsNum = 0
     d.metrics.aggrGroups.Set(0)
@@ -176,6 +199,18 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
             }
             d.metrics.processingDuration.Observe(time.Since(now).Seconds())
 
+        case <-d.startTimer.C:
+            if d.state == DispatcherStateWaitingToStart {
+                d.state = DispatcherStateRunning
+                d.logger.Debug("started", "state", state[d.state])
+                d.logger.Debug("Starting all existing aggregation groups")
+                for _, groups := range d.aggrGroupsPerRoute {
+                    for _, ag := range groups {
+                        d.runAG(ag)
+                    }
+                }
+            }
+
         case <-maintenance.C:
             d.doMaintenance()
         case <-d.ctx.Done():
@@ -347,6 +382,35 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
     // alert is already there.
     ag.insert(alert)
 
+    // Check dispatcher and alert state to determine if we should start dispatching the alert.
+    now := time.Now()
+    switch d.state {
+    case DispatcherStateWaitingToStart:
+        d.logger.Debug("Dispatcher still waiting to start")
+    case DispatcherStateRunning:
+        if alert.StartsAt.Before(now) {
+            ag.logger.Debug(
+                "Alert is old enough for immediate flush, resetting timer to zero",
+                "alert", alert.Name(),
+                "fingerprint", alert.Fingerprint(),
+                "startsAt", alert.StartsAt,
+            )
+            ag.resetTimer(0)
+            d.runAG(ag)
+        }
+    default:
+        s, ok := state[d.state]
+        if !ok {
+            s = "unknown"
+        }
+        d.logger.Warn("unknown state detected", "state", s)
+    }
+}
+
+func (d *Dispatcher) runAG(ag *aggrGroup) {
+    if ag.running.Load() {
+        return
+    }
     go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
         _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
         if err != nil {
@@ -392,13 +456,18 @@ type aggrGroup struct {
     done    chan struct{}
     next    *time.Timer
     timeout func(time.Duration) time.Duration
-
-    mtx        sync.RWMutex
-    hasFlushed bool
+    running atomic.Bool
 }
 
 // newAggrGroup returns a new aggregation group.
-func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
+func newAggrGroup(
+    ctx context.Context,
+    labels model.LabelSet,
+    r *Route,
+    to func(time.Duration) time.Duration,
+    marker types.AlertMarker,
+    logger *slog.Logger,
+) *aggrGroup {
     if to == nil {
         to = func(d time.Duration) time.Duration { return d }
     }
@@ -436,6 +505,7 @@ func (ag *aggrGroup) String() string {
 }
 
 func (ag *aggrGroup) run(nf notifyFunc) {
+    ag.running.Store(true)
     defer close(ag.done)
     defer ag.next.Stop()
 
@@ -462,10 +532,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
         ctx = notify.WithRouteID(ctx, ag.routeID)
 
         // Wait the configured interval before calling flush again.
-        ag.mtx.Lock()
-        ag.next.Reset(ag.opts.GroupInterval)
-        ag.hasFlushed = true
-        ag.mtx.Unlock()
+        ag.resetTimer(ag.opts.GroupInterval)
 
         ag.flush(func(alerts ...*types.Alert) bool {
             return nf(ctx, alerts...)
@@ -486,19 +553,16 @@ func (ag *aggrGroup) stop() {
     <-ag.done
 }
 
+// resetTimer resets the timer for the AG.
+func (ag *aggrGroup) resetTimer(t time.Duration) {
+    ag.next.Reset(t)
+}
+
 // insert inserts the alert into the aggregation group.
 func (ag *aggrGroup) insert(alert *types.Alert) {
     if err := ag.alerts.Set(alert); err != nil {
         ag.logger.Error("error on set alert", "err", err)
     }
-
-    // Immediately trigger a flush if the wait duration for this
-    // alert is already over.
-    ag.mtx.Lock()
-    defer ag.mtx.Unlock()
-    if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
-        ag.next.Reset(0)
-    }
 }
 
 func (ag *aggrGroup) empty() bool {
```
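The delayed-start mechanism above boils down to a timer that gates when aggregation-group goroutines are launched, plus a per-group flag that prevents double starts. Below is a minimal, self-contained sketch of that pattern, not the actual dispatcher: it uses the standard library's sync/atomic with a CompareAndSwap guard instead of the go.uber.org/atomic Load/Store pair in the diff, and the worker type, names, and delay are made up for illustration:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// worker stands in for an aggregation group in this simplified sketch.
type worker struct {
	name    string
	running atomic.Bool
}

// start launches the worker once; later calls are no-ops, mirroring how
// runAG skips groups whose run loop is already active.
func (w *worker) start() {
	if !w.running.CompareAndSwap(false, true) {
		return
	}
	fmt.Println("started", w.name)
}

func main() {
	startDelay := 200 * time.Millisecond // stands in for --dispatch.start-delay
	workers := []*worker{{name: "group-a"}, {name: "group-b"}}

	startTimer := time.NewTimer(startDelay)
	defer startTimer.Stop()

	// Until the timer fires, new work is only collected; nothing is dispatched.
	<-startTimer.C
	for _, w := range workers {
		w.start()
	}
}
```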
