Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 42 additions & 30 deletions src/agent/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,10 @@ impl Channel {
_ = tokio::time::sleep(sleep_duration), if next_deadline.is_some() => {
let now = tokio::time::Instant::now();
// Check coalesce deadline
if self.coalesce_deadline.is_some_and(|d| d <= now) {
if let Err(error) = self.flush_coalesce_buffer().await {
tracing::error!(%error, channel_id = %self.id, "error flushing coalesce buffer on deadline");
}
if self.coalesce_deadline.is_some_and(|d| d <= now)
&& let Err(error) = self.flush_coalesce_buffer().await
{
tracing::error!(%error, channel_id = %self.id, "error flushing coalesce buffer on deadline");
}
// Check retrigger deadline
if self.retrigger_deadline.is_some_and(|d| d <= now) {
Expand Down Expand Up @@ -391,7 +391,10 @@ impl Channel {

if messages.len() == 1 {
// Single message - process normally
let message = messages.into_iter().next().ok_or_else(|| anyhow::anyhow!("empty iterator after length check"))?;
let message = messages
.into_iter()
.next()
.ok_or_else(|| anyhow::anyhow!("empty iterator after length check"))?;
self.handle_message(message).await
} else {
// Multiple messages - batch them
Expand Down Expand Up @@ -462,10 +465,11 @@ impl Channel {
.get("telegram_chat_type")
.and_then(|v| v.as_str())
});
self.conversation_context = Some(
prompt_engine
.render_conversation_context(&first.source, server_name, channel_name)?,
);
self.conversation_context = Some(prompt_engine.render_conversation_context(
&first.source,
server_name,
channel_name,
)?);
}

// Persist each message to conversation log (individual audit trail)
Expand Down Expand Up @@ -605,8 +609,11 @@ impl Channel {
let browser_enabled = rc.browser_config.load().enabled;
let web_search_enabled = rc.brave_search_key.load().is_some();
let opencode_enabled = rc.opencode.load().enabled;
let worker_capabilities =
prompt_engine.render_worker_capabilities(browser_enabled, web_search_enabled, opencode_enabled)?;
let worker_capabilities = prompt_engine.render_worker_capabilities(
browser_enabled,
web_search_enabled,
opencode_enabled,
)?;

let status_text = {
let status = self.state.status_block.read().await;
Expand Down Expand Up @@ -712,10 +719,11 @@ impl Channel {
.get("telegram_chat_type")
.and_then(|v| v.as_str())
});
self.conversation_context = Some(
prompt_engine
.render_conversation_context(&message.source, server_name, channel_name)?,
);
self.conversation_context = Some(prompt_engine.render_conversation_context(
&message.source,
server_name,
channel_name,
)?);
}

let system_prompt = self.build_system_prompt().await?;
Expand Down Expand Up @@ -802,8 +810,11 @@ impl Channel {
let browser_enabled = rc.browser_config.load().enabled;
let web_search_enabled = rc.brave_search_key.load().is_some();
let opencode_enabled = rc.opencode.load().enabled;
let worker_capabilities = prompt_engine
.render_worker_capabilities(browser_enabled, web_search_enabled, opencode_enabled)?;
let worker_capabilities = prompt_engine.render_worker_capabilities(
browser_enabled,
web_search_enabled,
opencode_enabled,
)?;

let status_text = {
let status = self.state.status_block.read().await;
Expand All @@ -814,17 +825,16 @@ impl Channel {

let empty_to_none = |s: String| if s.is_empty() { None } else { Some(s) };

prompt_engine
.render_channel_prompt(
empty_to_none(identity_context),
empty_to_none(memory_bulletin.to_string()),
empty_to_none(skills_prompt),
worker_capabilities,
self.conversation_context.clone(),
empty_to_none(status_text),
None, // coalesce_hint - only set for batched messages
available_channels,
)
prompt_engine.render_channel_prompt(
empty_to_none(identity_context),
empty_to_none(memory_bulletin.to_string()),
empty_to_none(skills_prompt),
worker_capabilities,
self.conversation_context.clone(),
empty_to_none(status_text),
None, // coalesce_hint - only set for batched messages
available_channels,
)
}

/// Register per-turn tools, run the LLM agentic loop, and clean up.
Expand Down Expand Up @@ -1147,8 +1157,10 @@ impl Channel {
for (key, value) in retrigger_metadata {
self.pending_retrigger_metadata.insert(key, value);
}
self.retrigger_deadline =
Some(tokio::time::Instant::now() + std::time::Duration::from_millis(RETRIGGER_DEBOUNCE_MS));
self.retrigger_deadline = Some(
tokio::time::Instant::now()
+ std::time::Duration::from_millis(RETRIGGER_DEBOUNCE_MS),
);
}
}

Expand Down
34 changes: 23 additions & 11 deletions src/agent/cortex_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ impl<M: CompletionModel> PromptHook<M> for CortexChatHook {
) -> ToolCallHookAction {
self.send(CortexChatEvent::ToolStarted {
tool: tool_name.to_string(),
}).await;
})
.await;
ToolCallHookAction::Continue
}

Expand All @@ -95,7 +96,8 @@ impl<M: CompletionModel> PromptHook<M> for CortexChatHook {
self.send(CortexChatEvent::ToolCompleted {
tool: tool_name.to_string(),
result_preview: preview,
}).await;
})
.await;
HookAction::Continue
}

Expand Down Expand Up @@ -295,26 +297,33 @@ impl CortexChatSession {
let _ = store
.save_message(&thread_id, "assistant", &response, channel_ref)
.await;
let _ = event_tx.send(CortexChatEvent::Done {
full_text: response,
}).await;
let _ = event_tx
.send(CortexChatEvent::Done {
full_text: response,
})
.await;
}
Err(error) => {
let error_text = format!("Cortex chat error: {error}");
let _ = store
.save_message(&thread_id, "assistant", &error_text, channel_ref)
.await;
let _ = event_tx.send(CortexChatEvent::Error {
message: error_text,
}).await;
let _ = event_tx
.send(CortexChatEvent::Error {
message: error_text,
})
.await;
}
}
});

Ok(event_rx)
}

async fn build_system_prompt(&self, channel_context_id: Option<&str>) -> crate::error::Result<String> {
async fn build_system_prompt(
&self,
channel_context_id: Option<&str>,
) -> crate::error::Result<String> {
let runtime_config = &self.deps.runtime_config;
let prompt_engine = runtime_config.prompts.load();

Expand All @@ -324,8 +333,11 @@ impl CortexChatSession {
let browser_enabled = runtime_config.browser_config.load().enabled;
let web_search_enabled = runtime_config.brave_search_key.load().is_some();
let opencode_enabled = runtime_config.opencode.load().enabled;
let worker_capabilities =
prompt_engine.render_worker_capabilities(browser_enabled, web_search_enabled, opencode_enabled)?;
let worker_capabilities = prompt_engine.render_worker_capabilities(
browser_enabled,
web_search_enabled,
opencode_enabled,
)?;

// Load channel transcript if a channel context is active
let channel_transcript = if let Some(channel_id) = channel_context_id {
Expand Down
8 changes: 5 additions & 3 deletions src/agent/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ impl Worker {
None
}
})
.unwrap_or_else(|| "Worker reached maximum segments without a final response.".to_string());
.unwrap_or_else(|| {
"Worker reached maximum segments without a final response."
.to_string()
});
}

self.maybe_compact_history(&mut history).await;
Expand Down Expand Up @@ -358,8 +361,7 @@ impl Worker {
self.hook.send_status("compacting (overflow recovery)");
self.force_compact_history(&mut history).await;
let prompt_engine = self.deps.runtime_config.prompts.load();
let overflow_msg =
prompt_engine.render_system_worker_overflow()?;
let overflow_msg = prompt_engine.render_system_worker_overflow()?;
follow_up_prompt = format!("{follow_up}\n\n{overflow_msg}");
}
Err(error) => {
Expand Down
32 changes: 10 additions & 22 deletions src/api/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,12 +377,10 @@ pub(super) async fn create_binding(
Some(existing) => existing.clone(),
None => {
drop(perms_guard);
let Some(discord_config) = new_config
.messaging
.discord
.as_ref()
else {
tracing::error!("discord config missing despite token being provided");
let Some(discord_config) = new_config.messaging.discord.as_ref() else {
tracing::error!(
"discord config missing despite token being provided"
);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
};
let perms = crate::config::DiscordPermissions::from_config(
Expand All @@ -409,12 +407,10 @@ pub(super) async fn create_binding(
Some(existing) => existing.clone(),
None => {
drop(perms_guard);
let Some(slack_config) = new_config
.messaging
.slack
.as_ref()
else {
tracing::error!("slack config missing despite tokens being provided");
let Some(slack_config) = new_config.messaging.slack.as_ref() else {
tracing::error!(
"slack config missing despite tokens being provided"
);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
};
let perms = crate::config::SlackPermissions::from_config(
Expand Down Expand Up @@ -453,11 +449,7 @@ pub(super) async fn create_binding(

if let Some(token) = new_telegram_token {
let telegram_perms = {
let Some(telegram_config) = new_config
.messaging
.telegram
.as_ref()
else {
let Some(telegram_config) = new_config.messaging.telegram.as_ref() else {
tracing::error!("telegram config missing despite token being provided");
return Err(StatusCode::INTERNAL_SERVER_ERROR);
};
Expand All @@ -475,11 +467,7 @@ pub(super) async fn create_binding(
}

if let Some((username, oauth_token)) = new_twitch_creds {
let Some(twitch_config) = new_config
.messaging
.twitch
.as_ref()
else {
let Some(twitch_config) = new_config.messaging.twitch.as_ref() else {
tracing::error!("twitch config missing despite credentials being provided");
return Err(StatusCode::INTERNAL_SERVER_ERROR);
};
Expand Down
8 changes: 2 additions & 6 deletions src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use axum::middleware::{self, Next};
use axum::response::{Html, IntoResponse, Response};
use axum::routing::{delete, get, post, put};
use rust_embed::Embed;
use tower_http::cors::CorsLayer;
use serde_json::json;
use tower_http::cors::CorsLayer;

use std::net::SocketAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -45,11 +45,7 @@ pub async fn start_http_server(
axum::http::Method::DELETE,
axum::http::Method::OPTIONS,
])
.allow_headers([
header::CONTENT_TYPE,
header::AUTHORIZATION,
header::ACCEPT,
]);
.allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION, header::ACCEPT]);

let api_routes = Router::new()
.route("/health", get(system::health))
Expand Down
Loading