@@ -4,6 +4,7 @@ package network
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"time"
 
@@ -109,6 +110,10 @@ type CNIMonitor struct {
 	peerStatuses map[string]*PeerStatus // keyed by node name
 	started      bool
 
+	// State tracking for delta-based event emission (issue #8)
+	lastHighLatencyPeers        []string // Previous high latency peer list
+	lastPersistentlyUnreachable []string // Previous persistently unreachable peer list
+
 	*monitors.BaseMonitor
 }
 
@@ -394,6 +399,7 @@ func (m *CNIMonitor) checkCNI(ctx context.Context) (*types.Status, error) {
 	reachableCount := 0
 	unreachablePeers := make([]string, 0)
 	highLatencyPeers := make([]string, 0)
+	persistentlyUnreachablePeers := make([]string, 0) // Peers exceeding failure threshold
 	var totalLatency time.Duration
 
 	for _, peer := range peers {
@@ -407,34 +413,68 @@ func (m *CNIMonitor) checkCNI(ctx context.Context) (*types.Status, error) {
 			reachableCount++
 			totalLatency += peerStatus.AvgLatency
 
-			// Check for high latency
+			// Check for high latency (collect but don't emit individual events)
 			if peerStatus.AvgLatency > m.config.Connectivity.CriticalLatency {
 				highLatencyPeers = append(highLatencyPeers, fmt.Sprintf("%s (%.2fms)", peer.NodeName, float64(peerStatus.AvgLatency)/float64(time.Millisecond)))
-				status.AddEvent(types.NewEvent(
-					types.EventWarning,
-					"HighPeerLatency",
-					fmt.Sprintf("High latency to peer %s: %.2fms (critical threshold: %.2fms)",
-						peer.NodeName, float64(peerStatus.AvgLatency)/float64(time.Millisecond),
-						float64(m.config.Connectivity.CriticalLatency)/float64(time.Millisecond)),
-				))
 			} else if peerStatus.AvgLatency > m.config.Connectivity.WarningLatency {
 				highLatencyPeers = append(highLatencyPeers, fmt.Sprintf("%s (%.2fms)", peer.NodeName, float64(peerStatus.AvgLatency)/float64(time.Millisecond)))
 			}
 		} else {
 			unreachablePeers = append(unreachablePeers, peer.NodeName)
 
-			// Check if this is a persistent failure
+			// Track peers that have exceeded the failure threshold
 			if peerStatus.ConsecutiveFails >= m.config.Connectivity.FailureThreshold {
-				status.AddEvent(types.NewEvent(
-					types.EventError,
-					"PeerUnreachable",
-					fmt.Sprintf("Peer %s has been unreachable for %d consecutive checks",
-						peer.NodeName, peerStatus.ConsecutiveFails),
-				))
+				persistentlyUnreachablePeers = append(persistentlyUnreachablePeers,
+					fmt.Sprintf("%s (%d consecutive failures)", peer.NodeName, peerStatus.ConsecutiveFails))
 			}
 		}
 	}
 
+	// Emit aggregate events only on state changes (delta-based emission, fixes issue #8).
+	// This prevents duplicate events when the same peers are problematic across checks.
+	m.mu.Lock()
+	previousHighLatency := m.lastHighLatencyPeers
+	previousUnreachable := m.lastPersistentlyUnreachable
+	m.mu.Unlock()
+
+	// Aggregate event for high latency peers (only on change)
+	if len(highLatencyPeers) > 0 && peerListChanged(highLatencyPeers, previousHighLatency) {
+		status.AddEvent(types.NewEvent(
+			types.EventWarning,
+			"HighPeerLatencySummary",
+			formatPeerListMessage("High latency detected to %d peer(s)", highLatencyPeers, 5),
+		))
+	} else if len(highLatencyPeers) == 0 && len(previousHighLatency) > 0 {
+		// Emit a recovery event when latency issues are resolved
+		status.AddEvent(types.NewEvent(
+			types.EventInfo,
+			"HighPeerLatencyRecovered",
+			fmt.Sprintf("High latency resolved for %d peer(s)", len(previousHighLatency)),
+		))
+	}
+
+	// Aggregate event for persistently unreachable peers (only on change)
+	if len(persistentlyUnreachablePeers) > 0 && peerListChanged(persistentlyUnreachablePeers, previousUnreachable) {
+		status.AddEvent(types.NewEvent(
+			types.EventError,
+			"PeersUnreachableSummary",
+			formatPeerListMessage("%d peer(s) persistently unreachable", persistentlyUnreachablePeers, 5),
+		))
+	} else if len(persistentlyUnreachablePeers) == 0 && len(previousUnreachable) > 0 {
+		// Emit a recovery event when peers become reachable again
+		status.AddEvent(types.NewEvent(
+			types.EventInfo,
+			"PeersReachableRecovered",
+			fmt.Sprintf("%d peer(s) now reachable again", len(previousUnreachable)),
+		))
+	}
+
+	// Update state tracking for the next check
+	m.mu.Lock()
+	m.lastHighLatencyPeers = highLatencyPeers
+	m.lastPersistentlyUnreachable = persistentlyUnreachablePeers
+	m.mu.Unlock()
+
 	// Calculate reachability percentage
 	totalPeers := len(peers)
 	reachablePercent := (reachableCount * 100) / totalPeers
@@ -586,3 +626,40 @@ func (m *CNIMonitor) GetPeerStatuses() map[string]*PeerStatus {
 	}
 	return statuses
 }
+
+// formatPeerListMessage formats a message with a peer list, truncating if necessary.
+// headerFormat should contain exactly one %d verb for the total count.
+// maxDisplay limits how many peers are shown before truncating with "and N more".
+func formatPeerListMessage(headerFormat string, peers []string, maxDisplay int) string {
+	total := len(peers)
+	header := fmt.Sprintf(headerFormat, total)
+	if total == 0 {
+		return header
+	}
+	if total <= maxDisplay {
+		return fmt.Sprintf("%s: %s", header, strings.Join(peers, ", "))
+	}
+	displayed := peers[:maxDisplay]
+	remaining := total - maxDisplay
+	return fmt.Sprintf("%s: %s, and %d more", header, strings.Join(displayed, ", "), remaining)
+}
+
+// peerListChanged compares two peer lists and returns true if they differ as sets
+// (order-insensitive; entries are assumed to be unique).
+// This is used for delta-based event emission to avoid duplicate events.
+func peerListChanged(current, previous []string) bool {
+	if len(current) != len(previous) {
+		return true
+	}
+	// Create a map of previous entries for O(1) lookup
+	prevMap := make(map[string]struct{}, len(previous))
+	for _, p := range previous {
+		prevMap[p] = struct{}{}
+	}
+	// Check if all current entries exist in previous
+	for _, p := range current {
+		if _, exists := prevMap[p]; !exists {
+			return true
+		}
+	}
+	return false
+}
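
Since `formatPeerListMessage` and `peerListChanged` are pure functions, their truncation and set-comparison behavior is easy to pin down with a table-driven test. Below is a minimal sketch; the file placement, the header format string, and the node names are illustrative assumptions, not part of this change:

```go
// cni_monitor_helpers_test.go — hypothetical test file, not included in this PR
package network

import "testing"

func TestFormatPeerListMessage(t *testing.T) {
	cases := []struct {
		name  string
		peers []string
		want  string
	}{
		{"empty list returns header only", nil, "0 peer(s) affected"},
		{"under limit lists all peers", []string{"node-a", "node-b"},
			"2 peer(s) affected: node-a, node-b"},
		{"over limit truncates with remainder", []string{"node-a", "node-b", "node-c", "node-d"},
			"4 peer(s) affected: node-a, node-b, node-c, and 1 more"},
	}
	for _, tc := range cases {
		if got := formatPeerListMessage("%d peer(s) affected", tc.peers, 3); got != tc.want {
			t.Errorf("%s: got %q, want %q", tc.name, got, tc.want)
		}
	}
}

func TestPeerListChanged(t *testing.T) {
	// Order-only differences are not a change (set semantics).
	if peerListChanged([]string{"a", "b"}, []string{"b", "a"}) {
		t.Error("reordered list should not count as changed")
	}
	// Different membership is a change.
	if !peerListChanged([]string{"a"}, []string{"b"}) {
		t.Error("different members should count as changed")
	}
	// Different lengths are a change.
	if !peerListChanged([]string{"a", "b"}, []string{"a"}) {
		t.Error("different lengths should count as changed")
	}
}
```

One design note worth keeping in mind: the high-latency entries embed the measured latency (e.g. `node-a (12.34ms)`), so under this comparison any fluctuation in a peer's latency reads as a "changed" list and re-emits the summary event; comparing bare node names would suppress those repeats, at the cost of not reporting latency movement.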