Commit 0247eba

alxric authored and siavashs committed
feat(dispatch): add start delay
This change adds a new command-line flag, `--dispatch.start-delay`, which corresponds to the `--rules.alert.resend-delay` flag in Prometheus. The Prometheus flag controls the minimum amount of time Prometheus waits before resending an alert to Alertmanager. By adding this value to Alertmanager's start time, we delay the aggregation groups' first flush until we are confident that all alerts have been resent by the Prometheus instances. This should help avoid race conditions in inhibitions after a (re)start.

Other improvements:
- remove the hasFlushed flag from aggrGroup
- remove mutex locking from aggrGroup

Signed-off-by: Alexander Rickardsson <[email protected]>
Signed-off-by: Siavash Safi <[email protected]>
1 parent 92ecf8b commit 0247eba
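
To illustrate the behavior described in the commit message, here is a minimal Go sketch of the gating idea: the first flush of an aggregation group is deferred until the process start time plus the configured `--dispatch.start-delay` has passed. The `gateFirstFlush` helper and the one-minute delay are assumptions for illustration only and are not part of this commit.

package main

import (
	"fmt"
	"time"
)

func main() {
	// startTime mirrors the package-level variable added in cmd/alertmanager/main.go.
	startTime := time.Now()

	// Hypothetical value; in the real binary it would come from --dispatch.start-delay
	// and should match Prometheus' --rules.alert.resend-delay.
	startDelay := time.Minute

	minDispatchTime := startTime.Add(startDelay)

	// gateFirstFlush is a hypothetical helper: it returns how long an aggregation
	// group's first flush should be deferred. Zero means "flush on the normal
	// schedule"; a positive value means "wait until the start delay has elapsed".
	gateFirstFlush := func(now time.Time) time.Duration {
		if now.After(minDispatchTime) {
			return 0
		}
		return time.Until(minDispatchTime)
	}

	fmt.Println("defer first flush by:", gateFirstFlush(time.Now()))
}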

File tree

8 files changed: +281 -71 lines

cmd/alertmanager/main.go

Lines changed: 18 additions & 3 deletions

@@ -64,6 +64,7 @@ import (
 )

 var (
+	startTime = time.Now()
 	requestDuration = promauto.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Name: "alertmanager_http_request_duration_seconds",
@@ -143,6 +144,7 @@ func run() int {
 		maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
 		alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
 		dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration()
+		DispatchStartDelay = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration()

 		webConfig = webflag.AddFlags(kingpin.CommandLine, ":9093")
 		externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
@@ -413,7 +415,7 @@ func run() int {
 		prometheus.DefaultRegisterer,
 		configLogger,
 	)
-	configCoordinator.Subscribe(func(conf *config.Config) error {
+	configCoordinator.Subscribe(func(conf *config.Config, initial bool) error {
 		tmpl, err = template.FromGlobs(conf.Templates)
 		if err != nil {
 			return fmt.Errorf("failed to parse templates: %w", err)
@@ -491,7 +493,17 @@ func run() int {
 			silencer.Mutes(labels)
 		})

-		disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
+		disp = dispatch.NewDispatcher(
+			alerts,
+			routes,
+			pipeline,
+			marker,
+			timeoutFunc,
+			*dispatchMaintenanceInterval,
+			nil,
+			logger,
+			dispMetrics,
+		)
 		routes.Walk(func(r *dispatch.Route) {
 			if r.RouteOpts.RepeatInterval > *retention {
 				configLogger.Warn(
@@ -518,7 +530,10 @@ func run() int {
 			}
 		})

-		go disp.Run()
+		go disp.Run(
+			startTime.Add(*DispatchStartDelay),
+			initial, // signal to the dispatcher that this is the initial config load
+		)
 		go inhibitor.Run()

 		return nil

config/coordinator.go

Lines changed: 9 additions & 5 deletions

@@ -30,9 +30,10 @@ type Coordinator struct {
 	logger *slog.Logger

 	// Protects config and subscribers
-	mutex       sync.Mutex
-	config      *Config
-	subscribers []func(*Config) error
+	mutex         sync.Mutex
+	config        *Config
+	subscribers   []func(*Config, bool) error
+	initialReload bool

 	configHashMetric prometheus.Gauge
 	configSuccessMetric prometheus.Gauge
@@ -46,6 +47,7 @@ func NewCoordinator(configFilePath string, r prometheus.Registerer, l *slog.Logg
 	c := &Coordinator{
 		configFilePath: configFilePath,
 		logger: l,
+		initialReload: true,
 	}

 	c.registerMetrics(r)
@@ -73,7 +75,7 @@ func (c *Coordinator) registerMetrics(r prometheus.Registerer) {
 }

 // Subscribe subscribes the given Subscribers to configuration changes.
-func (c *Coordinator) Subscribe(ss ...func(*Config) error) {
+func (c *Coordinator) Subscribe(ss ...func(*Config, bool) error) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()

@@ -82,11 +84,13 @@ func (c *Coordinator) Subscribe(ss ...func(*Config) error) {

 func (c *Coordinator) notifySubscribers() error {
 	for _, s := range c.subscribers {
-		if err := s(c.config); err != nil {
+		if err := s(c.config, c.initialReload); err != nil {
 			return err
 		}
 	}

+	// Set initialReload to false after the first notification.
+	c.initialReload = false
 	return nil
 }

config/coordinator_test.go

Lines changed: 2 additions & 2 deletions

@@ -49,7 +49,7 @@ func TestCoordinatorRegistersMetrics(t *testing.T) {
 func TestCoordinatorNotifiesSubscribers(t *testing.T) {
 	callBackCalled := false
 	c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())
-	c.Subscribe(func(*Config) error {
+	c.Subscribe(func(*Config, bool) error {
 		callBackCalled = true
 		return nil
 	})
@@ -68,7 +68,7 @@ func TestCoordinatorFailReloadWhenSubscriberFails(t *testing.T) {
 	errMessage := "something happened"
 	c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), promslog.NewNopLogger())

-	c.Subscribe(func(*Config) error {
+	c.Subscribe(func(*Config, bool) error {
 		return errors.New(errMessage)
 	})

dispatch/dispatch.go

Lines changed: 77 additions & 37 deletions

@@ -32,6 +32,12 @@ import (
 	"github.com/prometheus/alertmanager/types"
 )

+const (
+	DISPATCHER_STATE_UNKNOWN = iota
+	DISPATCHER_STATE_STARTING
+	DISPATCHER_STATE_RUNNING
+)
+
 // DispatcherMetrics represents metrics associated to a dispatcher.
 type DispatcherMetrics struct {
 	aggrGroups prometheus.Gauge
@@ -90,6 +96,9 @@ type Dispatcher struct {
 	cancel func()

 	logger *slog.Logger
+
+	minDispatchTime time.Time
+	state int
 }

 // Limits describes limits used by Dispatcher.
@@ -102,39 +111,47 @@ type Limits interface {

 // NewDispatcher returns a new Dispatcher.
 func NewDispatcher(
-	ap provider.Alerts,
-	r *Route,
-	s notify.Stage,
-	mk types.GroupMarker,
-	to func(time.Duration) time.Duration,
-	mi time.Duration,
-	lim Limits,
-	l *slog.Logger,
-	m *DispatcherMetrics,
+	alerts provider.Alerts,
+	route *Route,
+	stage notify.Stage,
+	marker types.GroupMarker,
+	timeout func(time.Duration) time.Duration,
+	maintenanceInterval time.Duration,
+	limits Limits,
+	logger *slog.Logger,
+	metrics *DispatcherMetrics,
 ) *Dispatcher {
-	if lim == nil {
-		lim = nilLimits{}
+	if limits == nil {
+		limits = nilLimits{}
 	}

 	disp := &Dispatcher{
-		alerts: ap,
-		stage: s,
-		route: r,
-		marker: mk,
-		timeout: to,
-		maintenanceInterval: mi,
-		logger: l.With("component", "dispatcher"),
-		metrics: m,
-		limits: lim,
+		alerts: alerts,
+		stage: stage,
+		route: route,
+		marker: marker,
+		timeout: timeout,
+		maintenanceInterval: maintenanceInterval,
+		logger: logger.With("component", "dispatcher"),
+		metrics: metrics,
+		limits: limits,
+		state: DISPATCHER_STATE_STARTING,
 	}
 	return disp
 }

 // Run starts dispatching alerts incoming via the updates channel.
-func (d *Dispatcher) Run() {
+func (d *Dispatcher) Run(minDispatchTime time.Time, initial bool) {
 	d.done = make(chan struct{})

 	d.mtx.Lock()
+	d.minDispatchTime = minDispatchTime
+	// If this is not the initial run, set the state to running already.
+	d.logger.Debug("Starting dispatcher", "initialRun", initial)
+	if !initial {
+		d.logger.Debug("Setting dispatcher state to 'running'")
+		d.state = DISPATCHER_STATE_RUNNING
+	}
 	d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
 	d.aggrGroupsNum = 0
 	d.metrics.aggrGroups.Set(0)
@@ -347,6 +364,31 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 	// alert is already there.
 	ag.insert(alert)

+	// Check dispatcher and alert state to determine if we should start dispatching the alert.
+	now := time.Now()
+	switch d.state {
+	case DISPATCHER_STATE_STARTING:
+		if now.After(d.minDispatchTime) {
+			d.logger.Debug("Setting dispatch state to: running")
+			d.state = DISPATCHER_STATE_RUNNING
+		} else {
+			// Reset timer to the time when the dispatcher will be in running state.
+			ag.resetTimer(time.Until(d.minDispatchTime))
+		}
+	case DISPATCHER_STATE_RUNNING:
+		if alert.StartsAt.Before(now) {
+			ag.logger.Debug(
+				"Alert is old enough for immediate flush, resetting timer to zero",
+				"alert", alert.Name(),
+				"fingerprint", alert.Fingerprint(),
+				"startsAt", alert.StartsAt,
+			)
+			ag.resetTimer(0)
+		}
+	default:
+		d.logger.Warn("Unknown dispatcher state", "state", d.state)
+	}
+
 	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
 		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
 		if err != nil {
@@ -392,13 +434,17 @@ type aggrGroup struct {
 	done chan struct{}
 	next *time.Timer
 	timeout func(time.Duration) time.Duration
-
-	mtx sync.RWMutex
-	hasFlushed bool
 }

 // newAggrGroup returns a new aggregation group.
-func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
+func newAggrGroup(
+	ctx context.Context,
+	labels model.LabelSet,
+	r *Route,
+	to func(time.Duration) time.Duration,
+	marker types.AlertMarker,
+	logger *slog.Logger,
+) *aggrGroup {
 	if to == nil {
 		to = func(d time.Duration) time.Duration { return d }
 	}
462508
ctx = notify.WithRouteID(ctx, ag.routeID)
463509

464510
// Wait the configured interval before calling flush again.
465-
ag.mtx.Lock()
466-
ag.next.Reset(ag.opts.GroupInterval)
467-
ag.hasFlushed = true
468-
ag.mtx.Unlock()
511+
ag.resetTimer(ag.opts.GroupInterval)
469512

470513
ag.flush(func(alerts ...*types.Alert) bool {
471514
return nf(ctx, alerts...)
@@ -486,19 +529,16 @@ func (ag *aggrGroup) stop() {
486529
<-ag.done
487530
}
488531

532+
// resetTimer resets the timer for the AG.
533+
func (ag *aggrGroup) resetTimer(t time.Duration) {
534+
ag.next.Reset(t)
535+
}
536+
489537
// insert inserts the alert into the aggregation group.
490538
func (ag *aggrGroup) insert(alert *types.Alert) {
491539
if err := ag.alerts.Set(alert); err != nil {
492540
ag.logger.Error("error on set alert", "err", err)
493541
}
494-
495-
// Immediately trigger a flush if the wait duration for this
496-
// alert is already over.
497-
ag.mtx.Lock()
498-
defer ag.mtx.Unlock()
499-
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
500-
ag.next.Reset(0)
501-
}
502542
}
503543

504544
func (ag *aggrGroup) empty() bool {

0 commit comments

Comments
 (0)