Skip to content

Commit f7a0d01

Browse files
authored
Merge pull request #4438 from siavashs/marker-gc
fix(marker): stop state leakage from aggregation groups
2 parents 8479a85 + d7f2c92 commit f7a0d01

File tree

2 files changed: +209 additions, −5 deletions

dispatch/dispatch.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
340340
return
341341
}
342342

343-
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
343+
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.marker.(types.AlertMarker), d.logger)
344344
routeGroups[fp] = ag
345345
d.aggrGroupsNum++
346346
d.metrics.aggrGroups.Inc()
@@ -389,6 +389,7 @@ type aggrGroup struct {
389389
routeKey string
390390

391391
alerts *store.Alerts
392+
marker types.AlertMarker
392393
ctx context.Context
393394
cancel func()
394395
done chan struct{}
@@ -400,7 +401,7 @@ type aggrGroup struct {
400401
}
401402

402403
// newAggrGroup returns a new aggregation group.
403-
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger *slog.Logger) *aggrGroup {
404+
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
404405
if to == nil {
405406
to = func(d time.Duration) time.Duration { return d }
406407
}
@@ -411,6 +412,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
411412
opts: &r.RouteOpts,
412413
timeout: to,
413414
alerts: store.NewAlerts(),
415+
marker: marker,
414416
done: make(chan struct{}),
415417
}
416418
ag.ctx, ag.cancel = context.WithCancel(ctx)
@@ -539,6 +541,14 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
539541
// we would delete an active alert thinking it was resolved.
540542
if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil {
541543
ag.logger.Error("error on delete alerts", "err", err)
544+
} else {
545+
// Delete markers for resolved alerts that are not in the store.
546+
for _, alert := range resolvedSlice {
547+
_, err := ag.alerts.Get(alert.Fingerprint())
548+
if errors.Is(err, store.ErrNotFound) {
549+
ag.marker.Delete(alert.Fingerprint())
550+
}
551+
}
542552
}
543553
}
544554
}

dispatch/dispatch_test.go

Lines changed: 197 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func TestAggrGroup(t *testing.T) {
141141
}
142142

143143
// Test regular situation where we wait for group_wait to send out alerts.
144-
ag := newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger())
144+
ag := newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
145145
go ag.run(ntfy)
146146

147147
ag.insert(a1)
@@ -195,7 +195,7 @@ func TestAggrGroup(t *testing.T) {
195195
// immediate flushing.
196196
// Finally, set all alerts to be resolved. After successful notify the aggregation group
197197
// should empty itself.
198-
ag = newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger())
198+
ag = newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
199199
go ag.run(ntfy)
200200

201201
ag.insert(a1)
@@ -757,7 +757,7 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
757757

758758
// Insert an aggregation group with no alerts.
759759
labels := model.LabelSet{"alertname": "1"}
760-
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, promslog.NewNopLogger())
760+
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
761761
aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
762762
dispatcher.aggrGroupsPerRoute = aggrGroups
763763
// Must run otherwise doMaintenance blocks on aggrGroup1.stop().
@@ -775,3 +775,197 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
775775
require.False(t, isMuted)
776776
require.Empty(t, mutedBy)
777777
}
778+
779+
func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
780+
t.Run("successful flush deletes markers for resolved alerts", func(t *testing.T) {
781+
ctx := context.Background()
782+
marker := types.NewMarker(prometheus.NewRegistry())
783+
labels := model.LabelSet{"alertname": "TestAlert"}
784+
route := &Route{
785+
RouteOpts: RouteOpts{
786+
Receiver: "test",
787+
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
788+
GroupWait: 0,
789+
GroupInterval: time.Minute,
790+
RepeatInterval: time.Hour,
791+
},
792+
}
793+
timeout := func(d time.Duration) time.Duration { return d }
794+
logger := promslog.NewNopLogger()
795+
796+
// Create an aggregation group
797+
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
798+
799+
// Create test alerts: one active and one resolved
800+
now := time.Now()
801+
activeAlert := &types.Alert{
802+
Alert: model.Alert{
803+
Labels: model.LabelSet{
804+
"alertname": "TestAlert",
805+
"instance": "1",
806+
},
807+
StartsAt: now.Add(-time.Hour),
808+
EndsAt: now.Add(time.Hour), // Active alert
809+
},
810+
UpdatedAt: now,
811+
}
812+
resolvedAlert := &types.Alert{
813+
Alert: model.Alert{
814+
Labels: model.LabelSet{
815+
"alertname": "TestAlert",
816+
"instance": "2",
817+
},
818+
StartsAt: now.Add(-time.Hour),
819+
EndsAt: now.Add(-time.Minute), // Resolved alert
820+
},
821+
UpdatedAt: now,
822+
}
823+
824+
// Insert alerts into the aggregation group
825+
ag.insert(activeAlert)
826+
ag.insert(resolvedAlert)
827+
828+
// Set markers for both alerts
829+
marker.SetActiveOrSilenced(activeAlert.Fingerprint(), 0, nil, nil)
830+
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
831+
832+
// Verify markers exist before flush
833+
require.True(t, marker.Active(activeAlert.Fingerprint()))
834+
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
835+
836+
// Create a notify function that succeeds
837+
notifyFunc := func(alerts ...*types.Alert) bool {
838+
return true
839+
}
840+
841+
// Flush the alerts
842+
ag.flush(notifyFunc)
843+
844+
// Verify that the resolved alert's marker was deleted
845+
require.True(t, marker.Active(activeAlert.Fingerprint()), "active alert marker should still exist")
846+
require.False(t, marker.Active(resolvedAlert.Fingerprint()), "resolved alert marker should be deleted")
847+
})
848+
849+
t.Run("failed flush does not delete markers", func(t *testing.T) {
850+
ctx := context.Background()
851+
marker := types.NewMarker(prometheus.NewRegistry())
852+
labels := model.LabelSet{"alertname": "TestAlert"}
853+
route := &Route{
854+
RouteOpts: RouteOpts{
855+
Receiver: "test",
856+
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
857+
GroupWait: 0,
858+
GroupInterval: time.Minute,
859+
RepeatInterval: time.Hour,
860+
},
861+
}
862+
timeout := func(d time.Duration) time.Duration { return d }
863+
logger := promslog.NewNopLogger()
864+
865+
// Create an aggregation group
866+
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
867+
868+
// Create a resolved alert
869+
now := time.Now()
870+
resolvedAlert := &types.Alert{
871+
Alert: model.Alert{
872+
Labels: model.LabelSet{
873+
"alertname": "TestAlert",
874+
"instance": "1",
875+
},
876+
StartsAt: now.Add(-time.Hour),
877+
EndsAt: now.Add(-time.Minute), // Resolved alert
878+
},
879+
UpdatedAt: now,
880+
}
881+
882+
// Insert alert into the aggregation group
883+
ag.insert(resolvedAlert)
884+
885+
// Set marker for the alert
886+
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
887+
888+
// Verify marker exists before flush
889+
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
890+
891+
// Create a notify function that fails
892+
notifyFunc := func(alerts ...*types.Alert) bool {
893+
return false
894+
}
895+
896+
// Flush the alerts (notify will fail)
897+
ag.flush(notifyFunc)
898+
899+
// Verify that the marker was NOT deleted due to failed notification
900+
require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when notify fails")
901+
})
902+
903+
t.Run("markers not deleted when alert is modified during flush", func(t *testing.T) {
904+
ctx := context.Background()
905+
marker := types.NewMarker(prometheus.NewRegistry())
906+
labels := model.LabelSet{"alertname": "TestAlert"}
907+
route := &Route{
908+
RouteOpts: RouteOpts{
909+
Receiver: "test",
910+
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
911+
GroupWait: 0,
912+
GroupInterval: time.Minute,
913+
RepeatInterval: time.Hour,
914+
},
915+
}
916+
timeout := func(d time.Duration) time.Duration { return d }
917+
logger := promslog.NewNopLogger()
918+
919+
// Create an aggregation group
920+
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
921+
922+
// Create a resolved alert
923+
now := time.Now()
924+
resolvedAlert := &types.Alert{
925+
Alert: model.Alert{
926+
Labels: model.LabelSet{
927+
"alertname": "TestAlert",
928+
"instance": "1",
929+
},
930+
StartsAt: now.Add(-time.Hour),
931+
EndsAt: now.Add(-time.Minute), // Resolved alert
932+
},
933+
UpdatedAt: now,
934+
}
935+
936+
// Insert alert into the aggregation group
937+
ag.insert(resolvedAlert)
938+
939+
// Set marker for the alert
940+
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
941+
942+
// Verify marker exists before flush
943+
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
944+
945+
// Create a notify function that modifies the alert before returning
946+
notifyFunc := func(alerts ...*types.Alert) bool {
947+
// Simulate the alert being modified (e.g., firing again) during flush
948+
modifiedAlert := &types.Alert{
949+
Alert: model.Alert{
950+
Labels: model.LabelSet{
951+
"alertname": "TestAlert",
952+
"instance": "1",
953+
},
954+
StartsAt: now.Add(-time.Hour),
955+
EndsAt: now.Add(time.Hour), // Active again
956+
},
957+
UpdatedAt: now.Add(time.Second), // More recent update
958+
}
959+
// Update the alert in the store
960+
ag.alerts.Set(modifiedAlert)
961+
return true
962+
}
963+
964+
// Flush the alerts
965+
ag.flush(notifyFunc)
966+
967+
// Verify that the marker was NOT deleted because the alert was modified
968+
// during the flush (DeleteIfNotModified should have failed)
969+
require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when alert is modified during flush")
970+
})
971+
}

0 commit comments

Comments
 (0)