@@ -54,24 +54,24 @@ use crate::{
5454
5555/// Actor, Arguments, State, Messages and implementation for [crate::agent::sender_account::SenderAccount]
5656pub mod sender_account;
57- /// Tokio task-based replacement for SenderAccount actor
57+ /// SenderAccount task implementation
5858pub mod sender_account_task;
5959/// Actor, Arguments, State, Messages and implementation for
6060/// [crate::agent::sender_accounts_manager::SenderAccountsManager]
6161pub mod sender_accounts_manager;
62- /// Tokio task-based replacement for SenderAccountsManager actor
62+ /// SenderAccountsManager task implementation
6363pub mod sender_accounts_manager_task;
6464/// Actor, Arguments, State, Messages and implementation for [crate::agent::sender_allocation::SenderAllocation]
6565pub mod sender_allocation;
66- /// Tokio task-based replacement for SenderAllocation actor
66+ /// SenderAllocation task implementation
6767pub mod sender_allocation_task;
6868/// Tests for task lifecycle monitoring and health checks
6969#[ cfg( test) ]
7070mod test_lifecycle_monitoring;
71- /// Comprehensive tests for tokio migration
71+ /// Comprehensive integration tests
7272#[ cfg( test) ]
7373mod test_tokio_migration;
74- /// Regression tests comparing ractor vs tokio behavior
74+ /// Regression tests for system behavior
7575#[ cfg( test) ]
7676mod test_tokio_regression;
7777/// Unaggregated receipts containing total value and last id stored in the table
@@ -83,7 +83,9 @@ use crate::actor_migrate::TaskHandle;
8383///
8484/// It uses the static [crate::CONFIG] to configure the agent.
8585///
86- /// 🎯 TOKIO MIGRATION: Now returns TaskHandle instead of ActorRef
86+ /// Returns:
87+ /// - TaskHandle for the main SenderAccountsManagerTask
88+ /// - JoinHandle for the system health monitoring task that triggers shutdown on critical failures
8789pub async fn start_agent ( ) -> ( TaskHandle < SenderAccountsManagerMessage > , JoinHandle < ( ) > ) {
8890 let Config {
8991 indexer : IndexerConfig {
@@ -246,7 +248,7 @@ pub async fn start_agent() -> (TaskHandle<SenderAccountsManagerMessage>, JoinHan
246248 config
247249 } ) ) ;
248250
249- // 🎯 TOKIO MIGRATION: Using SenderAccountsManagerTask instead of ractor SenderAccountsManager
251+ // Initialize the tokio-based sender accounts manager
250252 let lifecycle = LifecycleManager :: new ( ) ;
251253
252254 let task_handle = SenderAccountsManagerTask :: spawn (
@@ -263,15 +265,60 @@ pub async fn start_agent() -> (TaskHandle<SenderAccountsManagerMessage>, JoinHan
263265 . await
264266 . expect ( "Failed to start sender accounts manager task." ) ;
265267
266- // Create a dummy JoinHandle for compatibility with main.rs
267- // In the tokio model, the lifecycle manager handles task monitoring
268- let dummy_handle = tokio:: spawn ( async {
269- // This task runs indefinitely until the application shuts down
270- // The actual work is done by the SenderAccountsManagerTask and its children
271- loop {
272- tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 60 ) ) . await ;
268+ // Create a proper system monitoring task that integrates with LifecycleManager
269+ // This task monitors overall system health and can trigger graceful shutdown
270+ let monitoring_handle = tokio:: spawn ( {
271+ let lifecycle_clone = lifecycle. clone ( ) ;
272+ async move {
273+ let mut interval = tokio:: time:: interval ( std:: time:: Duration :: from_secs ( 5 ) ) ;
274+ let mut consecutive_unhealthy_checks = 0 ;
275+ const MAX_UNHEALTHY_CHECKS : u32 = 12 ; // 60 seconds of unhealthy state
276+
277+ loop {
278+ interval. tick ( ) . await ;
279+
280+ // Monitor system health
281+ let system_health = lifecycle_clone. get_system_health ( ) . await ;
282+
283+ if !system_health. overall_healthy {
284+ consecutive_unhealthy_checks += 1 ;
285+ tracing:: warn!(
286+ "System unhealthy: {}/{} healthy tasks, {} failed, {} restarting (check {}/{})" ,
287+ system_health. healthy_tasks,
288+ system_health. total_tasks,
289+ system_health. failed_tasks,
290+ system_health. restarting_tasks,
291+ consecutive_unhealthy_checks,
292+ MAX_UNHEALTHY_CHECKS
293+ ) ;
294+
295+ // If system has been unhealthy for too long, trigger shutdown
296+ if consecutive_unhealthy_checks >= MAX_UNHEALTHY_CHECKS {
297+ tracing:: error!(
298+ "System has been unhealthy for {} checks, initiating graceful shutdown" ,
299+ consecutive_unhealthy_checks
300+ ) ;
301+ break ;
302+ }
303+ } else {
304+ // Reset counter on healthy check
305+ if consecutive_unhealthy_checks > 0 {
306+ tracing:: info!(
307+ "System health recovered: {}/{} healthy tasks" ,
308+ system_health. healthy_tasks,
309+ system_health. total_tasks
310+ ) ;
311+ consecutive_unhealthy_checks = 0 ;
312+ }
313+ }
314+
315+ // Perform periodic health checks on all tasks
316+ lifecycle_clone. perform_health_check ( ) . await ;
317+ }
318+
319+ tracing:: info!( "System monitoring task shutting down" ) ;
273320 }
274321 } ) ;
275322
276- ( task_handle, dummy_handle )
323+ ( task_handle, monitoring_handle )
277324}
0 commit comments