Skip to content

Commit 332d349

Browse files
committed
Add job heartbeat as webhook event (closes #99)
1 parent 4276362 commit 332d349

File tree

9 files changed

+411
-32
lines changed

9 files changed

+411
-32
lines changed

samples/concurrent-streaming-output.factfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
{
2929
"name": "stream-branch-2",
3030
"executor": "shell",
31-
"command": "for i in {1..5}; do echo \"Branch 2: Line $i\"; sleep 1; done",
31+
"command": "for i in {1..100}; do echo \"Branch 2: Line $i\"; sleep 1; done",
3232
"arguments": [],
3333
"dependsOn": [ "start" ],
3434
"onResult": {
@@ -39,7 +39,7 @@
3939
{
4040
"name": "stream-branch-3",
4141
"executor": "shell",
42-
"command": "for i in {1..5}; do echo \"Branch 3: Line $i\"; sleep 1; done",
42+
"command": "for i in {1..20}; do echo \"Branch 3: Line $i\"; sleep 1; done",
4343
"arguments": [],
4444
"dependsOn": [ "start" ],
4545
"onResult": {

samples/test-heartbeat.factfile

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"schema": "iglu:com.snowplowanalytics.factotum/factfile/jsonschema/1-0-0",
3+
"data": {
4+
"name": "heartbeat_test",
5+
"tasks": [
6+
{
7+
"name": "quick_task",
8+
"executor": "shell",
9+
"command": "echo",
10+
"arguments": ["Starting tests..."],
11+
"dependsOn": [],
12+
"onResult": {
13+
"terminateJobWithSuccess": [],
14+
"continueJob": [0]
15+
}
16+
},
17+
{
18+
"name": "long_task_with_logs",
19+
"executor": "shell",
20+
"command": "bash",
21+
"arguments": [
22+
"-c",
23+
"for i in 1 2 3 4 5 6 7 8 9 10; do echo Progress: $i/10; sleep 3; done; echo Task completed"
24+
],
25+
"dependsOn": ["quick_task"],
26+
"onResult": {
27+
"terminateJobWithSuccess": [],
28+
"continueJob": [0]
29+
}
30+
}
31+
]
32+
}
33+
}

src/factotum/executor/execution_strategy/mod.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::process::{Command, Stdio};
1818
use std::time::{Instant, Duration};
1919
use std::io::{BufRead, BufReader};
2020
use std::thread;
21+
use std::sync::{Arc, Mutex};
2122
use colored::*;
2223

2324
#[derive(Clone, PartialEq, Debug)]
@@ -66,6 +67,10 @@ pub fn execute_simulation(name: &str, command: &mut Command) -> RunResult {
6667
}
6768

6869
pub fn execute_os(name: &str, command: &mut Command) -> RunResult {
70+
execute_os_with_buffers(name, command, None, None)
71+
}
72+
73+
pub fn execute_os_with_buffers(name: &str, command: &mut Command, stdout_buffer: Option<Arc<Mutex<String>>>, stderr_buffer: Option<Arc<Mutex<String>>>) -> RunResult {
6974
let run_start = Instant::now();
7075
info!("Executing sh {:?}", command);
7176

@@ -84,25 +89,45 @@ pub fn execute_os(name: &str, command: &mut Command) -> RunResult {
8489
let name_stdout = name.to_string();
8590
let name_stderr = name.to_string();
8691

92+
let stdout_buffer_clone = stdout_buffer.clone();
8793
let stdout_handle = thread::spawn(move || {
8894
let reader = BufReader::new(stdout);
8995
let mut lines = Vec::new();
9096
for line in reader.lines() {
9197
if let Ok(line) = line {
9298
println!("[{}] {}", name_stdout.cyan(), line);
93-
lines.push(line);
99+
lines.push(line.clone());
100+
101+
// Also append to shared buffer if provided
102+
if let Some(ref buffer) = stdout_buffer_clone {
103+
let mut buf = buffer.lock().unwrap_or_else(|p| p.into_inner());
104+
if !buf.is_empty() {
105+
buf.push('\n');
106+
}
107+
buf.push_str(&line);
108+
}
94109
}
95110
}
96111
lines.join("\n")
97112
});
98113

114+
let stderr_buffer_clone = stderr_buffer.clone();
99115
let stderr_handle = thread::spawn(move || {
100116
let reader = BufReader::new(stderr);
101117
let mut lines = Vec::new();
102118
for line in reader.lines() {
103119
if let Ok(line) = line {
104120
eprintln!("[{}] {}", name_stderr.cyan(), line.red());
105-
lines.push(line);
121+
lines.push(line.clone());
122+
123+
// Also append to shared buffer if provided
124+
if let Some(ref buffer) = stderr_buffer_clone {
125+
let mut buf = buffer.lock().unwrap_or_else(|p| p.into_inner());
126+
if !buf.is_empty() {
127+
buf.push('\n');
128+
}
129+
buf.push_str(&line);
130+
}
106131
}
107132
}
108133
lines.join("\n")

src/factotum/executor/heartbeat.rs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use std::sync::Mutex;
2+
use std::collections::HashMap;
3+
use chrono::{DateTime, UTC};
4+
use std::sync::Arc;
5+
use std::time::Duration;
6+
use std::sync::mpsc::Sender;
7+
use std::sync::atomic::{AtomicBool, Ordering};
8+
use std::thread::{self, JoinHandle};
9+
use super::{ExecutionUpdate, ExecutionState, Transition, HeartbeatTaskData, get_task_snapshot};
10+
11+
// Shared state for streaming logs
12+
pub struct TaskStreamState {
13+
pub start_time: DateTime<UTC>,
14+
pub stdout_buffer: Arc<Mutex<String>>,
15+
pub stderr_buffer: Arc<Mutex<String>>,
16+
}
17+
18+
// Global state - tracks running tasks with log buffers
19+
lazy_static! {
20+
static ref RUNNING_TASKS: Mutex<HashMap<String, TaskStreamState>> =
21+
Mutex::new(HashMap::new());
22+
}
23+
24+
// Called by executor when task starts
25+
// Returns Arc clones to log buffers for the streaming threads
26+
pub fn register_task_start(name: String, start: DateTime<UTC>)
27+
-> (Arc<Mutex<String>>, Arc<Mutex<String>>)
28+
{
29+
let stdout_buffer = Arc::new(Mutex::new(String::new()));
30+
let stderr_buffer = Arc::new(Mutex::new(String::new()));
31+
32+
RUNNING_TASKS.lock()
33+
.unwrap_or_else(|poisoned| poisoned.into_inner())
34+
.insert(name, TaskStreamState {
35+
start_time: start,
36+
stdout_buffer: stdout_buffer.clone(),
37+
stderr_buffer: stderr_buffer.clone(),
38+
});
39+
40+
(stdout_buffer, stderr_buffer)
41+
}
42+
43+
// Called by executor when task completes
44+
pub fn unregister_task_completion(name: &str) {
45+
RUNNING_TASKS.lock()
46+
.unwrap_or_else(|poisoned| poisoned.into_inner())
47+
.remove(name);
48+
}
49+
50+
// Snapshot with cumulative logs (for heartbeats)
51+
pub struct TaskHeartbeatData {
52+
pub name: String,
53+
pub start_time: DateTime<UTC>,
54+
pub elapsed: Duration,
55+
pub stdout: String, // Cumulative stdout up to now
56+
pub stderr: String, // Cumulative stderr up to now
57+
}
58+
59+
// Get snapshot for heartbeat with cumulative logs
60+
pub fn get_running_tasks_with_logs() -> Vec<TaskHeartbeatData> {
61+
let now = UTC::now();
62+
let tasks = RUNNING_TASKS.lock()
63+
.unwrap_or_else(|poisoned| poisoned.into_inner());
64+
65+
tasks.iter()
66+
.map(|(name, state)| {
67+
let stdout = state.stdout_buffer.lock()
68+
.unwrap_or_else(|poisoned| poisoned.into_inner())
69+
.clone();
70+
let stderr = state.stderr_buffer.lock()
71+
.unwrap_or_else(|poisoned| poisoned.into_inner())
72+
.clone();
73+
74+
let elapsed_chrono = now - state.start_time;
75+
let elapsed_std = elapsed_chrono.to_std()
76+
.unwrap_or(Duration::from_secs(0));
77+
78+
TaskHeartbeatData {
79+
name: name.clone(),
80+
start_time: state.start_time,
81+
elapsed: elapsed_std,
82+
stdout,
83+
stderr,
84+
}
85+
})
86+
.collect()
87+
}
88+
89+
// Spawn heartbeat timer thread
90+
pub fn spawn_heartbeat_thread(
91+
interval: Duration,
92+
progress_channel: Sender<ExecutionUpdate>,
93+
shutdown_flag: Arc<AtomicBool>,
94+
shared_snapshot: Arc<Mutex<super::TaskSnapshot>>,
95+
) -> JoinHandle<()> {
96+
thread::Builder::new()
97+
.name("heartbeat".to_string())
98+
.spawn(move || {
99+
loop {
100+
thread::sleep(interval);
101+
102+
// Check shutdown first
103+
if shutdown_flag.load(Ordering::SeqCst) {
104+
break;
105+
}
106+
107+
let running = get_running_tasks_with_logs();
108+
if running.is_empty() {
109+
continue;
110+
}
111+
112+
// Convert to HeartbeatTaskData for the transition
113+
let heartbeat_data: Vec<HeartbeatTaskData> = running.iter()
114+
.map(|t| HeartbeatTaskData {
115+
task_name: t.name.clone(),
116+
elapsed_seconds: t.elapsed.as_secs(),
117+
stdout: t.stdout.clone(),
118+
stderr: t.stderr.clone(),
119+
})
120+
.collect();
121+
122+
// Get live snapshot from shared state
123+
let snapshot = shared_snapshot.lock()
124+
.unwrap_or_else(|poisoned| poisoned.into_inner())
125+
.clone();
126+
127+
// Create heartbeat ExecutionUpdate
128+
let update = ExecutionUpdate::new(
129+
ExecutionState::Running,
130+
snapshot,
131+
Transition::Heartbeat(heartbeat_data)
132+
);
133+
134+
match progress_channel.send(update) {
135+
Ok(_) => {},
136+
Err(e) => {
137+
warn!("Heartbeat send failed: {}, exiting", e);
138+
break;
139+
}
140+
}
141+
}
142+
})
143+
.expect("Failed to spawn heartbeat thread")
144+
}

0 commit comments

Comments
 (0)