Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion agents/assistant/agent.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ agent_message = ["*"]
shell = ["python *", "cargo *", "git *", "npm *"]

[autonomous]
max_iterations = 100
heartbeat_interval_secs = 300max_iterations = 100
55 changes: 44 additions & 11 deletions crates/openfang-channels/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,18 @@ impl BridgeManager {

let task = tokio::spawn(async move {
let mut stream = std::pin::pin!(stream);
let mut bridge_iter: u64 = 0;
info!("Bridge adapter loop started for {}", adapter_clone.name());
loop {
bridge_iter += 1;
if bridge_iter % 100 == 1 {
info!("Bridge adapter {} loop alive, iter={bridge_iter}", adapter_clone.name());
}
tokio::select! {
msg = stream.next() => {
match msg {
Some(message) => {
info!("Bridge: received message from {} on {}", message.sender.display_name, adapter_clone.name());
// Spawn each dispatch as a concurrent task so the stream
// loop is never blocked by slow LLM calls. The kernel's
// per-agent lock ensures session integrity.
Expand Down Expand Up @@ -451,10 +458,19 @@ impl BridgeManager {
}
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
info!("Shutting down channel adapter {}", adapter_clone.name());
break;
result = shutdown.changed() => {
match result {
Ok(()) => {
if *shutdown.borrow() {
info!("Shutting down channel adapter {} (shutdown=true)", adapter_clone.name());
break;
}
info!("Bridge: shutdown.changed() fired but value is false for {}, continuing", adapter_clone.name());
}
Err(e) => {
warn!("Bridge: shutdown channel error for {} (sender dropped): {e}", adapter_clone.name());
break;
}
}
}
}
Expand Down Expand Up @@ -630,9 +646,11 @@ async fn dispatch_message(
rate_limiter: &ChannelRateLimiter,
) {
let ct_str = channel_type_str(&message.channel);
info!("dispatch_message: start for sender={} channel={ct_str} is_group={}", message.sender.display_name, message.is_group);

// Fetch per-channel overrides (if configured)
let overrides = handle.channel_overrides(ct_str).await;
info!("dispatch_message: overrides={}", overrides.is_some());
let channel_default_format = default_output_format_for_channel(ct_str);
let output_format = overrides
.as_ref()
Expand All @@ -651,10 +669,11 @@ async fn dispatch_message(

// --- DM/Group policy check ---
if let Some(ref ov) = overrides {
info!("dispatch_message: policy check - is_group={} group_policy={:?} dm_policy={:?}", message.is_group, ov.group_policy, ov.dm_policy);
if message.is_group {
match ov.group_policy {
GroupPolicy::Ignore => {
debug!("Ignoring group message on {ct_str} (group_policy=ignore)");
warn!("dispatch_message: DROPPED by group_policy=ignore");
return;
}
GroupPolicy::CommandsOnly => {
Expand Down Expand Up @@ -874,32 +893,41 @@ async fn dispatch_message(
}

// Route to agent (standard path)
info!("dispatch_message: resolving agent via router for channel={:?} peer={}", message.channel, message.sender.platform_id);
let agent_id = router.resolve(
&message.channel,
&message.sender.platform_id,
message.sender.openfang_user.as_deref(),
);
info!("dispatch_message: router.resolve returned {:?}", agent_id);

let agent_id = match agent_id {
Some(id) => id,
Some(id) => {
info!("dispatch_message: routed to agent {id}");
id
}
None => {
info!("dispatch_message: no route found, trying fallback to 'assistant'");
// Fallback: try "assistant" agent, then first available agent
let fallback = handle.find_agent_by_name("assistant").await.ok().flatten();
info!("dispatch_message: find_agent_by_name('assistant') = {:?}", fallback);
let fallback = match fallback {
Some(id) => Some(id),
None => handle
.list_agents()
.await
.ok()
.and_then(|agents| agents.first().map(|(id, _)| *id)),
None => {
let agents = handle.list_agents().await.ok();
info!("dispatch_message: list_agents = {:?}", agents.as_ref().map(|a| a.len()));
agents.and_then(|agents| agents.first().map(|(id, _)| *id))
}
};
match fallback {
Some(id) => {
info!("dispatch_message: fallback agent = {id}");
// Auto-set this as the user's default so future messages route directly
router.set_user_default(message.sender.platform_id.clone(), id);
id
}
None => {
warn!("dispatch_message: NO agents available at all");
send_response(
adapter,
&message.sender,
Expand All @@ -914,10 +942,12 @@ async fn dispatch_message(
};

// RBAC: authorize the user before forwarding to agent
info!("dispatch_message: RBAC check for user={} action=chat", sender_user_id(message));
if let Err(denied) = handle
.authorize_channel_user(ct_str, sender_user_id(message), "chat")
.await
{
warn!("dispatch_message: RBAC denied: {denied}");
send_response(
adapter,
&message.sender,
Expand All @@ -928,6 +958,7 @@ async fn dispatch_message(
.await;
return;
}
info!("dispatch_message: RBAC passed, sending to agent {agent_id}");

// Build channel key for re-resolution lookups
let channel_key = format!("{:?}", message.channel);
Expand Down Expand Up @@ -980,7 +1011,9 @@ async fn dispatch_message(
};

// Send to agent and relay response
info!("dispatch_message: calling handle.send_message(agent={agent_id}, text_len={})", prefixed_text.len());
let result = handle.send_message(agent_id, &prefixed_text).await;
info!("dispatch_message: send_message result is_ok={}", result.is_ok());

// Stop the typing refresh now that we have a response
typing_task.abort();
Expand Down
Loading