Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions crates/openfang-api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8030,7 +8030,9 @@ fn build_skill_config_snapshot(
.read()
.unwrap_or_else(|e| e.into_inner());
let source: &std::collections::HashMap<String, std::collections::HashMap<String, String>> =
override_guard.as_ref().unwrap_or(&state.kernel.config.skills);
override_guard
.as_ref()
.unwrap_or(&state.kernel.config.skills);
source.get(skill_name).cloned().unwrap_or_default()
};

Expand Down Expand Up @@ -8392,7 +8394,9 @@ fn remove_skill_config_var(

let mut remove_skill = false;
if let Some(skills_table) = root.get_mut("skills").and_then(|v| v.as_table_mut()) {
if let Some(skill_section) = skills_table.get_mut(skill_name).and_then(|v| v.as_table_mut())
if let Some(skill_section) = skills_table
.get_mut(skill_name)
.and_then(|v| v.as_table_mut())
{
skill_section.remove(var_name);
if skill_section.is_empty() {
Expand Down Expand Up @@ -9019,9 +9023,8 @@ pub async fn create_schedule(
}
if let Some(arr) = delivery_targets_raw.as_array() {
for (idx, t) in arr.iter().enumerate() {
if let Err(e) = serde_json::from_value::<
openfang_types::scheduler::CronDeliveryTarget,
>(t.clone())
if let Err(e) =
serde_json::from_value::<openfang_types::scheduler::CronDeliveryTarget>(t.clone())
{
return (
StatusCode::BAD_REQUEST,
Expand Down Expand Up @@ -9152,9 +9155,8 @@ pub async fn update_schedule(
let mut parsed: Vec<openfang_types::scheduler::CronDeliveryTarget> =
Vec::with_capacity(arr.len());
for (idx, t) in arr.iter().enumerate() {
match serde_json::from_value::<openfang_types::scheduler::CronDeliveryTarget>(
t.clone(),
) {
match serde_json::from_value::<openfang_types::scheduler::CronDeliveryTarget>(t.clone())
{
Ok(dt) => parsed.push(dt),
Err(e) => {
return (
Expand Down
19 changes: 12 additions & 7 deletions crates/openfang-api/tests/api_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,10 @@ async fn test_commands_invalid_surface_400() {
assert_eq!(resp.status(), 400);
let body: serde_json::Value = resp.json().await.unwrap();
let err = body["error"].as_str().unwrap_or_default();
assert!(err.contains("bogus"), "error should mention the bad value: {err}");
assert!(
err.contains("bogus"),
"error should mention the bad value: {err}"
);
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1211,7 +1214,10 @@ async fn test_schedules_delivery_targets_roundtrip() {
.unwrap();
assert_eq!(resp.status(), 201);
let body: serde_json::Value = resp.json().await.unwrap();
let sched_id = body["id"].as_str().expect("created schedule id").to_string();
let sched_id = body["id"]
.as_str()
.expect("created schedule id")
.to_string();
let got = body["delivery_targets"]
.as_array()
.expect("response must include delivery_targets");
Expand Down Expand Up @@ -1291,7 +1297,9 @@ async fn test_schedules_delivery_targets_update() {
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["status"], "updated");
let echoed = &body["schedule"]["delivery_targets"];
let arr = echoed.as_array().expect("schedule.delivery_targets must be array");
let arr = echoed
.as_array()
.expect("schedule.delivery_targets must be array");
assert_eq!(arr.len(), 2);
assert_eq!(arr[0]["type"], "webhook");
assert_eq!(arr[1]["type"], "local_file");
Expand Down Expand Up @@ -1406,10 +1414,7 @@ async fn test_schedules_delivery_log_endpoint() {
.await
.unwrap();
assert_eq!(resp.status(), 201);
let sched_id = resp
.json::<serde_json::Value>()
.await
.unwrap()["id"]
let sched_id = resp.json::<serde_json::Value>().await.unwrap()["id"]
.as_str()
.unwrap()
.to_string();
Expand Down
16 changes: 5 additions & 11 deletions crates/openfang-api/tests/skill_config_api_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ async fn get_config_returns_declared_and_resolved() {
body["resolved"]["github_token"]["source"], "unresolved",
"github_token should be unresolved without env"
);
assert!(body["resolved"]["github_token"]["is_secret"].as_bool().unwrap());
assert!(body["resolved"]["github_token"]["is_secret"]
.as_bool()
.unwrap());

// default_branch falls back to default "main".
assert_eq!(body["resolved"]["default_branch"]["source"], "default");
Expand Down Expand Up @@ -255,10 +257,7 @@ async fn put_rejects_unknown_variable() {
.unwrap();
assert_eq!(resp.status(), 400);
let body: serde_json::Value = resp.json().await.unwrap();
assert!(body["error"]
.as_str()
.unwrap()
.contains("nonexistent_var"));
assert!(body["error"].as_str().unwrap().contains("nonexistent_var"));
}

#[tokio::test]
Expand Down Expand Up @@ -377,12 +376,7 @@ async fn put_reloads_registry_so_agents_see_change() {
.unwrap();

// The kernel's live override map must now hold the new values.
let guard = server
.state
.kernel
.skill_config_overrides
.read()
.unwrap();
let guard = server.state.kernel.skill_config_overrides.read().unwrap();
let overrides = guard.as_ref().expect("override map set after PUT");
let skill_cfg = overrides.get("test-config-skill").expect("skill present");
assert_eq!(skill_cfg.get("github_token").unwrap(), "ghp_new");
Expand Down
6 changes: 2 additions & 4 deletions crates/openfang-cli/src/tui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2023,10 +2023,8 @@ impl App {
match canonical_head.as_str() {
"/exit" => self.handle_chat_action(chat::ChatAction::Back),
"/help" => {
self.chat.push_message(
chat::Role::System,
commands::render_help(Surfaces::CLI),
);
self.chat
.push_message(chat::Role::System, commands::render_help(Surfaces::CLI));
}
"/status" => {
let mut s = Vec::new();
Expand Down
8 changes: 2 additions & 6 deletions crates/openfang-cli/src/tui/screens/skills.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ impl SkillsState {
if self.installed[sel].config_declared > 0 {
return SkillsAction::LoadSkillConfig(name);
} else {
self.status_msg =
format!("'{}' declares no runtime config.", name);
self.status_msg = format!("'{}' declares no runtime config.", name);
}
}
}
Expand Down Expand Up @@ -531,10 +530,7 @@ fn draw_skill_config_details(f: &mut Frame, area: Rect, state: &SkillsState) {

if rows.is_empty() {
f.render_widget(
Paragraph::new(Span::styled(
"No config declared.",
theme::dim_style(),
)),
Paragraph::new(Span::styled("No config declared.", theme::dim_style())),
inner,
);
return;
Expand Down
8 changes: 4 additions & 4 deletions crates/openfang-kernel/src/cron_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,9 @@ mod tests {
auth_header: Some("Bearer test-token".to_string()),
};
let engine = test_engine(MockBridge::new());
let results = engine.deliver(&[target], "daily-report", "result body").await;
let results = engine
.deliver(&[target], "daily-report", "result body")
.await;

assert!(results[0].success, "error: {:?}", results[0].error);

Expand Down Expand Up @@ -732,8 +734,6 @@ mod tests {
}

fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
haystack
.windows(needle.len())
.position(|w| w == needle)
haystack.windows(needle.len()).position(|w| w == needle)
}
}
39 changes: 38 additions & 1 deletion crates/openfang-kernel/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use crate::registry::AgentRegistry;
use chrono::Utc;
use dashmap::DashMap;
use openfang_types::agent::{AgentId, AgentState};
use openfang_types::agent::{AgentEntry, AgentId, AgentState, ScheduleMode};
use tracing::{debug, warn};

/// Default heartbeat check interval (seconds).
Expand Down Expand Up @@ -132,6 +132,14 @@ impl Default for RecoveryTracker {
/// and the initial `set_state(Running)` call.
const IDLE_GRACE_SECS: i64 = 10;

/// Reactive agents are healthy while idle between user messages.
///
/// They should only participate in heartbeat failure detection while a turn is
/// actively running. Otherwise silence is the expected steady state.
pub(crate) fn should_exempt_idle_reactive_agent(entry: &AgentEntry, is_running_task: bool) -> bool {
matches!(entry.manifest.schedule, ScheduleMode::Reactive) && !is_running_task
}

/// Check all running and crashed agents and return their heartbeat status.
///
/// This is a pure function — it doesn't start a background task.
Expand Down Expand Up @@ -376,6 +384,35 @@ mod tests {
);
}

#[test]
fn test_idle_reactive_agent_is_exempt_when_not_processing() {
let mut agent = make_entry(
"reactive-idle",
AgentState::Running,
Utc::now() - Duration::seconds(600),
Utc::now() - Duration::seconds(300),
);
agent.manifest.schedule = ScheduleMode::Reactive;

assert!(should_exempt_idle_reactive_agent(&agent, false));
assert!(!should_exempt_idle_reactive_agent(&agent, true));
}

#[test]
fn test_periodic_agent_is_not_exempt_when_idle() {
let mut agent = make_entry(
"periodic-idle",
AgentState::Running,
Utc::now() - Duration::seconds(600),
Utc::now() - Duration::seconds(300),
);
agent.manifest.schedule = ScheduleMode::Periodic {
cron: "0 * * * *".to_string(),
};

assert!(!should_exempt_idle_reactive_agent(&agent, false));
}

#[test]
fn test_active_agent_detected_unresponsive() {
// An agent that WAS active (last_active >> created_at) but has gone
Expand Down
41 changes: 29 additions & 12 deletions crates/openfang-kernel/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4481,7 +4481,10 @@ impl OpenFangKernel {
/// Periodically checks all running agents' last_active timestamps and
/// publishes `HealthCheckFailed` events for unresponsive agents.
fn start_heartbeat_monitor(self: &Arc<Self>) {
use crate::heartbeat::{check_agents, is_quiet_hours, HeartbeatConfig, RecoveryTracker};
use crate::heartbeat::{
check_agents, is_quiet_hours, should_exempt_idle_reactive_agent, HeartbeatConfig,
RecoveryTracker,
};

let kernel = Arc::clone(self);
let config = HeartbeatConfig {
Expand All @@ -4505,13 +4508,31 @@ impl OpenFangKernel {

let statuses = check_agents(&kernel.registry, &config);
for status in &statuses {
let Some(entry) = kernel.registry.get(status.agent_id) else {
continue;
};

// Reactive agents are expected to be silent while idle.
// Keep them in Running instead of treating normal quiet time
// as a crash unless a turn is actively executing.
if should_exempt_idle_reactive_agent(
&entry,
kernel.running_tasks.contains_key(&status.agent_id),
) {
if entry.state == AgentState::Crashed {
let _ = kernel
.registry
.set_state(status.agent_id, AgentState::Running);
}
recovery_tracker.reset(status.agent_id);
continue;
}

// Skip agents in quiet hours (per-agent config)
if let Some(entry) = kernel.registry.get(status.agent_id) {
if let Some(ref auto_cfg) = entry.manifest.autonomous {
if let Some(ref qh) = auto_cfg.quiet_hours {
if is_quiet_hours(qh) {
continue;
}
if let Some(ref auto_cfg) = entry.manifest.autonomous {
if let Some(ref qh) = auto_cfg.quiet_hours {
if is_quiet_hours(qh) {
continue;
}
}
}
Expand Down Expand Up @@ -6383,11 +6404,7 @@ struct KernelCronBridge {

#[async_trait]
impl openfang_channels::bridge::ChannelBridgeHandle for KernelCronBridge {
async fn send_message(
&self,
_agent_id: AgentId,
_message: &str,
) -> Result<String, String> {
async fn send_message(&self, _agent_id: AgentId, _message: &str) -> Result<String, String> {
Err("KernelCronBridge only supports send_channel_message".to_string())
}

Expand Down
3 changes: 1 addition & 2 deletions crates/openfang-skills/src/config_injection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ pub fn render_config_block(resolved: &HashMap<String, String>) -> String {
let mut keys: Vec<&String> = resolved.keys().collect();
keys.sort();

let mut out =
String::from("[Skill config from ~/.openfang/config.toml:\n");
let mut out = String::from("[Skill config from ~/.openfang/config.toml:\n");
for key in keys {
let raw = &resolved[key];
let shown = redact_value(key, raw);
Expand Down