Skip to content

Commit d284d30

Browse files
committed
fix(events): aggregate rate limit logging to reduce log spam (#9)
Replace per-event rate limit warnings with periodic aggregated summaries. Problem: - Event rate limit logs generated 342,000+ warnings over 2.5 days - 3 duplicate warnings per dropped event (event_manager.go x2, exporter.go x1) Solution: - Track dropped events by reason in EventManager - Log single periodic summary during cleanup cycle (every minute) - Remove duplicate per-event warnings from CreateEvent and callers Before (40 dropped events = 120 warnings): [WARN] Event rate limit exceeded (10/min), dropping event: PeerUnreachable [WARN] Failed to create event from status: event rate limit exceeded ... (repeated 40x) After (40 dropped events = 1 summary): [WARN] Dropped 40 events due to rate limiting (10/min limit). Breakdown: PeerUnreachable=35, HighLatency=5 Tested on lab cluster (3 nodes) and a1-ops-prd (10 nodes).
1 parent d1e3a67 commit d284d30

File tree

5 files changed

+297
-7
lines changed

5 files changed

+297
-7
lines changed

.version-rc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
- v1.5.3
+ v1.5.4

deployment/daemonset.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ spec:
123123

124124
containers:
125125
- name: node-doctor
-        image: supporttools/node-doctor:v1.5.3
+        image: supporttools/node-doctor:v1.5.4
127127
imagePullPolicy: IfNotPresent
128128

129129
# Command and arguments

pkg/exporters/kubernetes/event_manager.go

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ type EventManager struct {
2525
stopCh chan struct{}
2626
stopped bool
2727
wg sync.WaitGroup
28+
29+
// Dropped event tracking for aggregated logging (Issue #9)
30+
droppedByReason map[string]int // Count of dropped events by reason
31+
totalDropped int // Total dropped events since last summary
2832
}
2933

3034
// NewEventManager creates a new event manager with the specified configuration
@@ -48,6 +52,8 @@ func NewEventManager(client *K8sClient, config *types.KubernetesExporterConfig)
4852
deduplicationTTL: deduplicationTTL,
4953
cleanupInterval: time.Minute, // Clean up every minute
5054
stopCh: make(chan struct{}),
55+
droppedByReason: make(map[string]int),
56+
totalDropped: 0,
5157
}
5258

5359
return manager
@@ -81,6 +87,9 @@ func (em *EventManager) Stop() {
8187
close(em.stopCh)
8288
em.stopped = true
8389

90+
// Flush any pending dropped events summary before shutdown
91+
em.logDroppedEventsSummaryLocked()
92+
8493
// We need to wait for goroutines while holding the lock to prevent race conditions
8594
// We can't unlock and relock because Start() could be called concurrently
8695
// Instead, we'll use a separate mechanism to wait for completion
@@ -113,8 +122,7 @@ func (em *EventManager) CreateEvent(ctx context.Context, event corev1.Event) err
113122

114123
// Check rate limiting only for non-duplicate events
115124
if !em.checkRateLimit() {
116-
log.Printf("[WARN] Event rate limit exceeded (%d/min), dropping event: %s",
117-
em.maxEventsPerMin, event.Reason)
125+
em.trackDroppedEvent(event.Reason)
118126
return fmt.Errorf("event rate limit exceeded")
119127
}
120128

@@ -143,7 +151,7 @@ func (em *EventManager) CreateEventsFromStatus(ctx context.Context, status *type
143151
for _, event := range events {
144152
err := em.CreateEvent(ctx, event)
145153
if err != nil {
146-
log.Printf("[WARN] Failed to create event from status: %v", err)
154+
// Don't log per-event warnings - aggregated summary is logged periodically
147155
lastErr = err
148156
droppedCount++
149157
} else {
@@ -266,6 +274,80 @@ func (em *EventManager) cleanup() {
266274
if cleanupCount > 0 {
267275
log.Printf("[DEBUG] Event manager cleanup: removed %d expired cache entries", cleanupCount)
268276
}
277+
278+
// Log dropped events summary during cleanup cycle
279+
em.logDroppedEventsSummaryLocked()
280+
}
281+
282+
// trackDroppedEvent records a dropped event for aggregated logging
283+
func (em *EventManager) trackDroppedEvent(reason string) {
284+
em.mu.Lock()
285+
defer em.mu.Unlock()
286+
287+
em.droppedByReason[reason]++
288+
em.totalDropped++
289+
}
290+
291+
// logDroppedEventsSummaryLocked logs a summary of dropped events (must hold lock)
292+
func (em *EventManager) logDroppedEventsSummaryLocked() {
293+
if em.totalDropped == 0 {
294+
return
295+
}
296+
297+
// Build summary of dropped events by reason
298+
summary := fmt.Sprintf("Dropped %d events due to rate limiting (%d/min limit). Breakdown: ",
299+
em.totalDropped, em.maxEventsPerMin)
300+
301+
// Sort reasons by count (descending) and limit to top 5
302+
type reasonCount struct {
303+
reason string
304+
count int
305+
}
306+
var reasons []reasonCount
307+
for reason, count := range em.droppedByReason {
308+
reasons = append(reasons, reasonCount{reason, count})
309+
}
310+
311+
// Simple sort by count descending
312+
for i := 0; i < len(reasons); i++ {
313+
for j := i + 1; j < len(reasons); j++ {
314+
if reasons[j].count > reasons[i].count {
315+
reasons[i], reasons[j] = reasons[j], reasons[i]
316+
}
317+
}
318+
}
319+
320+
// Build reason breakdown (top 5)
321+
maxReasons := 5
322+
if len(reasons) < maxReasons {
323+
maxReasons = len(reasons)
324+
}
325+
326+
for i := 0; i < maxReasons; i++ {
327+
if i > 0 {
328+
summary += ", "
329+
}
330+
summary += fmt.Sprintf("%s=%d", reasons[i].reason, reasons[i].count)
331+
}
332+
333+
if len(reasons) > 5 {
334+
summary += fmt.Sprintf(", and %d more reason types", len(reasons)-5)
335+
}
336+
337+
log.Printf("[WARN] %s", summary)
338+
339+
// Reset counters
340+
em.droppedByReason = make(map[string]int)
341+
em.totalDropped = 0
342+
}
343+
344+
// FlushDroppedEventsSummary logs any pending dropped events summary
345+
// Call this before shutdown to ensure no dropped events go unreported
346+
func (em *EventManager) FlushDroppedEventsSummary() {
347+
em.mu.Lock()
348+
defer em.mu.Unlock()
349+
350+
em.logDroppedEventsSummaryLocked()
269351
}
270352

271353
// GetStats returns statistics about the event manager

pkg/exporters/kubernetes/event_manager_test.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,3 +450,210 @@ func TestEventSignature(t *testing.T) {
450450
t.Error("Same event should produce same signature hash")
451451
}
452452
}
453+
454+
// TestDroppedEventTracking tests that dropped events are tracked instead of logged individually
455+
func TestDroppedEventTracking(t *testing.T) {
456+
manager := createTestEventManager(2, time.Minute) // Limit to 2 events per minute
457+
ctx := context.Background()
458+
459+
// Create events up to the limit
460+
for i := 0; i < 2; i++ {
461+
event := corev1.Event{
462+
ObjectMeta: metav1.ObjectMeta{
463+
Name: fmt.Sprintf("event-%d", i),
464+
Namespace: "test-namespace",
465+
},
466+
InvolvedObject: corev1.ObjectReference{
467+
Kind: "Node",
468+
Name: "test-node",
469+
},
470+
Reason: fmt.Sprintf("Reason%d", i),
471+
Message: fmt.Sprintf("Message %d", i),
472+
Type: corev1.EventTypeNormal,
473+
FirstTimestamp: metav1.NewTime(time.Now()),
474+
LastTimestamp: metav1.NewTime(time.Now()),
475+
Count: 1,
476+
}
477+
manager.CreateEvent(ctx, event)
478+
}
479+
480+
// Verify no dropped events yet
481+
manager.mu.RLock()
482+
droppedBefore := manager.totalDropped
483+
manager.mu.RUnlock()
484+
485+
if droppedBefore != 0 {
486+
t.Errorf("Expected 0 dropped events before rate limiting, got %d", droppedBefore)
487+
}
488+
489+
// Create events that should be rate limited (dropped)
490+
droppedReasons := []string{"DroppedReason1", "DroppedReason2", "DroppedReason1", "DroppedReason3"}
491+
for i, reason := range droppedReasons {
492+
event := corev1.Event{
493+
ObjectMeta: metav1.ObjectMeta{
494+
Name: fmt.Sprintf("dropped-event-%d", i),
495+
Namespace: "test-namespace",
496+
},
497+
InvolvedObject: corev1.ObjectReference{
498+
Kind: "Node",
499+
Name: "test-node",
500+
},
501+
Reason: reason,
502+
Message: fmt.Sprintf("Dropped message %d", i),
503+
Type: corev1.EventTypeWarning,
504+
FirstTimestamp: metav1.NewTime(time.Now()),
505+
LastTimestamp: metav1.NewTime(time.Now()),
506+
Count: 1,
507+
}
508+
err := manager.CreateEvent(ctx, event)
509+
if err == nil {
510+
t.Errorf("Event %d should have been rate limited", i)
511+
}
512+
}
513+
514+
// Verify dropped events are tracked
515+
manager.mu.RLock()
516+
totalDropped := manager.totalDropped
517+
reasonCounts := make(map[string]int)
518+
for k, v := range manager.droppedByReason {
519+
reasonCounts[k] = v
520+
}
521+
manager.mu.RUnlock()
522+
523+
if totalDropped != 4 {
524+
t.Errorf("Expected 4 dropped events, got %d", totalDropped)
525+
}
526+
527+
// Check breakdown by reason
528+
if reasonCounts["DroppedReason1"] != 2 {
529+
t.Errorf("Expected DroppedReason1=2, got %d", reasonCounts["DroppedReason1"])
530+
}
531+
if reasonCounts["DroppedReason2"] != 1 {
532+
t.Errorf("Expected DroppedReason2=1, got %d", reasonCounts["DroppedReason2"])
533+
}
534+
if reasonCounts["DroppedReason3"] != 1 {
535+
t.Errorf("Expected DroppedReason3=1, got %d", reasonCounts["DroppedReason3"])
536+
}
537+
}
538+
539+
// TestFlushDroppedEventsSummary tests that FlushDroppedEventsSummary resets counters
540+
func TestFlushDroppedEventsSummary(t *testing.T) {
541+
manager := createTestEventManager(1, time.Minute) // Limit to 1 event per minute
542+
ctx := context.Background()
543+
544+
// Create one event to consume the rate limit
545+
event := corev1.Event{
546+
ObjectMeta: metav1.ObjectMeta{
547+
Name: "initial-event",
548+
Namespace: "test-namespace",
549+
},
550+
InvolvedObject: corev1.ObjectReference{
551+
Kind: "Node",
552+
Name: "test-node",
553+
},
554+
Reason: "InitialReason",
555+
Message: "Initial message",
556+
Type: corev1.EventTypeNormal,
557+
FirstTimestamp: metav1.NewTime(time.Now()),
558+
LastTimestamp: metav1.NewTime(time.Now()),
559+
Count: 1,
560+
}
561+
manager.CreateEvent(ctx, event)
562+
563+
// Create dropped events
564+
for i := 0; i < 5; i++ {
565+
droppedEvent := corev1.Event{
566+
ObjectMeta: metav1.ObjectMeta{
567+
Name: fmt.Sprintf("dropped-%d", i),
568+
Namespace: "test-namespace",
569+
},
570+
InvolvedObject: corev1.ObjectReference{
571+
Kind: "Node",
572+
Name: "test-node",
573+
},
574+
Reason: "DroppedReason",
575+
Message: fmt.Sprintf("Dropped %d", i),
576+
Type: corev1.EventTypeWarning,
577+
FirstTimestamp: metav1.NewTime(time.Now()),
578+
LastTimestamp: metav1.NewTime(time.Now()),
579+
Count: 1,
580+
}
581+
manager.CreateEvent(ctx, droppedEvent)
582+
}
583+
584+
// Verify events are tracked
585+
manager.mu.RLock()
586+
beforeFlush := manager.totalDropped
587+
manager.mu.RUnlock()
588+
589+
if beforeFlush != 5 {
590+
t.Errorf("Expected 5 dropped events before flush, got %d", beforeFlush)
591+
}
592+
593+
// Flush the summary
594+
manager.FlushDroppedEventsSummary()
595+
596+
// Verify counters are reset
597+
manager.mu.RLock()
598+
afterFlush := manager.totalDropped
599+
reasonCount := len(manager.droppedByReason)
600+
manager.mu.RUnlock()
601+
602+
if afterFlush != 0 {
603+
t.Errorf("Expected 0 dropped events after flush, got %d", afterFlush)
604+
}
605+
if reasonCount != 0 {
606+
t.Errorf("Expected 0 reason counts after flush, got %d", reasonCount)
607+
}
608+
}
609+
610+
// TestDroppedEventsSummaryDuringCleanup tests that cleanup logs dropped events summary
611+
func TestDroppedEventsSummaryDuringCleanup(t *testing.T) {
612+
manager := createTestEventManager(1, time.Minute)
613+
ctx := context.Background()
614+
615+
// Create one event to consume rate limit
616+
event := corev1.Event{
617+
ObjectMeta: metav1.ObjectMeta{
618+
Name: "event",
619+
Namespace: "test-namespace",
620+
},
621+
InvolvedObject: corev1.ObjectReference{
622+
Kind: "Node",
623+
Name: "test-node",
624+
},
625+
Reason: "Reason",
626+
Message: "Message",
627+
Type: corev1.EventTypeNormal,
628+
FirstTimestamp: metav1.NewTime(time.Now()),
629+
LastTimestamp: metav1.NewTime(time.Now()),
630+
Count: 1,
631+
}
632+
manager.CreateEvent(ctx, event)
633+
634+
// Track some dropped events
635+
manager.trackDroppedEvent("TestReason1")
636+
manager.trackDroppedEvent("TestReason1")
637+
manager.trackDroppedEvent("TestReason2")
638+
639+
// Verify tracking
640+
manager.mu.RLock()
641+
beforeCleanup := manager.totalDropped
642+
manager.mu.RUnlock()
643+
644+
if beforeCleanup != 3 {
645+
t.Errorf("Expected 3 dropped events before cleanup, got %d", beforeCleanup)
646+
}
647+
648+
// Run cleanup (which includes logDroppedEventsSummaryLocked)
649+
manager.cleanup()
650+
651+
// Verify counters are reset after cleanup
652+
manager.mu.RLock()
653+
afterCleanup := manager.totalDropped
654+
manager.mu.RUnlock()
655+
656+
if afterCleanup != 0 {
657+
t.Errorf("Expected 0 dropped events after cleanup, got %d", afterCleanup)
658+
}
659+
}

pkg/exporters/kubernetes/exporter.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ func (ke *KubernetesExporter) ExportStatus(ctx context.Context, status *types.St
159159
// Export events
160160
if len(status.Events) > 0 {
161161
if err := ke.eventManager.CreateEventsFromStatus(ctx, status); err != nil {
162-
log.Printf("[WARN] Failed to create events from status: %v", err)
162+
// Don't log here - EventManager logs aggregated summaries periodically
163+
// Some events may have succeeded even if others were rate limited
163164
lastErr = err
164165
} else {
165166
ke.updateStats(func(s *ExporterStats) {
@@ -220,7 +221,7 @@ func (ke *KubernetesExporter) ExportProblem(ctx context.Context, problem *types.
220221

221222
// Create event from problem
222223
if err := ke.eventManager.CreateEventFromProblem(ctx, problem); err != nil {
223-
log.Printf("[WARN] Failed to create event from problem: %v", err)
224+
// Don't log here - EventManager logs aggregated summaries periodically
224225
lastErr = err
225226
} else {
226227
ke.updateStats(func(s *ExporterStats) {

0 commit comments

Comments (0)