Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/openfang-api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ pub async fn build_router(
budget_config: Arc::new(tokio::sync::RwLock::new(kernel.config.budget.clone())),
});

// Start WS cron broadcaster — subscribes to kernel event bus and pushes
// cron job results to all connected WebSocket clients in real-time.
ws::start_ws_cron_broadcaster(kernel.clone());

// CORS: allow localhost origins by default. If API key is set, the API
// is protected anyway. For development, permissive CORS is convenient.
let cors = if state.kernel.config.api_key.trim().is_empty() {
Expand Down
169 changes: 167 additions & 2 deletions crates/openfang-api/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use axum::response::IntoResponse;
use dashmap::DashMap;
use futures::stream::SplitSink;
use futures::{SinkExt, StreamExt};
use openfang_kernel::OpenFangKernel;
use openfang_runtime::kernel_handle::KernelHandle;
use openfang_runtime::llm_driver::StreamEvent;
use openfang_runtime::llm_errors;
Expand All @@ -30,7 +31,7 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, warn};

/// Per-IP WebSocket connection tracker.
Expand Down Expand Up @@ -98,6 +99,62 @@ fn ws_tracker() -> &'static DashMap<IpAddr, AtomicUsize> {
TRACKER.get_or_init(DashMap::new)
}

/// Per-agent WebSocket sender entry.
struct WsSender {
sender: Arc<Mutex<SplitSink<WebSocket, Message>>>,
}

/// Global registry: agent_id → active WebSocket senders.
/// Uses RwLock for fine-grained read/write access to the sender list.
fn ws_agent_connections() -> &'static DashMap<AgentId, RwLock<Vec<WsSender>>> {
static REGISTRY: std::sync::OnceLock<DashMap<AgentId, RwLock<Vec<WsSender>>>> =
std::sync::OnceLock::new();
REGISTRY.get_or_init(DashMap::new)
}

/// Register a WebSocket connection for an agent (async).
pub async fn register_ws_connection(
agent_id: AgentId,
sender: Arc<Mutex<SplitSink<WebSocket, Message>>>,
) {
let entry = ws_agent_connections().entry(agent_id).or_default();
let mut senders = entry.value().write().await;
senders.push(WsSender { sender });
}

/// Deregister a WebSocket connection for an agent.
/// Returns the number of remaining connections for this agent.
pub async fn deregister_ws_connection(
agent_id: AgentId,
sender: &Arc<Mutex<SplitSink<WebSocket, Message>>>,
) -> usize {
let entry = match ws_agent_connections().get(&agent_id) {
Some(e) => e,
None => return 0,
};
let mut senders = entry.value().write().await;
senders.retain(|s| !Arc::ptr_eq(&s.sender, sender));
senders.len()
}

/// Broadcast a JSON message to all active WebSocket connections for an agent.
/// Returns the number of connections the message was sent to.
pub async fn broadcast_to_ws(agent_id: AgentId, msg: serde_json::Value) -> usize {
let entry = match ws_agent_connections().get(&agent_id) {
Some(e) => e,
None => return 0,
};
let senders = entry.value().read().await;
let mut success_count = 0;
for ws_sender in senders.iter() {
let sender = &ws_sender.sender;
if send_json(sender, &msg).await.is_ok() {
success_count += 1;
}
}
success_count
}

/// RAII guard that decrements the connection count on drop.
struct WsConnectionGuard {
ip: IpAddr,
Expand Down Expand Up @@ -264,6 +321,9 @@ async fn handle_agent_ws(
let (sender, mut receiver) = socket.split();
let sender = Arc::new(Mutex::new(sender));

// Register this connection in the global agent-WS registry
register_ws_connection(agent_id, Arc::clone(&sender)).await;

// Per-connection verbose level (default: Full)
let verbose = Arc::new(AtomicU8::new(VerboseLevel::Full as u8));

Expand Down Expand Up @@ -416,7 +476,8 @@ async fn handle_agent_ws(
}
}

// Cleanup
// Cleanup: deregister from agent-WS registry and abort background tasks
deregister_ws_connection(agent_id, &sender).await;
update_handle.abort();
info!(agent_id = %id_str, "WebSocket disconnected");
}
Expand Down Expand Up @@ -1333,6 +1394,110 @@ pub fn strip_think_tags(text: &str) -> String {
result
}

// ---------------------------------------------------------------------------
// Cron Job WS Broadcasting
// ---------------------------------------------------------------------------

/// Start a background task that subscribes to the kernel's event bus and
/// broadcasts cron job results to all connected WebSocket clients for the
/// relevant agent.
///
/// This runs independently of the channel bridge — it uses the kernel's
/// event bus to receive `CronJobExecuted` events and pushes them to WS.
pub fn start_ws_cron_broadcaster(kernel: Arc<OpenFangKernel>) {
tokio::spawn(async move {
let mut rx = kernel.event_bus.subscribe_all();
loop {
let event = rx.recv().await;
match event {
Ok(event) => {
if let openfang_types::event::EventPayload::System(
openfang_types::event::SystemEvent::CronJobExecuted {
agent_id,
job_id,
job_name,
trigger_message,
response,
delivered_to_channel: _,
},
) = event.payload
{
// Build the trigger message (synthetic user message from cron)
let trigger_msg = serde_json::json!({
"type": "message",
"content": trigger_message,
"source": "cron",
"job_id": job_id,
"job_name": job_name
});
let _ = broadcast_to_ws(agent_id, trigger_msg).await;

// Send typing start
let _ = broadcast_to_ws(
agent_id,
serde_json::json!({"state": "start", "type": "typing"}),
)
.await;

// Send streaming phase
let _ = broadcast_to_ws(
agent_id,
serde_json::json!({"detail": null, "phase": "streaming", "type": "phase"}),
)
.await;

// Send text delta (full response since we don't have streaming chunks)
let text_delta = serde_json::json!({
"content": response,
"type": "text_delta"
});
let _ = broadcast_to_ws(agent_id, text_delta).await;

// Send done phase
let _ = broadcast_to_ws(
agent_id,
serde_json::json!({"detail": null, "phase": "done", "type": "phase"}),
)
.await;

// Send typing stop
let _ = broadcast_to_ws(
agent_id,
serde_json::json!({"state": "stop", "type": "typing"}),
)
.await;

// Send final response (mimics the format from agent_loop)
let response_msg = serde_json::json!({
"type": "response",
"content": response,
"context_pressure": "low",
"cost_usd": null,
"input_tokens": 0,
"iterations": 0,
"output_tokens": 0
});
let _ = broadcast_to_ws(agent_id, response_msg).await;

info!(
agent_id = %agent_id,
job_id = %job_id,
"Cron job result broadcast to WS"
);
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!(lagged_messages = n, "WS cron broadcaster lagged, skipping");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
info!("WS cron broadcaster channel closed, stopping");
break;
}
}
}
});
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
Expand Down
9 changes: 9 additions & 0 deletions crates/openfang-api/static/js/pages/chat.js
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,15 @@ function chatPage() {
switch (data.type) {
case 'connected': break;

// Incoming message from server (e.g., cron trigger) — display as user message
case 'message':
if (data.content) {
var meta = data.source === 'cron' ? '[Scheduled: ' + (data.job_name || data.job_id || '') + ']' : '';
this.messages.push({ id: ++msgId, role: 'user', text: data.content, meta: meta, tools: [], images: [], ts: Date.now() });
this.scrollToBottom();
}
break;

// Legacy thinking event (backward compat)
case 'thinking':
if (!this.messages.length || !this.messages[this.messages.length - 1].thinking) {
Expand Down
66 changes: 47 additions & 19 deletions crates/openfang-kernel/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5966,17 +5966,31 @@ impl OpenFangKernel {
// Multi-destination fan-out (never aborts the job on delivery error).
cron_fan_out_targets(self, job_name, &result.response, &delivery_targets)
.await;
match cron_deliver_response(self, agent_id, &result.response, &delivery)
let delivered_to_channel = cron_deliver_response(self, agent_id, &result.response, &delivery)
.await
{
Ok(()) => {
self.cron_scheduler.record_success(job_id);
Ok(result.response)
}
Err(e) => {
self.cron_scheduler.record_failure(job_id, &e);
Err(e)
}
.is_ok();
// Publish event for WS broadcast (API layer subscribes and pushes to WebSocket connections).
let cron_event = Event::new(
AgentId::new(),
EventTarget::System,
EventPayload::System(SystemEvent::CronJobExecuted {
agent_id,
job_id: job_id.to_string(),
job_name: job_name.clone(),
trigger_message: message.clone(),
response: result.response.clone(),
delivered_to_channel,
}),
);
self.publish_event(cron_event).await;
// Note: WS broadcast happens regardless of channel delivery success/failure.
// Channel delivery failure is recorded as a job failure.
if delivered_to_channel {
self.cron_scheduler.record_success(job_id);
Ok(result.response)
} else {
self.cron_scheduler.record_failure(job_id, "channel delivery failed");
Err("channel delivery failed".to_string())
}
}
Ok(Err(e)) => {
Expand Down Expand Up @@ -6020,15 +6034,29 @@ impl OpenFangKernel {
Ok(Ok((_run_id, output))) => {
// Multi-destination fan-out (never aborts the job on delivery error).
cron_fan_out_targets(self, job_name, &output, &delivery_targets).await;
match cron_deliver_response(self, agent_id, &output, &delivery).await {
Ok(()) => {
self.cron_scheduler.record_success(job_id);
Ok(output)
}
Err(e) => {
self.cron_scheduler.record_failure(job_id, &e);
Err(e)
}
let delivered_to_channel = cron_deliver_response(self, agent_id, &output, &delivery)
.await
.is_ok();
// Publish event for WS broadcast (API layer subscribes and pushes to WebSocket connections).
let cron_event = Event::new(
AgentId::new(),
EventTarget::System,
EventPayload::System(SystemEvent::CronJobExecuted {
agent_id,
job_id: job_id.to_string(),
job_name: job_name.clone(),
trigger_message: format!("workflow: {}", workflow_id),
response: output.clone(),
delivered_to_channel,
}),
);
self.publish_event(cron_event).await;
if delivered_to_channel {
self.cron_scheduler.record_success(job_id);
Ok(output)
} else {
self.cron_scheduler.record_failure(job_id, "channel delivery failed");
Err("channel delivery failed".to_string())
}
}
Ok(Err(e)) => {
Expand Down
8 changes: 8 additions & 0 deletions crates/openfang-kernel/src/triggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,14 @@ fn describe_event(event: &Event) -> String {
"Health check failed: agent {agent_id}, unresponsive for {unresponsive_secs}s"
)
}
SystemEvent::CronJobExecuted {
agent_id,
job_id,
job_name,
..
} => {
format!("Cron job executed: {job_name} ({job_id}) for agent {agent_id}")
}
},
EventPayload::Custom(data) => {
format!("Custom event ({} bytes)", data.len())
Expand Down
8 changes: 4 additions & 4 deletions crates/openfang-migrate/src/openclaw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,10 +896,10 @@ fn derive_capabilities(tools: &[String]) -> AgentCapabilities {
"shell_exec" => {
caps.shell = vec!["*".to_string()];
}
"web_fetch" | "web_search" | "browser_navigate" => {
if caps.network.is_empty() {
caps.network = vec!["*".to_string()];
}
"web_fetch" | "web_search" | "browser_navigate"
if caps.network.is_empty() =>
{
caps.network = vec!["*".to_string()];
}
"agent_send" | "agent_list" => {
if caps.agent_message.is_empty() {
Expand Down
15 changes: 15 additions & 0 deletions crates/openfang-types/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,21 @@ pub enum SystemEvent {
/// How long the agent has been unresponsive.
unresponsive_secs: u64,
},
/// A scheduled cron job was executed and produced a result.
CronJobExecuted {
/// The agent that ran the job.
agent_id: AgentId,
/// The job's unique ID.
job_id: String,
/// The job's display name.
job_name: String,
/// The trigger message sent to the agent.
trigger_message: String,
/// The agent's text response.
response: String,
/// Whether the result was delivered to a channel (e.g. Telegram).
delivered_to_channel: bool,
},
}

/// A complete event in the OpenFang event system.
Expand Down