Skip to content

Commit d8adf85

Browse files
committed
Merge fix/duplicate-status-processing: Fix duplicate status processing and rate limiting
2 parents 175d4ab + 9637698 commit d8adf85

File tree

5 files changed

+61
-304
lines changed

5 files changed

+61
-304
lines changed

pkg/detector/detector.go

Lines changed: 6 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,9 @@ type ProblemDetector struct {
9696
configFilePath string
9797
monitorFactory MonitorFactory
9898
configChangeCh <-chan struct{}
99-
reloadMutex sync.Mutex // Protects reload operations
100-
started bool
101-
seenProblems map[string]*types.Problem // For deduplication
102-
problemsMutex sync.RWMutex // Protects seenProblems
103-
passedMonitors []types.Monitor // Monitors passed directly to constructor
99+
reloadMutex sync.Mutex // Protects reload operations
100+
started bool
101+
passedMonitors []types.Monitor // Monitors passed directly to constructor
104102
}
105103

106104
// MonitorFactory interface for creating monitor instances during hot reload
@@ -150,7 +148,6 @@ func NewProblemDetector(config *types.NodeDoctorConfig, monitors []types.Monitor
150148
configWatcher: configWatcher,
151149
configFilePath: configFilePath,
152150
monitorFactory: monitorFactory,
153-
seenProblems: make(map[string]*types.Problem),
154151
passedMonitors: monitors, // Store passed monitors to start in Start()
155152
}
156153

@@ -345,19 +342,9 @@ func (pd *ProblemDetector) processStatus(status *types.Status) {
345342
// Update statistics
346343
pd.stats.IncrementStatusesReceived()
347344

348-
// Convert status to problems for testing
349-
problems := pd.statusToProblems(status)
350-
pd.stats.AddProblemsDetected(len(problems))
351-
352-
// Apply deduplication
353-
newProblems := pd.deduplicateProblems(problems)
354-
pd.stats.AddProblemsDeduplicated(len(newProblems))
355-
356-
for _, problem := range newProblems {
357-
pd.processProblem(problem)
358-
}
359-
360-
// Export to all exporters
345+
// Export to all exporters (single path - Status contains all data)
346+
// Note: Previously this also called ExportProblem() for converted problems,
347+
// causing duplicate Kubernetes resources. See GitHub issue #7.
361348
for _, exporter := range pd.exporters {
362349
if err := exporter.ExportStatus(pd.ctx, status); err != nil {
363350
log.Printf("[WARN] Failed to export status to exporter: %v", err)
@@ -368,98 +355,6 @@ func (pd *ProblemDetector) processStatus(status *types.Status) {
368355
}
369356
}
370357

371-
// processProblem processes a problem for testing purposes
372-
func (pd *ProblemDetector) processProblem(problem *types.Problem) {
373-
// Export to all exporters
374-
for _, exporter := range pd.exporters {
375-
if err := exporter.ExportProblem(pd.ctx, problem); err != nil {
376-
log.Printf("[WARN] Failed to export problem to exporter: %v", err)
377-
pd.stats.IncrementExportsFailed()
378-
} else {
379-
pd.stats.IncrementExportsSucceeded()
380-
}
381-
}
382-
}
383-
384-
// statusToProblems converts a status update to problems for testing
385-
func (pd *ProblemDetector) statusToProblems(status *types.Status) []*types.Problem {
386-
var problems []*types.Problem
387-
388-
// Convert conditions to problems
389-
for _, condition := range status.Conditions {
390-
if condition.Status == types.ConditionFalse {
391-
problems = append(problems, &types.Problem{
392-
Type: "condition-" + condition.Type,
393-
Resource: status.Source,
394-
Message: condition.Message,
395-
Severity: types.ProblemCritical, // False conditions are critical problems
396-
DetectedAt: condition.Transition,
397-
Metadata: map[string]string{
398-
"condition": condition.Type,
399-
},
400-
})
401-
}
402-
}
403-
404-
// Convert high severity events to problems
405-
for _, event := range status.Events {
406-
if event.Severity == types.EventError || event.Severity == types.EventWarning {
407-
severity := pd.eventSeverityToProblemSeverity(event.Severity)
408-
problems = append(problems, &types.Problem{
409-
Type: "event-" + event.Reason,
410-
Resource: status.Source,
411-
Message: event.Message,
412-
Severity: severity,
413-
DetectedAt: event.Timestamp,
414-
Metadata: map[string]string{
415-
"event": event.Reason,
416-
},
417-
})
418-
}
419-
}
420-
421-
return problems
422-
}
423-
424-
// deduplicateProblems filters out problems that have already been seen with the same severity
425-
func (pd *ProblemDetector) deduplicateProblems(problems []*types.Problem) []*types.Problem {
426-
pd.problemsMutex.Lock()
427-
defer pd.problemsMutex.Unlock()
428-
429-
var newProblems []*types.Problem
430-
431-
for _, problem := range problems {
432-
// Create a unique key for this problem
433-
key := fmt.Sprintf("%s:%s:%s", problem.Type, problem.Resource, problem.Severity)
434-
435-
// Check if we've seen this exact problem before
436-
if existingProblem, exists := pd.seenProblems[key]; exists {
437-
// If severity is the same, skip (it's a duplicate)
438-
if existingProblem.Severity == problem.Severity {
439-
continue
440-
}
441-
}
442-
443-
// This is a new problem or severity has changed
444-
pd.seenProblems[key] = problem
445-
newProblems = append(newProblems, problem)
446-
}
447-
448-
return newProblems
449-
}
450-
451-
// eventSeverityToProblemSeverity converts EventSeverity to ProblemSeverity
452-
func (pd *ProblemDetector) eventSeverityToProblemSeverity(severity types.EventSeverity) types.ProblemSeverity {
453-
switch severity {
454-
case types.EventError:
455-
return types.ProblemCritical
456-
case types.EventWarning:
457-
return types.ProblemWarning
458-
default:
459-
return types.ProblemInfo
460-
}
461-
}
462-
463358
// fanInFromMonitor reads statuses from a monitor and forwards them to the main status channel
464359
func (pd *ProblemDetector) fanInFromMonitor(ctx context.Context, statusCh <-chan *types.Status, monitorName string) {
465360
log.Printf("[DEBUG] Starting fan-in for monitor %s", monitorName)

pkg/detector/detector_test.go

Lines changed: 3 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -202,145 +202,6 @@ func TestProblemDetector_StatusProcessing(t *testing.T) {
202202
}
203203
}
204204

205-
func TestStatusToProblems_Events(t *testing.T) {
206-
helper := NewTestHelper()
207-
config := helper.CreateTestConfig()
208-
factory := NewMockMonitorFactory()
209-
detector, _ := NewProblemDetector(config, []types.Monitor{}, []types.Exporter{NewMockExporter("test")}, "/tmp/test-config.yaml", factory)
210-
211-
tests := []struct {
212-
name string
213-
status *types.Status
214-
expectedCount int
215-
containsTypes []string // Check for presence, not order
216-
containsSeverities []types.ProblemSeverity
217-
}{
218-
{
219-
name: "error event",
220-
status: func() *types.Status {
221-
status := types.NewStatus("test-source")
222-
status.AddEvent(types.NewEvent(types.EventError, "CriticalError", "Critical error occurred"))
223-
status.AddCondition(types.NewCondition("SystemHealth", types.ConditionFalse, "HealthCheck", "System not healthy"))
224-
return status
225-
}(),
226-
expectedCount: 2, // 1 error event + 1 false condition
227-
containsTypes: []string{"event-CriticalError", "condition-SystemHealth"},
228-
containsSeverities: []types.ProblemSeverity{types.ProblemCritical}, // Both error events and false conditions are Critical
229-
},
230-
{
231-
name: "warning event",
232-
status: func() *types.Status {
233-
status := types.NewStatus("test-source")
234-
status.AddEvent(types.NewEvent(types.EventWarning, "PerformanceWarning", "Warning occurred"))
235-
return status
236-
}(),
237-
expectedCount: 1,
238-
containsTypes: []string{"event-PerformanceWarning"},
239-
containsSeverities: []types.ProblemSeverity{types.ProblemWarning},
240-
},
241-
{
242-
name: "healthy status",
243-
status: func() *types.Status {
244-
status := types.NewStatus("test-source")
245-
status.AddCondition(types.NewCondition("Ready", types.ConditionTrue, "Ready", "Ready"))
246-
return status
247-
}(),
248-
expectedCount: 0, // Info events and true conditions are ignored
249-
},
250-
{
251-
name: "empty status",
252-
status: types.NewStatus("test-source"),
253-
expectedCount: 0,
254-
},
255-
}
256-
257-
for _, tt := range tests {
258-
t.Run(tt.name, func(t *testing.T) {
259-
problems := detector.statusToProblems(tt.status)
260-
261-
if len(problems) != tt.expectedCount {
262-
t.Errorf("statusToProblems() problem count = %d, want %d", len(problems), tt.expectedCount)
263-
}
264-
265-
// Check that all expected types are present
266-
if tt.expectedCount > 0 {
267-
problemTypes := make(map[string]bool)
268-
problemSeverities := make(map[types.ProblemSeverity]bool)
269-
270-
for _, problem := range problems {
271-
problemTypes[problem.Type] = true
272-
problemSeverities[problem.Severity] = true
273-
}
274-
275-
for _, expectedType := range tt.containsTypes {
276-
if !problemTypes[expectedType] {
277-
t.Errorf("statusToProblems() missing expected problem type: %s", expectedType)
278-
}
279-
}
280-
281-
for _, expectedSeverity := range tt.containsSeverities {
282-
if !problemSeverities[expectedSeverity] {
283-
t.Errorf("statusToProblems() missing expected problem severity: %s", expectedSeverity)
284-
}
285-
}
286-
}
287-
})
288-
}
289-
}
290-
291-
func TestStatusToProblems_Conditions(t *testing.T) {
292-
helper := NewTestHelper()
293-
config := helper.CreateTestConfig()
294-
factory := NewMockMonitorFactory()
295-
detector, _ := NewProblemDetector(config, []types.Monitor{}, []types.Exporter{NewMockExporter("test")}, "/tmp/test-config.yaml", factory)
296-
297-
status := types.NewStatus("test-source")
298-
status.AddCondition(types.NewCondition("DiskPressure", types.ConditionFalse, "DiskFull", "Disk is full"))
299-
status.AddCondition(types.NewCondition("NetworkReady", types.ConditionTrue, "NetworkOK", "Network is ready"))
300-
status.AddCondition(types.NewCondition("UnknownCondition", types.ConditionUnknown, "Unknown", "Status unknown"))
301-
302-
problems := detector.statusToProblems(status)
303-
304-
// Should only create problem for False condition
305-
if len(problems) != 1 {
306-
t.Errorf("statusToProblems() problem count = %d, want 1", len(problems))
307-
}
308-
309-
if problems[0].Type != "condition-DiskPressure" {
310-
t.Errorf("statusToProblems() problem type = %s, want condition-DiskPressure", problems[0].Type)
311-
}
312-
}
313-
314-
func TestDeduplicateProblems(t *testing.T) {
315-
helper := NewTestHelper()
316-
config := helper.CreateTestConfig()
317-
factory := NewMockMonitorFactory()
318-
detector, _ := NewProblemDetector(config, []types.Monitor{}, []types.Exporter{NewMockExporter("test")}, "/tmp/test-config.yaml", factory)
319-
320-
// First set of problems
321-
problem1 := types.NewProblem("disk-full", "node1", types.ProblemCritical, "Disk is full")
322-
problem2 := types.NewProblem("memory-pressure", "node1", types.ProblemWarning, "Memory pressure detected")
323-
324-
newProblems := detector.deduplicateProblems([]*types.Problem{problem1, problem2})
325-
if len(newProblems) != 2 {
326-
t.Errorf("First deduplication expected 2 new problems, got %d", len(newProblems))
327-
}
328-
329-
// Same problems again (should be deduplicated)
330-
problem1Dup := types.NewProblem("disk-full", "node1", types.ProblemCritical, "Disk is full")
331-
newProblems = detector.deduplicateProblems([]*types.Problem{problem1Dup})
332-
if len(newProblems) != 0 {
333-
t.Errorf("Second deduplication expected 0 new problems, got %d", len(newProblems))
334-
}
335-
336-
// Same type/resource but different severity (should be reported)
337-
problem1Updated := types.NewProblem("disk-full", "node1", types.ProblemWarning, "Disk pressure reduced")
338-
newProblems = detector.deduplicateProblems([]*types.Problem{problem1Updated})
339-
if len(newProblems) != 1 {
340-
t.Errorf("Third deduplication expected 1 new problem, got %d", len(newProblems))
341-
}
342-
}
343-
344205
func TestExportDistribution(t *testing.T) {
345206
helper := NewTestHelper()
346207
config := helper.CreateTestConfig()
@@ -556,17 +417,17 @@ func TestExportFailureIsolation(t *testing.T) {
556417
config := helper.CreateTestConfig()
557418

558419
// Create exporters with different failure behaviors
420+
// Note: We only test status export failures since ExportProblem is no longer called (issue #7 fix)
559421
successExporter := NewMockExporter("success-exporter")
560422
failStatusExporter := NewMockExporter("fail-status").SetStatusExportError(fmt.Errorf("status export failed"))
561-
failProblemExporter := NewMockExporter("fail-problem").SetProblemExportError(fmt.Errorf("problem export failed"))
562423

563424
factory := NewMockMonitorFactory().SetCreateFunc(func(config types.MonitorConfig) (types.Monitor, error) {
564425
status := types.NewStatus(config.Name)
565426
status.AddEvent(types.NewEvent(types.EventError, "CriticalError", "Critical error occurred"))
566427
return NewMockMonitor(config.Name).AddStatusUpdate(status), nil
567428
})
568429

569-
detector, err := NewProblemDetector(config, []types.Monitor{}, []types.Exporter{successExporter, failStatusExporter, failProblemExporter}, "/tmp/test-config.yaml", factory)
430+
detector, err := NewProblemDetector(config, []types.Monitor{}, []types.Exporter{successExporter, failStatusExporter}, "/tmp/test-config.yaml", factory)
570431
if err != nil {
571432
t.Fatalf("NewProblemDetector() error = %v", err)
572433
}
@@ -650,7 +511,7 @@ func TestStatisticsTracking(t *testing.T) {
650511
})
651512

652513
successExporter := NewMockExporter("success-exp")
653-
failExporter := NewMockExporter("fail-exp").SetProblemExportError(fmt.Errorf("export failed"))
514+
failExporter := NewMockExporter("fail-exp").SetStatusExportError(fmt.Errorf("export failed"))
654515

655516
detector, err := NewProblemDetector(config, []types.Monitor{}, []types.Exporter{successExporter, failExporter}, "/tmp/test-config.yaml", factory)
656517
if err != nil {
@@ -679,9 +540,6 @@ func TestStatisticsTracking(t *testing.T) {
679540
if stats.GetStatusesReceived() == 0 {
680541
t.Errorf("Expected statuses to be received")
681542
}
682-
if stats.GetProblemsDetected() == 0 {
683-
t.Errorf("Expected problems to be detected")
684-
}
685543

686544
// Verify export statistics
687545
if stats.GetExportsSucceeded() == 0 {

pkg/exporters/kubernetes/event_manager.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,21 +103,21 @@ func (em *EventManager) CreateEvent(ctx context.Context, event corev1.Event) err
103103
namespace = "default"
104104
}
105105

106-
// Check rate limiting first
107-
if !em.checkRateLimit() {
108-
log.Printf("[WARN] Event rate limit exceeded (%d/min), dropping event: %s",
109-
em.maxEventsPerMin, event.Reason)
110-
return fmt.Errorf("event rate limit exceeded")
111-
}
112-
113-
// Check deduplication
106+
// Check deduplication FIRST - duplicates should not consume rate limit quota
114107
signature := CreateEventSignature(event)
115108
if em.isDuplicate(signature) {
116109
log.Printf("[DEBUG] Duplicate event suppressed: %s (signature: %s)",
117110
event.Reason, signature.Hash())
118111
return nil
119112
}
120113

114+
// Check rate limiting only for non-duplicate events
115+
if !em.checkRateLimit() {
116+
log.Printf("[WARN] Event rate limit exceeded (%d/min), dropping event: %s",
117+
em.maxEventsPerMin, event.Reason)
118+
return fmt.Errorf("event rate limit exceeded")
119+
}
120+
121121
// Create the event
122122
err := em.client.CreateEvent(ctx, event, namespace)
123123
if err != nil {

pkg/exporters/kubernetes/event_manager_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kubernetes
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
67
"time"
78

@@ -163,19 +164,19 @@ func TestEventRateLimiting(t *testing.T) {
163164
manager := createTestEventManager(2, time.Minute) // Limit to 2 events per minute
164165
ctx := context.Background()
165166

166-
// Create events up to the limit
167+
// Create unique events up to the limit (different Reasons to avoid deduplication)
167168
for i := 0; i < 2; i++ {
168169
event := corev1.Event{
169170
ObjectMeta: metav1.ObjectMeta{
170-
Name: "rate-limit-event-" + string(rune('1'+i)),
171+
Name: fmt.Sprintf("rate-limit-event-%d", i+1),
171172
Namespace: "test-namespace",
172173
},
173174
InvolvedObject: corev1.ObjectReference{
174175
Kind: "Node",
175176
Name: "test-node",
176177
},
177-
Reason: "RateLimitTest",
178-
Message: "Rate limit message",
178+
Reason: fmt.Sprintf("RateLimitTest%d", i+1), // Unique reason to avoid deduplication
179+
Message: fmt.Sprintf("Rate limit message %d", i+1),
179180
Type: corev1.EventTypeNormal,
180181
FirstTimestamp: metav1.NewTime(time.Now()),
181182
LastTimestamp: metav1.NewTime(time.Now()),

0 commit comments

Comments (0)