Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,11 @@ func (r *Route) UnmarshalYAML(unmarshal func(any) error) error {
return nil
}

type Source struct {
SrcMatchers Matchers `yaml:"matchers,omitempty" json:"matchers,omitempty"`
Equal []string `yaml:"equal,omitempty" json:"equal,omitempty"`
}

// InhibitRule defines an inhibition rule that mutes alerts that match the
// target labels if an alert matching the source labels exists.
// Both alerts have to have a set of labels being equal.
Expand All @@ -973,6 +978,8 @@ type InhibitRule struct {
SourceMatchRE MatchRegexps `yaml:"source_match_re,omitempty" json:"source_match_re,omitempty"`
// SourceMatchers defines a set of label matchers that have to be fulfilled for source alerts.
SourceMatchers Matchers `yaml:"source_matchers,omitempty" json:"source_matchers,omitempty"`
// Sources defines a set of source matchers and equal labels.
Sources []Source `yaml:"source,omitempty" json:"source,omitempty"`
// TargetMatch defines a set of labels that have to equal the given
// value for target alerts. Deprecated. Remove before v1.0 release.
TargetMatch map[string]string `yaml:"target_match,omitempty" json:"target_match,omitempty"`
Expand Down
256 changes: 220 additions & 36 deletions inhibit/inhibit.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package inhibit
import (
"context"
"log/slog"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -86,23 +87,41 @@ func (ih *Inhibitor) run(ctx context.Context) {
// Update the inhibition rules' cache.
cachedSum := 0
indexedSum := 0
cached := 0
indexed := 0
for _, r := range ih.rules {
if r.SourceMatchers.Matches(a.Labels) {
if err := r.scache.Set(a); err != nil {
ih.logger.Error("error on set alert", "err", err)
continue
if len(r.Sources) > 0 {
cached = 0
indexed = 0
for _, src := range r.Sources {
if src.SrcMatchers.Matches(a.Labels) {
if err := src.scache.Set(a); err != nil {
ih.logger.Error("error on set alert", "err", err)
continue
}
src.updateIndex(a)
cached += src.scache.Len()
indexed += src.sindex.Len()
break
}
}
r.updateIndex(a)

}
cached := r.scache.Len()
indexed := r.sindex.Len()
} else {
if r.SourceMatchers.Matches(a.Labels) {
if err := r.scache.Set(a); err != nil {
ih.logger.Error("error on set alert", "err", err)
continue
}
r.updateIndex(a)
}
cached = r.scache.Len()
indexed = r.sindex.Len()

}
if r.Name != "" {
r.metrics.sourceAlertsCacheItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(cached))
r.metrics.sourceAlertsIndexItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(indexed))
}

cachedSum += cached
indexedSum += indexed
}
Expand Down Expand Up @@ -169,23 +188,70 @@ func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
r.metrics.matchesDurationMatched.Observe(time.Since(ruleStart).Seconds())
// If we are here, the target side matches. If the source side matches, too, we
// need to exclude inhibiting alerts for which the same is true.
if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset), ruleStart); eq {
ih.marker.SetInhibited(fp, inhibitedByFP.String())
now := time.Now()
sinceStart := now.Sub(start)
sinceRuleStart := now.Sub(ruleStart)
ih.metrics.mutesDurationMuted.Observe(sinceStart.Seconds())
r.metrics.mutesDurationMuted.Observe(sinceRuleStart.Seconds())
return true

if len(r.Sources) > 0 {
var inhibitorIDs []string
for _, source := range r.Sources {
if inhibitedByFP, eq := source.hasEqual(lset, source.SrcMatchers.Matches(lset), ruleStart, r.TargetMatchers); eq && !source.foundMatch {
inhibitorIDs = append(inhibitorIDs, inhibitedByFP.String())
source.foundMatch = true
}
}
if allSourcesMatched := r.allSourcesSatisfied(); allSourcesMatched {
compositeInhibitorID := strings.Join(inhibitorIDs, ",")
ih.marker.SetInhibited(fp, compositeInhibitorID)
now := time.Now()
sinceStart := now.Sub(start)
sinceRuleStart := now.Sub(ruleStart)
ih.metrics.mutesDurationMuted.Observe(sinceStart.Seconds())
r.metrics.mutesDurationMuted.Observe(sinceRuleStart.Seconds())
return true
}
// Reset for next use.
for _, source := range r.Sources {
source.foundMatch = false
}

} else {
if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset), ruleStart); eq {
ih.marker.SetInhibited(fp, inhibitedByFP.String())
now := time.Now()
sinceStart := now.Sub(start)
sinceRuleStart := now.Sub(ruleStart)
ih.metrics.mutesDurationMuted.Observe(sinceStart.Seconds())
r.metrics.mutesDurationMuted.Observe(sinceRuleStart.Seconds())
return true
}

}
r.metrics.mutesDurationNotMuted.Observe(time.Since(ruleStart).Seconds())
}

ih.marker.SetInhibited(fp)
ih.metrics.mutesDurationNotMuted.Observe(time.Since(start).Seconds())

return false
}

type Source struct {
// The set of Filters which define the group of source alerts (which inhibit
// the target alerts).
SrcMatchers labels.Matchers
// A set of label names whose label values need to be identical in source and
// target alerts in order for the inhibition to take effect.
Equal map[model.LabelName]struct{}
// Cache of alerts matching source labels.
scache *store.Alerts

// Index of fingerprints of source alert equal labels to fingerprint of source alert.
// The index helps speed up source alert lookups from scache significantely in scenarios with 100s of source alerts cached.
// The index items might overwrite eachother if multiple source alerts have exact equal labels.
// Overwrites only happen if the new source alert has bigger EndsAt value.
sindex *index

foundMatch bool
}

// An InhibitRule specifies that a class of (source) alerts should inhibit
// notifications for another class of (target) alerts if all specified matching
// labels are equal between the two alerts. This may be used to inhibit alerts
Expand All @@ -197,6 +263,7 @@ type InhibitRule struct {
// The set of Filters which define the group of source alerts (which inhibit
// the target alerts).
SourceMatchers labels.Matchers
Sources []*Source
// The set of Filters which define the group of target alerts (which are
// inhibited by the source alerts).
TargetMatchers labels.Matchers
Expand All @@ -219,30 +286,49 @@ type InhibitRule struct {
// NewInhibitRule returns a new InhibitRule based on a configuration definition.
func NewInhibitRule(cr config.InhibitRule, metrics *RuleMetrics) *InhibitRule {
var (
sources []*Source
sourcem labels.Matchers
targetm labels.Matchers
)

// cr.SourceMatch will be deprecated. This for loop appends regex matchers.
for ln, lv := range cr.SourceMatch {
matcher, err := labels.NewMatcher(labels.MatchEqual, ln, lv)
if err != nil {
// This error must not happen because the config already validates the yaml.
panic(err)
if len(cr.Sources) > 0 {
for _, sm := range cr.Sources {
var sourcesm labels.Matchers
sourcesm = append(sourcesm, sm.SrcMatchers...)
equal := map[model.LabelName]struct{}{}
for _, ln := range sm.Equal {
equal[model.LabelName(ln)] = struct{}{}
}
sources = append(sources, &Source{
SrcMatchers: sourcesm,
Equal: equal,
scache: store.NewAlerts(),
sindex: newIndex(),
})
}
sourcem = append(sourcem, matcher)
}
// cr.SourceMatchRE will be deprecated. This for loop appends regex matchers.
for ln, lv := range cr.SourceMatchRE {
matcher, err := labels.NewMatcher(labels.MatchRegexp, ln, lv.String())
if err != nil {
// This error must not happen because the config already validates the yaml.
panic(err)
} else {

// cr.SourceMatch will be deprecated. This for loop appends regex matchers.
for ln, lv := range cr.SourceMatch {
matcher, err := labels.NewMatcher(labels.MatchEqual, ln, lv)
if err != nil {
// This error must not happen because the config already validates the yaml.
panic(err)
}
sourcem = append(sourcem, matcher)
}
// cr.SourceMatchRE will be deprecated. This for loop appends regex matchers.
for ln, lv := range cr.SourceMatchRE {
matcher, err := labels.NewMatcher(labels.MatchRegexp, ln, lv.String())
if err != nil {
// This error must not happen because the config already validates the yaml.
panic(err)
}
sourcem = append(sourcem, matcher)
}
sourcem = append(sourcem, matcher)
// We append the new-style matchers. This can be simplified once the deprecated matcher syntax is removed.
sourcem = append(sourcem, cr.SourceMatchers...)
}
// We append the new-style matchers. This can be simplified once the deprecated matcher syntax is removed.
sourcem = append(sourcem, cr.SourceMatchers...)

// cr.TargetMatch will be deprecated. This for loop appends regex matchers.
for ln, lv := range cr.TargetMatch {
Expand Down Expand Up @@ -278,6 +364,7 @@ func NewInhibitRule(cr config.InhibitRule, metrics *RuleMetrics) *InhibitRule {
scache: store.NewAlerts(),
sindex: newIndex(),
metrics: metrics,
Sources: sources,
}

rule.scache.SetGCCallback(rule.gcCallback)
Expand All @@ -291,6 +378,15 @@ func (r *InhibitRule) fingerprintEquals(lset model.LabelSet) model.Fingerprint {
for n := range r.Equal {
equalSet[n] = lset[n]
}

return equalSet.Fingerprint()
}

func (s *Source) fingerprintEquals(lset model.LabelSet) model.Fingerprint {
equalSet := model.LabelSet{}
for n := range s.Equal {
equalSet[n] = lset[n]
}
return equalSet.Fingerprint()
}

Expand Down Expand Up @@ -328,6 +424,39 @@ func (r *InhibitRule) updateIndex(alert *types.Alert) {
// If the existing alert resolves after the new alert, do nothing.
}

func (src *Source) updateIndex(alert *types.Alert) {
fp := alert.Fingerprint()
// Calculate source labelset subset which is in equals.
eq := src.fingerprintEquals(alert.Labels)

// Check if the equal labelset is already in the index.
indexed, ok := src.sindex.Get(eq)
if !ok {
// If not, add it.
src.sindex.Set(eq, fp)
return
}
// If the indexed fingerprint is the same as the new fingerprint, do nothing.
if indexed == fp {
return
}

// New alert and existing index are not the same, compare them.
existing, err := src.scache.Get(indexed)
if err != nil {
// failed to get the existing alert, overwrite the index.
src.sindex.Set(eq, fp)
return
}

// If the new alert resolves after the existing alert, replace the index.
if existing.ResolvedAt(alert.EndsAt) {
src.sindex.Set(eq, fp)
return
}
// If the existing alert resolves after the new alert, do nothing.
}

// findEqualSourceAlert returns the source alert that matches the equal labels of the given label set.
func (r *InhibitRule) findEqualSourceAlert(lset model.LabelSet, now time.Time) (*types.Alert, bool) {
equalsFP := r.fingerprintEquals(lset)
Expand All @@ -348,10 +477,40 @@ func (r *InhibitRule) findEqualSourceAlert(lset model.LabelSet, now time.Time) (
return nil, false
}

func (s *Source) findEqualSourceAlert(lset model.LabelSet, now time.Time) (*types.Alert, bool) {
equalsFP := s.fingerprintEquals(lset)
sourceFP, ok := s.sindex.Get(equalsFP)
if ok {
alert, err := s.scache.Get(sourceFP)
if err != nil {
return nil, false
}

if alert.ResolvedAt(now) {
return nil, false
}

return alert, true
}

return nil, false
}

func (r *InhibitRule) gcCallback(alerts []types.Alert) {
for _, a := range alerts {
fp := r.fingerprintEquals(a.Labels)
r.sindex.Delete(fp)
if len(r.Sources) > 0 {
for _, src := range r.Sources {
if src.SrcMatchers.Matches(a.Labels) {
fp := src.fingerprintEquals(a.Labels)
src.sindex.Delete(fp)

break
}
}
} else {
fp := r.fingerprintEquals(a.Labels)
r.sindex.Delete(fp)
}
}
if r.Name != "" {
r.metrics.sourceAlertsCacheItems.With(prometheus.Labels{"rule": r.Name}).Set(float64(r.scache.Len()))
Expand All @@ -374,3 +533,28 @@ func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool, n

return model.Fingerprint(0), false
}

func (s *Source) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool, now time.Time, targetMatchers labels.Matchers) (model.Fingerprint, bool) {
equal, found := s.findEqualSourceAlert(lset, now)
if found {
if excludeTwoSidedMatch && targetMatchers.Matches(equal.Labels) {
return model.Fingerprint(0), false
}
return equal.Fingerprint(), found
}

return model.Fingerprint(0), false
}

func (r *InhibitRule) allSourcesSatisfied() bool {
for _, source := range r.Sources {
if !source.foundMatch {
return false
}
}
// Reset for next use.
for _, source := range r.Sources {
source.foundMatch = false
}
return true
}
16 changes: 12 additions & 4 deletions inhibit/inhibit_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ func allRulesMatchBenchmark(b *testing.B, numInhibitionRules, numInhibitingAlert
n: numInhibitionRules,
newRuleFunc: func(idx int) config.InhibitRule {
return config.InhibitRule{
SourceMatchers: config.Matchers{
mustNewMatcher(b, labels.MatchEqual, "src", strconv.Itoa(idx)),
Sources: []config.Source{
{
SrcMatchers: config.Matchers{
mustNewMatcher(b, labels.MatchEqual, "src", strconv.Itoa(idx)),
},
},
},
TargetMatchers: config.Matchers{
mustNewMatcher(b, labels.MatchEqual, "dst", "0"),
Expand Down Expand Up @@ -152,8 +156,12 @@ func lastRuleMatchesBenchmark(b *testing.B, n int) benchmarkOptions {
n: n,
newRuleFunc: func(idx int) config.InhibitRule {
return config.InhibitRule{
SourceMatchers: config.Matchers{
mustNewMatcher(b, labels.MatchEqual, "src", strconv.Itoa(idx)),
Sources: []config.Source{
{
SrcMatchers: config.Matchers{
mustNewMatcher(b, labels.MatchEqual, "src", strconv.Itoa(idx)),
},
},
},
TargetMatchers: config.Matchers{
mustNewMatcher(b, labels.MatchEqual, "dst", "0"),
Expand Down
Loading