@@ -6,6 +6,7 @@ use tokio::sync::broadcast;
66use tokio_util:: sync:: CancellationToken ;
77use tracing:: { info, error} ;
88use std:: error:: Error ;
9+ use std:: collections:: HashMap ;
910
1011#[ derive( Clone , Debug ) ]
1112pub struct HealthCheck {
@@ -47,27 +48,29 @@ pub async fn monitor_health(
4748 unhealthy_threshold : u32 ,
4849 cancel : CancellationToken ,
4950) -> Result < ( ) , Box < dyn Error > > {
50- let mut unhealthy_count = 0 ;
51+ let mut unhealthy_counts : HashMap < String , u32 > = HashMap :: new ( ) ;
5152
5253 loop {
5354 tokio:: select! {
5455 result = rx. recv( ) => {
5556 match result {
5657 Ok ( health_check) => {
57- if !health_check. is_healthy {
58- unhealthy_count += 1 ;
59- error!( "Node {} is unhealthy. Unhealthy count: {}" , health_check. node_id, unhealthy_count) ;
60- } else {
61- unhealthy_count = 0 ;
62- info!( "Node {} is healthy." , health_check. node_id) ;
63- }
58+ let threshold_reached = update_unhealthy_counts( & mut unhealthy_counts, & health_check, unhealthy_threshold) ;
6459
65- if unhealthy_count >= unhealthy_threshold {
60+ if health_check. is_healthy {
61+ info!( "Node {} is healthy." , health_check. node_id) ;
62+ } else if threshold_reached {
6663 error!(
6764 "Node {} has been unhealthy for {} consecutive checks. Taking corrective action." ,
6865 health_check. node_id, unhealthy_threshold
6966 ) ;
7067 // Add corrective actions here, such as restarting the node or notifying other services.
68+ } else {
69+ if let Some ( count) = unhealthy_counts. get( & health_check. node_id) {
70+ error!( "Node {} is unhealthy. Unhealthy count: {}" , health_check. node_id, count) ;
71+ } else {
72+ error!( "Node {} is unhealthy." , health_check. node_id) ;
73+ }
7174 }
7275 }
7376 Err ( broadcast:: error:: RecvError :: Closed ) => {
@@ -88,10 +91,29 @@ pub async fn monitor_health(
8891 }
8992}
9093
94+ fn update_unhealthy_counts (
95+ unhealthy_counts : & mut HashMap < String , u32 > ,
96+ health_check : & HealthCheck ,
97+ unhealthy_threshold : u32 ,
98+ ) -> bool {
99+ if health_check. is_healthy {
100+ unhealthy_counts. remove ( & health_check. node_id ) ;
101+ return false ;
102+ }
103+
104+ let count = unhealthy_counts
105+ . entry ( health_check. node_id . clone ( ) )
106+ . and_modify ( |counter| * counter = counter. saturating_add ( 1 ) )
107+ . or_insert ( 1 ) ;
108+
109+ * count >= unhealthy_threshold
110+ }
111+
91112#[ cfg( test) ]
92113mod tests {
93114 use super :: * ;
94115 use tokio:: time:: Duration ;
116+ use std:: collections:: HashMap ;
95117
96118 #[ tokio:: test]
97119 async fn monitor_health_returns_on_channel_close ( ) {
@@ -129,4 +151,51 @@ mod tests {
129151 . expect ( "start_heartbeat did not stop in time" )
130152 . expect ( "task panicked" ) ;
131153 }
154+
/// Verifies that unhealthy streaks are tracked independently per node: one
/// node reaching the threshold does not alert for another, and a healthy
/// report clears only the reporting node's counter.
#[test]
fn update_unhealthy_counts_tracks_per_node_thresholds() {
    let threshold = 2;
    let mut counts = HashMap::new();
    let (node_a, node_b) = ("node-a".to_string(), "node-b".to_string());

    // Builds a report so each step below reads as (node, healthy?).
    let report = |node_id: &String, is_healthy: bool| HealthCheck {
        node_id: node_id.clone(),
        is_healthy,
    };

    // Node A, first unhealthy report: below the threshold.
    let alert = update_unhealthy_counts(&mut counts, &report(&node_a, false), threshold);
    assert!(!alert);
    assert_eq!(counts.get(&node_a), Some(&1));

    // Node B, first unhealthy report: also below the threshold.
    let alert = update_unhealthy_counts(&mut counts, &report(&node_b, false), threshold);
    assert!(!alert);
    assert_eq!(counts.get(&node_b), Some(&1));

    // Node A, second unhealthy report: alerts for A and leaves B untouched.
    let alert = update_unhealthy_counts(&mut counts, &report(&node_a, false), threshold);
    assert!(alert);
    assert_eq!(counts.get(&node_a), Some(&2));
    assert_eq!(counts.get(&node_b), Some(&1));

    // Node B recovers: its counter is dropped while A's streak is preserved.
    let alert = update_unhealthy_counts(&mut counts, &report(&node_b, true), threshold);
    assert!(!alert);
    assert!(counts.get(&node_b).is_none());
    assert_eq!(counts.get(&node_a), Some(&2));
}
132201}
0 commit comments