@@ -54,24 +54,24 @@ use crate::{

 /// Actor, Arguments, State, Messages and implementation for [crate::agent::sender_account::SenderAccount]
 pub mod sender_account;
-/// Tokio task-based replacement for SenderAccount actor
+/// SenderAccount task implementation
 pub mod sender_account_task;
 /// Actor, Arguments, State, Messages and implementation for
 /// [crate::agent::sender_accounts_manager::SenderAccountsManager]
 pub mod sender_accounts_manager;
-/// Tokio task-based replacement for SenderAccountsManager actor
+/// SenderAccountsManager task implementation
 pub mod sender_accounts_manager_task;
 /// Actor, Arguments, State, Messages and implementation for [crate::agent::sender_allocation::SenderAllocation]
 pub mod sender_allocation;
-/// Tokio task-based replacement for SenderAllocation actor
+/// SenderAllocation task implementation
 pub mod sender_allocation_task;
 /// Tests for task lifecycle monitoring and health checks
 #[cfg(test)]
 mod test_lifecycle_monitoring;
-/// Comprehensive tests for tokio migration
+/// Comprehensive integration tests
 #[cfg(test)]
 mod test_tokio_migration;
-/// Regression tests comparing ractor vs tokio behavior
+/// Regression tests for system behavior
 #[cfg(test)]
 mod test_tokio_regression;
 /// Unaggregated receipts containing total value and last id stored in the table
@@ -83,7 +83,9 @@ use crate::actor_migrate::TaskHandle;
 ///
 /// It uses the static [crate::CONFIG] to configure the agent.
 ///
-/// 🎯 TOKIO MIGRATION: Now returns TaskHandle instead of ActorRef
+/// Returns:
+/// - TaskHandle for the main SenderAccountsManagerTask
+/// - JoinHandle for the system health monitoring task that triggers shutdown on critical failures
 pub async fn start_agent() -> (TaskHandle<SenderAccountsManagerMessage>, JoinHandle<()>) {
     let Config {
         indexer: IndexerConfig {
@@ -246,7 +248,7 @@ pub async fn start_agent() -> (TaskHandle<SenderAccountsManagerMessage>, JoinHan
         config
     }));

-    // 🎯 TOKIO MIGRATION: Using SenderAccountsManagerTask instead of ractor SenderAccountsManager
+    // Initialize the tokio-based sender accounts manager
     let lifecycle = LifecycleManager::new();

     let task_handle = SenderAccountsManagerTask::spawn(
@@ -263,15 +265,60 @@ pub async fn start_agent() -> (TaskHandle<SenderAccountsManagerMessage>, JoinHan
     .await
     .expect("Failed to start sender accounts manager task.");

-    // Create a dummy JoinHandle for compatibility with main.rs
-    // In the tokio model, the lifecycle manager handles task monitoring
-    let dummy_handle = tokio::spawn(async {
-        // This task runs indefinitely until the application shuts down
-        // The actual work is done by the SenderAccountsManagerTask and its children
-        loop {
-            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
+    // Create a proper system monitoring task that integrates with LifecycleManager
+    // This task monitors overall system health and can trigger graceful shutdown
+    let monitoring_handle = tokio::spawn({
+        let lifecycle_clone = lifecycle.clone();
+        async move {
+            let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
+            let mut consecutive_unhealthy_checks = 0;
+            const MAX_UNHEALTHY_CHECKS: u32 = 12; // 60 seconds of unhealthy state
+
+            loop {
+                interval.tick().await;
+
+                // Monitor system health
+                let system_health = lifecycle_clone.get_system_health().await;
+
+                if !system_health.overall_healthy {
+                    consecutive_unhealthy_checks += 1;
+                    tracing::warn!(
+                        "System unhealthy: {}/{} healthy tasks, {} failed, {} restarting (check {}/{})",
+                        system_health.healthy_tasks,
+                        system_health.total_tasks,
+                        system_health.failed_tasks,
+                        system_health.restarting_tasks,
+                        consecutive_unhealthy_checks,
+                        MAX_UNHEALTHY_CHECKS
+                    );
+
+                    // If system has been unhealthy for too long, trigger shutdown
+                    if consecutive_unhealthy_checks >= MAX_UNHEALTHY_CHECKS {
+                        tracing::error!(
+                            "System has been unhealthy for {} checks, initiating graceful shutdown",
+                            consecutive_unhealthy_checks
+                        );
+                        break;
+                    }
+                } else {
+                    // Reset counter on healthy check
+                    if consecutive_unhealthy_checks > 0 {
+                        tracing::info!(
+                            "System health recovered: {}/{} healthy tasks",
+                            system_health.healthy_tasks,
+                            system_health.total_tasks
+                        );
+                        consecutive_unhealthy_checks = 0;
+                    }
+                }
+
+                // Perform periodic health checks on all tasks
+                lifecycle_clone.perform_health_check().await;
+            }
+
+            tracing::info!("System monitoring task shutting down");
         }
     });

-    (task_handle, dummy_handle)
+    (task_handle, monitoring_handle)
 }
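
The monitoring loop above leans on a small LifecycleManager surface: get_system_health, perform_health_check, and a health snapshot with the fields interpolated into the log lines. The sketch below is inferred from those call sites only; the real definitions live elsewhere in the crate and may differ in detail, and the stub bodies are placeholders so the sketch compiles on its own.

// Sketch inferred from the call sites in the monitoring loop; not the crate's
// actual definitions.
#[derive(Debug, Clone, Default)]
pub struct SystemHealth {
    pub overall_healthy: bool,
    pub total_tasks: usize,
    pub healthy_tasks: usize,
    pub failed_tasks: usize,
    pub restarting_tasks: usize,
}

#[derive(Clone, Default)]
pub struct LifecycleManager;

impl LifecycleManager {
    /// Aggregate a health snapshot across all registered tasks.
    pub async fn get_system_health(&self) -> SystemHealth {
        SystemHealth::default() // placeholder body for the sketch
    }

    /// Probe each registered task and record its current status.
    pub async fn perform_health_check(&self) {
        // placeholder body for the sketch
    }
}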
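A minimal sketch of how a caller such as main.rs might consume the new return pair, treating completion of the monitoring handle as a shutdown signal. start_agent and the handle types come from this diff; the select!/ctrl_c wiring and the drop-based teardown are illustrative assumptions, not the crate's actual main.

// Illustrative only: assumes start_agent() from this diff is in scope.
#[tokio::main]
async fn main() {
    let (manager_handle, monitoring_handle) = start_agent().await;

    tokio::select! {
        // The monitoring task only returns after MAX_UNHEALTHY_CHECKS
        // consecutive unhealthy intervals, so its completion doubles as a
        // shutdown signal.
        _ = monitoring_handle => tracing::error!("health monitor requested shutdown"),
        // Also exit cleanly on Ctrl-C.
        _ = tokio::signal::ctrl_c() => tracing::info!("interrupt received, shutting down"),
    }

    // Hypothetical teardown: dropping the handle; the real TaskHandle API for
    // stopping the manager may differ.
    drop(manager_handle);
}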