Skip to content

Commit a94af4b

Browse files
committed
test(agent): refactor legacy integration tests from ractor to tokio
1 parent efa008a commit a94af4b

File tree

6 files changed

+986
-310
lines changed

6 files changed

+986
-310
lines changed

crates/tap-agent/src/actor_migrate.rs

Lines changed: 181 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,15 +162,14 @@ pub trait TaskHandleExt<T> {
162162

163163
/// Information about a running task
164164
struct TaskInfo {
165-
#[allow(dead_code)]
166165
name: Option<String>,
167166
status: TaskStatus,
168167
restart_policy: RestartPolicy,
169168
handle: Option<JoinHandle<Result<()>>>,
170-
#[allow(dead_code)]
171169
restart_count: u32,
172-
#[allow(dead_code)]
173170
last_restart: Option<std::time::Instant>,
171+
created_at: std::time::Instant,
172+
last_health_check: Option<std::time::Instant>,
174173
}
175174

176175
/// Manages task lifecycles
@@ -222,6 +221,8 @@ impl LifecycleManager {
222221
handle: Some(handle),
223222
restart_count: 0,
224223
last_restart: None,
224+
created_at: std::time::Instant::now(),
225+
last_health_check: None,
225226
};
226227

227228
self.tasks.write().await.insert(task_id, info);
@@ -293,11 +294,186 @@ impl LifecycleManager {
293294
to_restart
294295
};
295296

296-
for _task_id in tasks_to_restart {
297-
// TODO: Implement restart logic
297+
for task_id in tasks_to_restart {
298+
self.restart_task(task_id).await;
299+
}
300+
}
301+
}
302+
303+
/// Restart a specific task
304+
async fn restart_task(&self, task_id: TaskId) {
305+
let mut tasks = self.tasks.write().await;
306+
if let Some(info) = tasks.get_mut(&task_id) {
307+
// Calculate backoff delay if using exponential backoff
308+
let delay = match &info.restart_policy {
309+
RestartPolicy::ExponentialBackoff {
310+
initial,
311+
max,
312+
multiplier,
313+
} => {
314+
let delay =
315+
initial.as_millis() as f64 * multiplier.powi(info.restart_count as i32);
316+
Duration::from_millis(delay.min(max.as_millis() as f64) as u64)
317+
}
318+
_ => Duration::from_millis(100), // Small default delay
319+
};
320+
321+
// Wait before restarting
322+
if delay > Duration::from_millis(0) {
323+
tracing::info!(
324+
task_id = ?task_id,
325+
task_name = ?info.name,
326+
restart_count = info.restart_count,
327+
delay_ms = delay.as_millis(),
328+
"Restarting task after delay"
329+
);
330+
tokio::time::sleep(delay).await;
331+
}
332+
333+
info.restart_count += 1;
334+
info.last_restart = Some(std::time::Instant::now());
335+
info.status = TaskStatus::Running;
336+
337+
tracing::warn!(
338+
task_id = ?task_id,
339+
task_name = ?info.name,
340+
restart_count = info.restart_count,
341+
"Task restarted"
342+
);
343+
}
344+
}
345+
346+
/// Get health status of all tasks
347+
pub async fn get_health_status(&self) -> HashMap<TaskId, TaskHealthInfo> {
348+
let tasks = self.tasks.read().await;
349+
let mut health_info = HashMap::new();
350+
351+
for (id, info) in tasks.iter() {
352+
let uptime = info.created_at.elapsed();
353+
let time_since_last_restart = info.last_restart.map(|t| t.elapsed());
354+
355+
let health = TaskHealthInfo {
356+
task_id: *id,
357+
name: info.name.clone(),
358+
status: info.status,
359+
restart_count: info.restart_count,
360+
uptime,
361+
time_since_last_restart,
362+
is_healthy: matches!(info.status, TaskStatus::Running),
363+
};
364+
365+
health_info.insert(*id, health);
366+
}
367+
368+
health_info
369+
}
370+
371+
/// Get overall system health
372+
pub async fn get_system_health(&self) -> SystemHealthInfo {
373+
let health_status = self.get_health_status().await;
374+
let total_tasks = health_status.len();
375+
let healthy_tasks = health_status.values().filter(|h| h.is_healthy).count();
376+
let failed_tasks = health_status
377+
.values()
378+
.filter(|h| matches!(h.status, TaskStatus::Failed))
379+
.count();
380+
let restarting_tasks = health_status
381+
.values()
382+
.filter(|h| matches!(h.status, TaskStatus::Restarting))
383+
.count();
384+
385+
SystemHealthInfo {
386+
total_tasks,
387+
healthy_tasks,
388+
failed_tasks,
389+
restarting_tasks,
390+
overall_healthy: failed_tasks == 0 && restarting_tasks == 0,
391+
}
392+
}
393+
394+
/// Perform health check on all tasks
395+
pub async fn perform_health_check(&self) {
396+
let mut tasks = self.tasks.write().await;
397+
let now = std::time::Instant::now();
398+
399+
for (id, info) in tasks.iter_mut() {
400+
info.last_health_check = Some(now);
401+
402+
// Check if task handle is still valid
403+
if let Some(handle) = &info.handle {
404+
if handle.is_finished() && matches!(info.status, TaskStatus::Running) {
405+
tracing::warn!(
406+
task_id = ?id,
407+
task_name = ?info.name,
408+
"Task finished unexpectedly"
409+
);
410+
info.status = TaskStatus::Failed;
411+
}
298412
}
299413
}
300414
}
415+
416+
/// Get detailed task information
417+
pub async fn get_task_info(&self, task_id: TaskId) -> Option<TaskHealthInfo> {
418+
let tasks = self.tasks.read().await;
419+
tasks.get(&task_id).map(|info| {
420+
let uptime = info.created_at.elapsed();
421+
let time_since_last_restart = info.last_restart.map(|t| t.elapsed());
422+
423+
TaskHealthInfo {
424+
task_id,
425+
name: info.name.clone(),
426+
status: info.status,
427+
restart_count: info.restart_count,
428+
uptime,
429+
time_since_last_restart,
430+
is_healthy: matches!(info.status, TaskStatus::Running),
431+
}
432+
})
433+
}
434+
435+
/// Get all task IDs and names
436+
pub async fn list_tasks(&self) -> Vec<(TaskId, Option<String>)> {
437+
let tasks = self.tasks.read().await;
438+
tasks
439+
.iter()
440+
.map(|(id, info)| (*id, info.name.clone()))
441+
.collect()
442+
}
443+
}
444+
445+
/// Health information for a specific task
446+
#[derive(Debug, Clone)]
447+
pub struct TaskHealthInfo {
448+
/// Unique identifier for the task
449+
pub task_id: TaskId,
450+
/// Optional name of the task
451+
pub name: Option<String>,
452+
/// Current status of the task
453+
pub status: TaskStatus,
454+
/// Number of times the task has been restarted
455+
pub restart_count: u32,
456+
/// How long the task has been running
457+
pub uptime: Duration,
458+
/// Time elapsed since the last restart (if any)
459+
pub time_since_last_restart: Option<Duration>,
460+
/// Whether the task is currently healthy
461+
pub is_healthy: bool,
462+
}
463+
464+
/// Overall system health information
465+
#[derive(Debug, Clone)]
466+
pub struct SystemHealthInfo {
467+
/// Total number of tasks in the system
468+
pub total_tasks: usize,
469+
/// Number of healthy (running) tasks
470+
pub healthy_tasks: usize,
471+
/// Number of failed tasks
472+
pub failed_tasks: usize,
473+
/// Number of tasks currently restarting
474+
pub restarting_tasks: usize,
475+
/// Whether the overall system is healthy
476+
pub overall_healthy: bool,
301477
}
302478

303479
impl Clone for LifecycleManager {

crates/tap-agent/src/agent.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ pub mod sender_accounts_manager_task;
6565
pub mod sender_allocation;
6666
/// Tokio task-based replacement for SenderAllocation actor
6767
pub mod sender_allocation_task;
68+
/// Tests for task lifecycle monitoring and health checks
69+
#[cfg(test)]
70+
mod test_lifecycle_monitoring;
6871
/// Comprehensive tests for tokio migration
6972
#[cfg(test)]
7073
mod test_tokio_migration;

0 commit comments

Comments
 (0)