Skip to content

Commit 0a7ec0b

Browse files
authored
Merge pull request #103 from nexxeln/feat/mcp-client-integration
add mcp client support for workers
2 parents a37bcf4 + 0741f6d commit 0a7ec0b

File tree

16 files changed

+1459
-30
lines changed

16 files changed

+1459
-30
lines changed

Cargo.lock

Lines changed: 380 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ pin-project = "1"
7474

7575
# Schema validation
7676
schemars = "0.8"
77+
rmcp = { version = "0.16", features = ["client", "reqwest", "transport-child-process", "transport-streamable-http-client", "transport-streamable-http-client-reqwest"] }
7778

7879
# Command line (for main.rs)
7980
clap = { version = "4.5", features = ["derive"] }

src/agent/worker.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ impl Worker {
185185

186186
tracing::info!(worker_id = %self.id, task = %self.task, "worker starting");
187187

188+
let mcp_tools = self.deps.mcp_manager.get_tools().await;
189+
188190
// Create per-worker ToolServer with task tools
189191
let worker_tool_server = crate::tools::create_worker_tool_server(
190192
self.deps.agent_id.clone(),
@@ -196,6 +198,7 @@ impl Worker {
196198
self.brave_search_key.clone(),
197199
self.deps.runtime_config.workspace_dir.clone(),
198200
self.deps.runtime_config.instance_dir.clone(),
201+
mcp_tools,
199202
);
200203

201204
let routing = self.deps.runtime_config.routing.load();

src/api/agents.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,22 @@ pub(super) struct DeleteAgentQuery {
134134
agent_id: String,
135135
}
136136

137+
#[derive(Deserialize)]
138+
pub(super) struct AgentMcpQuery {
139+
agent_id: String,
140+
}
141+
142+
#[derive(Deserialize)]
143+
pub(super) struct ReconnectMcpRequest {
144+
agent_id: String,
145+
server_name: String,
146+
}
147+
148+
#[derive(Serialize)]
149+
pub(super) struct AgentMcpResponse {
150+
servers: Vec<crate::mcp::McpServerStatus>,
151+
}
152+
137153
/// List all configured agents with their config summaries.
138154
pub(super) async fn list_agents(State(state): State<Arc<ApiState>>) -> Json<AgentsResponse> {
139155
let agents = state.agent_configs.load();
@@ -142,6 +158,51 @@ pub(super) async fn list_agents(State(state): State<Arc<ApiState>>) -> Json<Agen
142158
})
143159
}
144160

161+
/// List MCP connection status for an agent.
162+
pub(super) async fn list_agent_mcp(
163+
State(state): State<Arc<ApiState>>,
164+
Query(query): Query<AgentMcpQuery>,
165+
) -> Result<Json<AgentMcpResponse>, StatusCode> {
166+
let managers = state.mcp_managers.load();
167+
let manager = managers
168+
.get(&query.agent_id)
169+
.cloned()
170+
.ok_or(StatusCode::NOT_FOUND)?;
171+
let servers = manager.statuses().await;
172+
Ok(Json(AgentMcpResponse { servers }))
173+
}
174+
175+
/// Force reconnect for a single MCP server on an agent.
176+
pub(super) async fn reconnect_agent_mcp(
177+
State(state): State<Arc<ApiState>>,
178+
Json(request): Json<ReconnectMcpRequest>,
179+
) -> Result<Json<serde_json::Value>, StatusCode> {
180+
let managers = state.mcp_managers.load();
181+
let manager = managers
182+
.get(&request.agent_id)
183+
.cloned()
184+
.ok_or(StatusCode::NOT_FOUND)?;
185+
186+
manager
187+
.reconnect(&request.server_name)
188+
.await
189+
.map_err(|error| {
190+
tracing::warn!(
191+
%error,
192+
agent_id = %request.agent_id,
193+
server_name = %request.server_name,
194+
"failed to reconnect mcp server"
195+
);
196+
StatusCode::BAD_REQUEST
197+
})?;
198+
199+
Ok(Json(serde_json::json!({
200+
"success": true,
201+
"agent_id": request.agent_id,
202+
"server_name": request.server_name
203+
})))
204+
}
205+
145206
/// Create a new agent and initialize it live (directories, databases, memory, identity, cron, cortex).
146207
pub(super) async fn create_agent(
147208
State(state): State<Arc<ApiState>>,
@@ -233,6 +294,7 @@ pub(super) async fn create_agent(
233294
ingestion: None,
234295
cortex: None,
235296
browser: None,
297+
mcp: None,
236298
brave_search_key: None,
237299
cron: Vec::new(),
238300
};
@@ -354,10 +416,14 @@ pub(super) async fn create_agent(
354416
.clone()
355417
};
356418

419+
let mcp_manager = std::sync::Arc::new(crate::mcp::McpManager::new(agent_config.mcp.clone()));
420+
mcp_manager.connect_all().await;
421+
357422
let deps = crate::AgentDeps {
358423
agent_id: arc_agent_id.clone(),
359424
memory_search: memory_search.clone(),
360425
llm_manager,
426+
mcp_manager: mcp_manager.clone(),
361427
cron_tool: None,
362428
runtime_config: runtime_config.clone(),
363429
event_tx: event_tx.clone(),
@@ -455,6 +521,10 @@ pub(super) async fn create_agent(
455521
configs.insert(agent_id.clone(), runtime_config);
456522
state.runtime_configs.store(std::sync::Arc::new(configs));
457523

524+
let mut mcp_managers = (**state.mcp_managers.load()).clone();
525+
mcp_managers.insert(agent_id.clone(), mcp_manager);
526+
state.mcp_managers.store(std::sync::Arc::new(mcp_managers));
527+
458528
let mut agent_infos = (**state.agent_configs.load()).clone();
459529
agent_infos.push(AgentInfo {
460530
id: agent_config.id.clone(),
@@ -567,6 +637,12 @@ pub(super) async fn delete_agent(
567637

568638
// Remove from all API state maps
569639
{
640+
let mut mcp_managers = (**state.mcp_managers.load()).clone();
641+
if let Some(mcp_manager) = mcp_managers.remove(&agent_id) {
642+
mcp_manager.disconnect_all().await;
643+
}
644+
state.mcp_managers.store(std::sync::Arc::new(mcp_managers));
645+
570646
let mut pools = (**state.agent_pools.load()).clone();
571647
pools.remove(&agent_id);
572648
state.agent_pools.store(std::sync::Arc::new(pools));

src/api/config.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,13 @@ pub(super) async fn update_agent_config(
329329
match crate::config::Config::load_from_path(&config_path) {
330330
Ok(new_config) => {
331331
let runtime_configs = state.runtime_configs.load();
332-
if let Some(rc) = runtime_configs.get(&request.agent_id) {
333-
rc.reload_config(&new_config, &request.agent_id);
332+
let mcp_managers = state.mcp_managers.load();
333+
if let (Some(rc), Some(mcp_manager)) = (
334+
runtime_configs.get(&request.agent_id).cloned(),
335+
mcp_managers.get(&request.agent_id).cloned(),
336+
) {
337+
rc.reload_config(&new_config, &request.agent_id, &mcp_manager)
338+
.await;
334339
}
335340
if request.discord.is_some()
336341
&& let Some(discord_config) = &new_config.messaging.discord

src/api/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ pub async fn start_http_server(
5151
.post(agents::create_agent)
5252
.delete(agents::delete_agent),
5353
)
54+
.route("/agents/mcp", get(agents::list_agent_mcp))
55+
.route("/agents/mcp/reconnect", post(agents::reconnect_agent_mcp))
5456
.route("/agents/overview", get(agents::agent_overview))
5557
.route("/channels", get(channels::list_channels))
5658
.route("/channels/messages", get(channels::channel_messages))

src/api/settings.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,8 +431,26 @@ pub(super) async fn update_raw_config(
431431
match crate::config::Config::load_from_path(&config_path) {
432432
Ok(new_config) => {
433433
let runtime_configs = state.runtime_configs.load();
434-
for (agent_id, rc) in runtime_configs.iter() {
435-
rc.reload_config(&new_config, agent_id);
434+
let mcp_managers = state.mcp_managers.load();
435+
let reload_targets = runtime_configs
436+
.iter()
437+
.filter_map(|(agent_id, runtime_config)| {
438+
mcp_managers.get(agent_id).map(|mcp_manager| {
439+
(
440+
agent_id.clone(),
441+
runtime_config.clone(),
442+
mcp_manager.clone(),
443+
)
444+
})
445+
})
446+
.collect::<Vec<_>>();
447+
drop(runtime_configs);
448+
drop(mcp_managers);
449+
450+
for (agent_id, runtime_config, mcp_manager) in reload_targets {
451+
runtime_config
452+
.reload_config(&new_config, &agent_id, &mcp_manager)
453+
.await;
436454
}
437455
}
438456
Err(error) => {

src/api/state.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::agent::status::StatusBlock;
66
use crate::config::{Binding, DefaultsConfig, DiscordPermissions, RuntimeConfig, SlackPermissions};
77
use crate::cron::{CronStore, Scheduler};
88
use crate::llm::LlmManager;
9+
use crate::mcp::McpManager;
910
use crate::memory::{EmbeddingModel, MemorySearch};
1011
use crate::messaging::MessagingManager;
1112
use crate::messaging::webchat::WebChatAdapter;
@@ -61,6 +62,8 @@ pub struct ApiState {
6162
pub cron_schedulers: arc_swap::ArcSwap<HashMap<String, Arc<Scheduler>>>,
6263
/// Per-agent RuntimeConfig for reading live hot-reloaded configuration.
6364
pub runtime_configs: ArcSwap<HashMap<String, Arc<RuntimeConfig>>>,
65+
/// Per-agent MCP managers for status and reconnect APIs.
66+
pub mcp_managers: ArcSwap<HashMap<String, Arc<McpManager>>>,
6467
/// Shared reference to the Discord permissions ArcSwap (same instance used by the adapter and file watcher).
6568
pub discord_permissions: RwLock<Option<Arc<ArcSwap<DiscordPermissions>>>>,
6669
/// Shared reference to the Slack permissions ArcSwap (same instance used by the adapter and file watcher).
@@ -191,6 +194,7 @@ impl ApiState {
191194
cron_stores: arc_swap::ArcSwap::from_pointee(HashMap::new()),
192195
cron_schedulers: arc_swap::ArcSwap::from_pointee(HashMap::new()),
193196
runtime_configs: ArcSwap::from_pointee(HashMap::new()),
197+
mcp_managers: ArcSwap::from_pointee(HashMap::new()),
194198
discord_permissions: RwLock::new(None),
195199
slack_permissions: RwLock::new(None),
196200
bindings: RwLock::new(None),
@@ -416,6 +420,11 @@ impl ApiState {
416420
self.runtime_configs.store(Arc::new(configs));
417421
}
418422

423+
/// Set the MCP managers for all agents.
424+
pub fn set_mcp_managers(&self, managers: HashMap<String, Arc<McpManager>>) {
425+
self.mcp_managers.store(Arc::new(managers));
426+
}
427+
419428
/// Share the Discord permissions ArcSwap with the API so reads get hot-reloaded values.
420429
pub async fn set_discord_permissions(&self, permissions: Arc<ArcSwap<DiscordPermissions>>) {
421430
*self.discord_permissions.write().await = Some(permissions);

0 commit comments

Comments
 (0)