Commit d527f3f

alxric and siavashs authored and committed
feat(dispatch): add start delay
This change adds a new cmd flag `--dispatch.start-delay`, which corresponds to the `--rules.alert.resend-delay` flag in Prometheus; that Prometheus flag controls the minimum amount of time Prometheus waits before resending an alert to Alertmanager. By adding this value to the start time of Alertmanager, we delay the aggregation groups' first flush until we are confident all alerts have been resent by the Prometheus instances. This should help avoid race conditions in inhibitions after a (re)start.

Other improvements:
- remove hasFlushed flag from aggrGroup
- remove mutex locking from aggrGroup

Signed-off-by: Alexander Rickardsson <[email protected]>
Signed-off-by: Siavash Safi <[email protected]>
1 parent 888fb29 commit d527f3f
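
To make the pairing concrete, here is a minimal sketch of how the two flags would be kept in sync when starting both services; the 1m value is only an assumed example and is not taken from this commit:

    # Prometheus waits at least this long before resending a firing alert to Alertmanager.
    prometheus --rules.alert.resend-delay=1m ...

    # Matching value on the Alertmanager side: hold the first flush of every aggregation
    # group for the same duration after startup, so alerts resent by all Prometheus
    # instances (including inhibiting alerts) arrive before anything is dispatched.
    alertmanager --dispatch.start-delay=1m ...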

File tree: 7 files changed, +300 -71 lines changed


cmd/alertmanager/main.go

Lines changed: 20 additions & 3 deletions
@@ -143,6 +143,7 @@ func run() int {
         maxSilenceSizeBytes         = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
         alertGCInterval             = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
         dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration()
+        DispatchStartDelay          = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration()
 
         webConfig   = webflag.AddFlags(kingpin.CommandLine, ":9093")
         externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
@@ -415,7 +416,7 @@ func run() int {
         prometheus.DefaultRegisterer,
         configLogger,
     )
-    configCoordinator.Subscribe(func(conf *config.Config) error {
+    configCoordinator.Subscribe(func(conf *config.Config, initial bool) error {
         tmpl, err = template.FromGlobs(conf.Templates)
         if err != nil {
             return fmt.Errorf("failed to parse templates: %w", err)
@@ -493,7 +494,17 @@ func run() int {
             silencer.Mutes(labels)
         })
 
-        disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
+        disp = dispatch.NewDispatcher(
+            alerts,
+            routes,
+            pipeline,
+            marker,
+            timeoutFunc,
+            *dispatchMaintenanceInterval,
+            nil,
+            logger,
+            dispMetrics,
+        )
         routes.Walk(func(r *dispatch.Route) {
             if r.RouteOpts.RepeatInterval > *retention {
                 configLogger.Warn(
@@ -520,7 +531,13 @@ func run() int {
             }
         })
 
-        go disp.Run()
+        dispatchDelay := time.Duration(0)
+        if initial {
+            // Only set minDispatchTime if we're in the initial start and not in a reload.
+            // This ensures immediate dispatch after a reload and optional delay after initial start.
+            dispatchDelay = *DispatchStartDelay
+        }
+        go disp.Run(dispatchDelay)
         go inhibitor.Run()
 
         return nil

config/coordinator.go

Lines changed: 9 additions & 5 deletions
@@ -30,9 +30,10 @@ type Coordinator struct {
     logger *slog.Logger
 
     // Protects config and subscribers
-    mutex       sync.Mutex
-    config      *Config
-    subscribers []func(*Config) error
+    mutex         sync.Mutex
+    config        *Config
+    subscribers   []func(*Config, bool) error
+    initialReload bool
 
     configHashMetric    prometheus.Gauge
     configSuccessMetric prometheus.Gauge
@@ -46,6 +47,7 @@ func NewCoordinator(configFilePath string, r prometheus.Registerer, l *slog.Logg
     c := &Coordinator{
         configFilePath: configFilePath,
         logger:         l,
+        initialReload:  true,
     }
 
     c.registerMetrics(r)
@@ -73,7 +75,7 @@ func (c *Coordinator) registerMetrics(r prometheus.Registerer) {
 }
 
 // Subscribe subscribes the given Subscribers to configuration changes.
-func (c *Coordinator) Subscribe(ss ...func(*Config) error) {
+func (c *Coordinator) Subscribe(ss ...func(*Config, bool) error) {
     c.mutex.Lock()
     defer c.mutex.Unlock()
 
@@ -82,11 +84,13 @@ func (c *Coordinator) Subscribe(ss ...func(*Config) error) {
 
 func (c *Coordinator) notifySubscribers() error {
     for _, s := range c.subscribers {
-        if err := s(c.config); err != nil {
+        if err := s(c.config, c.initialReload); err != nil {
             return err
         }
     }
 
+    // Set initialReload to false after the first notification.
+    c.initialReload = false
     return nil
 }
 
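
For context, a minimal sketch of a subscriber written against the new callback signature; the second argument is true only for the first notification after process start and false for every subsequent reload. The config path, logger setup, and callback body are illustrative assumptions, and the sketch assumes the coordinator's existing Reload method to trigger the first notification:

package main

import (
    "log/slog"
    "os"

    "github.com/prometheus/client_golang/prometheus"

    "github.com/prometheus/alertmanager/config"
)

func main() {
    logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
    coordinator := config.NewCoordinator("alertmanager.yml", prometheus.NewRegistry(), logger)

    // Subscribers now receive the initial-load flag alongside the parsed config.
    coordinator.Subscribe(func(conf *config.Config, initial bool) error {
        if initial {
            // One-time work on the very first load, e.g. honouring a start delay.
        }
        // Reloads arrive with initial == false and are applied immediately.
        return nil
    })

    if err := coordinator.Reload(); err != nil {
        logger.Error("initial configuration load failed", "err", err)
    }
}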

config/coordinator_test.go

Lines changed: 2 additions & 2 deletions
@@ -49,7 +49,7 @@ func TestCoordinatorRegistersMetrics(t *testing.T) {
 func TestCoordinatorNotifiesSubscribers(t *testing.T) {
     callBackCalled := false
     c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())
-    c.Subscribe(func(*Config) error {
+    c.Subscribe(func(*Config, bool) error {
         callBackCalled = true
         return nil
     })
@@ -68,7 +68,7 @@ func TestCoordinatorFailReloadWhenSubscriberFails(t *testing.T) {
     errMessage := "something happened"
     c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())
 
-    c.Subscribe(func(*Config) error {
+    c.Subscribe(func(*Config, bool) error {
         return errors.New(errMessage)
     })
 

dispatch/dispatch.go

Lines changed: 99 additions & 37 deletions
@@ -25,13 +25,26 @@ import (
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/prometheus/common/model"
+    "go.uber.org/atomic"
 
     "github.com/prometheus/alertmanager/notify"
     "github.com/prometheus/alertmanager/provider"
     "github.com/prometheus/alertmanager/store"
     "github.com/prometheus/alertmanager/types"
 )
 
+const (
+    DispatcherStateUnknown = iota
+    DispatcherStateWaitingToStart
+    DispatcherStateRunning
+)
+
+var state = map[int]string{
+    DispatcherStateUnknown:        "unknown",
+    DispatcherStateWaitingToStart: "waiting_to_start",
+    DispatcherStateRunning:        "running",
+}
+
 // DispatcherMetrics represents metrics associated to a dispatcher.
 type DispatcherMetrics struct {
     aggrGroups prometheus.Gauge
@@ -90,6 +103,9 @@ type Dispatcher struct {
     cancel func()
 
     logger *slog.Logger
+
+    startTimer *time.Timer
+    state      int
 }
 
 // Limits describes limits used by Dispatcher.
@@ -102,39 +118,44 @@ type Limits interface {
 
 // NewDispatcher returns a new Dispatcher.
 func NewDispatcher(
-    ap provider.Alerts,
-    r *Route,
-    s notify.Stage,
-    mk types.GroupMarker,
-    to func(time.Duration) time.Duration,
-    mi time.Duration,
-    lim Limits,
-    l *slog.Logger,
-    m *DispatcherMetrics,
+    alerts provider.Alerts,
+    route *Route,
+    stage notify.Stage,
+    marker types.GroupMarker,
+    timeout func(time.Duration) time.Duration,
+    maintenanceInterval time.Duration,
+    limits Limits,
+    logger *slog.Logger,
+    metrics *DispatcherMetrics,
 ) *Dispatcher {
-    if lim == nil {
-        lim = nilLimits{}
+    if limits == nil {
+        limits = nilLimits{}
     }
 
     disp := &Dispatcher{
-        alerts:              ap,
-        stage:               s,
-        route:               r,
-        marker:              mk,
-        timeout:             to,
-        maintenanceInterval: mi,
-        logger:              l.With("component", "dispatcher"),
-        metrics:             m,
-        limits:              lim,
+        alerts:              alerts,
+        stage:               stage,
+        route:               route,
+        marker:              marker,
+        timeout:             timeout,
+        maintenanceInterval: maintenanceInterval,
+        logger:              logger.With("component", "dispatcher"),
+        metrics:             metrics,
+        limits:              limits,
+        state:               DispatcherStateUnknown,
     }
     return disp
 }
 
 // Run starts dispatching alerts incoming via the updates channel.
-func (d *Dispatcher) Run() {
+func (d *Dispatcher) Run(dispatchDelay time.Duration) {
     d.done = make(chan struct{})
 
     d.mtx.Lock()
+    d.logger.Debug("preparing to start", "dispatchDelay", dispatchDelay)
+    d.startTimer = time.NewTimer(dispatchDelay)
+    d.state = DispatcherStateWaitingToStart
+    d.logger.Debug("setting state", "state", state[d.state])
     d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
     d.aggrGroupsNum = 0
     d.metrics.aggrGroups.Set(0)
@@ -176,6 +197,18 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
             }
             d.metrics.processingDuration.Observe(time.Since(now).Seconds())
 
+        case <-d.startTimer.C:
+            if d.state == DispatcherStateWaitingToStart {
+                d.state = DispatcherStateRunning
+                d.logger.Debug("started", "state", state[d.state])
+                d.logger.Debug("Starting all existing aggregation groups")
+                for _, groups := range d.aggrGroupsPerRoute {
+                    for _, ag := range groups {
+                        d.runAG(ag)
+                    }
+                }
+            }
+
         case <-maintenance.C:
             d.doMaintenance()
         case <-d.ctx.Done():
@@ -311,6 +344,7 @@ type notifyFunc func(context.Context, ...*types.Alert) bool
 // processAlert determines in which aggregation group the alert falls
 // and inserts it.
 func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
+    now := time.Now()
     groupLabels := getGroupLabels(alert, route)
 
     fp := groupLabels.Fingerprint()
@@ -347,6 +381,34 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
     // alert is already there.
     ag.insert(alert)
 
+    if alert.StartsAt.Add(ag.opts.GroupWait).Before(now) {
+        ag.logger.Debug(
+            "Alert is old enough for immediate flush, resetting timer to zero",
+            "alert", alert.Name(),
+            "fingerprint", alert.Fingerprint(),
+            "startsAt", alert.StartsAt,
+        )
+        ag.resetTimer(0)
+    }
+    // Check dispatcher and alert state to determine if we should run the AG now.
+    switch d.state {
+    case DispatcherStateWaitingToStart:
+        d.logger.Debug("Dispatcher still waiting to start")
+    case DispatcherStateRunning:
+        d.runAG(ag)
+    default:
+        s, ok := state[d.state]
+        if !ok {
+            s = "unknown"
+        }
+        d.logger.Warn("unknown state detected", "state", s)
+    }
+}
+
+func (d *Dispatcher) runAG(ag *aggrGroup) {
+    if ag.running.Load() {
+        return
+    }
     go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
         _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
         if err != nil {
@@ -392,13 +454,18 @@ type aggrGroup struct {
     done    chan struct{}
     next    *time.Timer
     timeout func(time.Duration) time.Duration
-
-    mtx        sync.RWMutex
-    hasFlushed bool
+    running atomic.Bool
 }
 
 // newAggrGroup returns a new aggregation group.
-func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
+func newAggrGroup(
+    ctx context.Context,
+    labels model.LabelSet,
+    r *Route,
+    to func(time.Duration) time.Duration,
+    marker types.AlertMarker,
+    logger *slog.Logger,
+) *aggrGroup {
     if to == nil {
         to = func(d time.Duration) time.Duration { return d }
     }
@@ -436,6 +503,7 @@ func (ag *aggrGroup) String() string {
 }
 
 func (ag *aggrGroup) run(nf notifyFunc) {
+    ag.running.Store(true)
     defer close(ag.done)
     defer ag.next.Stop()
 
@@ -462,10 +530,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
         ctx = notify.WithRouteID(ctx, ag.routeID)
 
         // Wait the configured interval before calling flush again.
-        ag.mtx.Lock()
-        ag.next.Reset(ag.opts.GroupInterval)
-        ag.hasFlushed = true
-        ag.mtx.Unlock()
+        ag.resetTimer(ag.opts.GroupInterval)
 
         ag.flush(func(alerts ...*types.Alert) bool {
             return nf(ctx, alerts...)
@@ -486,19 +551,16 @@ func (ag *aggrGroup) stop() {
     <-ag.done
 }
 
+// resetTimer resets the timer for the AG.
+func (ag *aggrGroup) resetTimer(t time.Duration) {
+    ag.next.Reset(t)
+}
+
 // insert inserts the alert into the aggregation group.
 func (ag *aggrGroup) insert(alert *types.Alert) {
     if err := ag.alerts.Set(alert); err != nil {
        ag.logger.Error("error on set alert", "err", err)
     }
-
-    // Immediately trigger a flush if the wait duration for this
-    // alert is already over.
-    ag.mtx.Lock()
-    defer ag.mtx.Unlock()
-    if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
-        ag.next.Reset(0)
-    }
 }
 
 func (ag *aggrGroup) empty() bool {
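
The backward compatibility of the default `0s` value rests on standard `time.Timer` semantics: a timer armed with a zero (or negative) duration fires essentially immediately, so `Run(0)` effectively reproduces the old behaviour of starting aggregation groups as soon as alerts arrive. A tiny standalone sketch of that property (plain Go, not Alertmanager code):

package main

import (
    "fmt"
    "time"
)

func main() {
    start := time.Now()

    // Zero delay: the timer is already expired when we first wait on it,
    // which is what Run(0) relies on to keep the pre-change behaviour.
    <-time.NewTimer(0).C
    fmt.Println("zero-delay timer fired after", time.Since(start))

    // Non-zero delay: nothing is dispatched until the timer fires,
    // mirroring --dispatch.start-delay on the dispatcher's start timer.
    <-time.NewTimer(2 * time.Second).C
    fmt.Println("delayed timer fired after", time.Since(start))
}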
