Skip to content

Commit 36f4cf1

Browse files
authored
Merge pull request #148 from dmoliveira/loopmux-br-2fy-observability
Add fleet heartbeat telemetry for active vs idle loop behavior
2 parents 12f350b + c58eca4 commit 36f4cf1

File tree

1 file changed

+129
-0
lines changed

1 file changed

+129
-0
lines changed

src/main.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ use time::OffsetDateTime;
2525

2626
const LOOPMUX_VERSION: &str = env!("CARGO_PKG_VERSION");
2727
const TUI_REDRAW_SKIP_LOG_INTERVAL: u64 = 25;
28+
/// Shortest allowed fleet heartbeat interval, in seconds; the lower clamp
/// bound applied by `fleet_heartbeat_interval_seconds`.
const FLEET_HEARTBEAT_MIN_INTERVAL_SECONDS: u64 = 30;
29+
/// Longest allowed fleet heartbeat interval, in seconds; the upper clamp
/// bound applied by `fleet_heartbeat_interval_seconds`.
const FLEET_HEARTBEAT_MAX_INTERVAL_SECONDS: u64 = 300;
30+
/// Factor the poll period (in seconds) is multiplied by to obtain the
/// heartbeat interval before it is clamped.
const FLEET_HEARTBEAT_POLL_MULTIPLIER: u64 = 12;
2831

2932
#[derive(Debug, Parser)]
3033
#[command(name = "loopmux")]
@@ -707,6 +710,10 @@ struct FleetRunRecord {
707710
version: String,
708711
#[serde(default)]
709712
events: Vec<FleetRunEvent>,
713+
#[serde(default)]
714+
heartbeat_sends_reported: u32,
715+
#[serde(default)]
716+
heartbeat_reported_at: Option<String>,
710717
}
711718

712719
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -1726,6 +1733,8 @@ impl FleetRunRegistry {
17261733
last_seen: now.clone(),
17271734
version: LOOPMUX_VERSION.to_string(),
17281735
events: Vec::new(),
1736+
heartbeat_sends_reported: sends,
1737+
heartbeat_reported_at: Some(now.clone()),
17291738
};
17301739

17311740
let mut record = if self.state_path.exists() {
@@ -1756,13 +1765,47 @@ impl FleetRunRegistry {
17561765
detail: format!("{} -> {}", existing.target, target),
17571766
});
17581767
}
1768+
let heartbeat_interval = fleet_heartbeat_interval_seconds(poll_seconds);
1769+
let should_emit_heartbeat = should_emit_fleet_heartbeat(
1770+
OffsetDateTime::now_utc(),
1771+
existing.heartbeat_reported_at.as_deref(),
1772+
poll_seconds,
1773+
);
1774+
let mut heartbeat_sends_reported = if sends < existing.heartbeat_sends_reported
1775+
{
1776+
sends
1777+
} else {
1778+
existing.heartbeat_sends_reported
1779+
};
1780+
let mut heartbeat_reported_at = existing.heartbeat_reported_at.clone();
1781+
if heartbeat_reported_at.is_none() {
1782+
heartbeat_sends_reported = sends;
1783+
heartbeat_reported_at = Some(now.clone());
1784+
}
1785+
if should_emit_heartbeat {
1786+
let sends_delta = sends.saturating_sub(heartbeat_sends_reported);
1787+
events.push(FleetRunEvent {
1788+
timestamp: now.clone(),
1789+
kind: "heartbeat".to_string(),
1790+
detail: format_fleet_heartbeat_metric(
1791+
state,
1792+
sends,
1793+
sends_delta,
1794+
heartbeat_interval,
1795+
),
1796+
});
1797+
heartbeat_sends_reported = sends;
1798+
heartbeat_reported_at = Some(now.clone());
1799+
}
17591800
if events.len() > 24 {
17601801
let keep_from = events.len() - 24;
17611802
events.drain(0..keep_from);
17621803
}
17631804
FleetRunRecord {
17641805
started_at: existing.started_at,
17651806
events,
1807+
heartbeat_sends_reported,
1808+
heartbeat_reported_at,
17661809
..base_record
17671810
}
17681811
}
@@ -5654,6 +5697,51 @@ fn format_redraw_skip_metric(total: u64, since_last_report: u64, interval: u64)
56545697
)
56555698
}
56565699

5700+
fn fleet_heartbeat_interval_seconds(poll_seconds: u64) -> u64 {
5701+
poll_seconds
5702+
.max(1)
5703+
.saturating_mul(FLEET_HEARTBEAT_POLL_MULTIPLIER)
5704+
.clamp(
5705+
FLEET_HEARTBEAT_MIN_INTERVAL_SECONDS,
5706+
FLEET_HEARTBEAT_MAX_INTERVAL_SECONDS,
5707+
)
5708+
}
5709+
5710+
fn should_emit_fleet_heartbeat(
5711+
now: OffsetDateTime,
5712+
last_reported_at: Option<&str>,
5713+
poll_seconds: u64,
5714+
) -> bool {
5715+
let Some(last_reported_at) = last_reported_at else {
5716+
return false;
5717+
};
5718+
let Ok(last_reported_at) = OffsetDateTime::parse(
5719+
last_reported_at,
5720+
&time::format_description::well_known::Rfc3339,
5721+
) else {
5722+
return true;
5723+
};
5724+
let elapsed = (now - last_reported_at).whole_seconds();
5725+
elapsed >= fleet_heartbeat_interval_seconds(poll_seconds) as i64
5726+
}
5727+
5728+
fn format_fleet_heartbeat_metric(
5729+
state: LoopState,
5730+
sends_total: u32,
5731+
sends_delta: u32,
5732+
interval_seconds: u64,
5733+
) -> String {
5734+
let activity = if sends_delta > 0 { "active" } else { "idle" };
5735+
format!(
5736+
"fleet-heartbeat state={} activity={} sends_total={} sends_delta={} interval={}s",
5737+
fleet_state_label(state),
5738+
activity,
5739+
sends_total,
5740+
sends_delta,
5741+
interval_seconds
5742+
)
5743+
}
5744+
56575745
fn compact_sent_log(
56585746
timestamp: &str,
56595747
target: &str,
@@ -11330,6 +11418,45 @@ runs:
1133011418
assert!(message.contains("interval=25"));
1133111419
}
1133211420

11421+
/// Checks that the heartbeat interval scales with the poll period and stays
/// inside the 30..=300 second bounds, including the zero-poll, exact-boundary
/// and saturating-overflow edge cases.
#[test]
fn fleet_heartbeat_interval_scales_and_is_bounded() {
    // (poll_seconds, expected interval in seconds)
    let cases: [(u64, u64); 7] = [
        (0, 30),        // zero poll is treated as 1s, then raised to the minimum
        (1, 30),        // 12s is below the minimum
        (2, 30),        // 24s is still below the minimum
        (3, 36),        // first value that scales freely
        (25, 300),      // lands exactly on the maximum
        (60, 300),      // 720s is capped at the maximum
        (u64::MAX, 300), // multiplication saturates instead of overflowing
    ];
    for (poll_seconds, expected) in cases {
        assert_eq!(
            fleet_heartbeat_interval_seconds(poll_seconds),
            expected,
            "poll_seconds={poll_seconds}"
        );
    }
}
11427+
11428+
/// Exercises the heartbeat emission decision with a 5-second poll period
/// (a 60-second interval): too early, exactly due, no previous report, and an
/// unparseable previous report.
#[test]
fn fleet_heartbeat_emission_respects_interval_and_bad_timestamps() {
    let now = OffsetDateTime::parse(
        "2026-03-01T00:01:00Z",
        &time::format_description::well_known::Rfc3339,
    )
    .unwrap();

    // (previous report timestamp, whether a heartbeat is expected)
    let cases: [(Option<&str>, bool); 4] = [
        (Some("2026-03-01T00:00:20Z"), false),
        (Some("2026-03-01T00:00:00Z"), true),
        (None, false),
        (Some("not-a-ts"), true),
    ];
    for (last_reported_at, expected) in cases {
        assert_eq!(
            should_emit_fleet_heartbeat(now, last_reported_at, 5),
            expected,
            "last_reported_at={last_reported_at:?}"
        );
    }
}
11448+
11449+
/// Verifies that the heartbeat message reports `idle` for a zero send delta
/// and `active` (with the matching counters) for a positive one.
#[test]
fn fleet_heartbeat_metric_marks_idle_and_active_modes() {
    let idle_message = format_fleet_heartbeat_metric(LoopState::Running, 10, 0, 60);
    for fragment in ["state=running", "activity=idle"] {
        assert!(idle_message.contains(fragment));
    }

    let active_message = format_fleet_heartbeat_metric(LoopState::Running, 12, 2, 60);
    for fragment in ["activity=active", "sends_total=12", "sends_delta=2"] {
        assert!(active_message.contains(fragment));
    }
}
11459+
1133311460
#[test]
1133411461
fn periodic_count_log_recovers_after_counter_reset() {
1133511462
assert!(!should_emit_periodic_count_log(10, 25, 25));
@@ -11391,6 +11518,8 @@ runs:
1139111518
last_seen: "2026-02-17T00:00:00Z".to_string(),
1139211519
version: version.to_string(),
1139311520
events: Vec::new(),
11521+
heartbeat_sends_reported: sends,
11522+
heartbeat_reported_at: None,
1139411523
}
1139511524
}
1139611525

0 commit comments

Comments
 (0)