Skip to content

Commit b98fa1d

Browse files
committed
adds remote mcp support in agent crate
1 parent 4e7974b commit b98fa1d

File tree

10 files changed

+1175
-257
lines changed

10 files changed

+1175
-257
lines changed

crates/agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ r2d2_sqlite.workspace = true
5151
rand.workspace = true
5252
regex.workspace = true
5353
reqwest.workspace = true
54-
rmcp = { version = "0.8.0", features = ["client", "transport-async-rw", "transport-child-process", "transport-io"] }
54+
rmcp.workspace = true
5555
rusqlite.workspace = true
5656
rustls.workspace = true
5757
rustls-native-certs.workspace = true

crates/agent/src/agent/agent_config/definitions.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use serde::{
1212
use super::types::ResourcePath;
1313
use crate::agent::consts::DEFAULT_AGENT_NAME;
1414
use crate::agent::tools::BuiltInToolName;
15+
use crate::mcp::oauth_util::OAuthConfig;
1516

1617
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1718
#[serde(untagged)]
@@ -218,7 +219,7 @@ pub struct McpServers {
218219
#[serde(untagged)]
219220
pub enum McpServerConfig {
220221
Local(LocalMcpServerConfig),
221-
StreamableHTTP(StreamableHTTPMcpServerConfig),
222+
Remote(RemoteMcpServerConfig),
222223
}
223224

224225
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
@@ -241,7 +242,7 @@ pub struct LocalMcpServerConfig {
241242
}
242243

243244
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
244-
pub struct StreamableHTTPMcpServerConfig {
245+
pub struct RemoteMcpServerConfig {
245246
/// The URL endpoint for HTTP-based MCP servers
246247
pub url: String,
247248
/// HTTP headers to include when communicating with HTTP-based MCP servers
@@ -251,6 +252,11 @@ pub struct StreamableHTTPMcpServerConfig {
251252
#[serde(alias = "timeout")]
252253
#[serde(default = "default_timeout")]
253254
pub timeout_ms: u64,
255+
/// OAuth scopes required for authentication with the remote MCP server
256+
pub oauth_scope: Vec<String>,
257+
/// OAuth configuration for this server
258+
#[serde(skip_serializing_if = "Option::is_none")]
259+
pub oauth: Option<OAuthConfig>,
254260
}
255261

256262
pub fn default_timeout() -> u64 {

crates/agent/src/agent/consts.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ pub const MAX_TOOL_NAME_LEN: usize = 64;
1313

1414
pub const MAX_TOOL_SPEC_DESCRIPTION_LEN: usize = 10_004;
1515

16+
pub const DEFAULT_MCP_CREDENTIAL_PATH: &str = "~/.aws/sso/cache";
17+
1618
/// 10 MB
1719
pub const MAX_IMAGE_SIZE_BYTES: u64 = 10 * 1024 * 1024;
1820

crates/agent/src/agent/mcp/actor.rs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::collections::HashMap;
2+
use std::path::PathBuf;
23
use std::sync::Arc;
34
use std::time::Duration;
45

@@ -44,20 +45,16 @@ pub enum McpMessage {
4445
Tools(Result<Vec<RmcpTool>, ServiceError>),
4546
Prompts(Result<Vec<RmcpPrompt>, ServiceError>),
4647
ExecuteTool { request_id: u32, result: ExecuteToolResult },
48+
OauthRequest { oauth_url: String },
4749
}
4850

4951
#[derive(Debug)]
5052
pub struct McpServerActorHandle {
5153
_server_name: String,
5254
sender: RequestSender<McpServerActorRequest, McpServerActorResponse, McpServerActorError>,
53-
event_rx: mpsc::Receiver<McpServerActorEvent>,
5455
}
5556

5657
impl McpServerActorHandle {
57-
pub async fn recv(&mut self) -> Option<McpServerActorEvent> {
58-
self.event_rx.recv().await
59-
}
60-
6158
pub async fn get_tool_specs(&self) -> Result<Vec<ToolSpec>, McpServerActorError> {
6259
match self
6360
.sender
@@ -153,6 +150,7 @@ impl From<ServiceError> for McpServerActorError {
153150
pub enum McpServerActorEvent {
154151
/// The MCP server has launched successfully
155152
Initialized {
153+
server_name: String,
156154
/// Time taken to launch the server
157155
serve_duration: Duration,
158156
/// Time taken to list all tools.
@@ -165,7 +163,9 @@ pub enum McpServerActorEvent {
165163
list_prompts_duration: Option<Duration>,
166164
},
167165
/// The MCP server failed to initialize successfully
168-
InitializeError(String),
166+
InitializeError { server_name: String, error: String },
167+
/// An OAuth authentication request from the MCP server
168+
OauthRequest { server_name: String, oauth_url: String },
169169
}
170170

171171
#[derive(Debug)]
@@ -195,34 +195,38 @@ pub struct McpServerActor {
195195

196196
impl McpServerActor {
197197
/// Spawns an actor to manage the MCP server, returning a [McpServerActorHandle].
198-
pub fn spawn(server_name: String, config: McpServerConfig) -> McpServerActorHandle {
199-
let (event_tx, event_rx) = mpsc::channel(32);
198+
pub fn spawn(
199+
server_name: String,
200+
config: McpServerConfig,
201+
cred_path: PathBuf,
202+
event_tx: mpsc::Sender<McpServerActorEvent>,
203+
) -> McpServerActorHandle {
200204
let (req_tx, req_rx) = new_request_channel();
201205

202206
let server_name_clone = server_name.clone();
203-
tokio::spawn(async move { Self::launch(server_name_clone, config, req_rx, event_tx).await });
207+
tokio::spawn(async move { Self::launch(server_name_clone, config, cred_path, req_rx, event_tx).await });
204208

205209
McpServerActorHandle {
206210
_server_name: server_name,
207211
sender: req_tx,
208-
event_rx,
209212
}
210213
}
211214

212215
async fn launch(
213216
server_name: String,
214217
config: McpServerConfig,
218+
cred_path: PathBuf,
215219
req_rx: RequestReceiver<McpServerActorRequest, McpServerActorResponse, McpServerActorError>,
216220
event_tx: mpsc::Sender<McpServerActorEvent>,
217221
) {
218222
let (message_tx, message_rx) = mpsc::channel(32);
219-
match McpService::new(server_name.clone(), config.clone(), message_tx.clone())
223+
match McpService::new(server_name.clone(), config.clone(), cred_path, message_tx.clone())
220224
.launch()
221225
.await
222226
{
223227
Ok((service_handle, launch_md)) => {
224228
let s = Self {
225-
server_name,
229+
server_name: server_name.clone(),
226230
_config: config,
227231
tools: launch_md.tools.unwrap_or_default(),
228232
prompts: launch_md.prompts.unwrap_or_default(),
@@ -237,6 +241,7 @@ impl McpServerActor {
237241
let _ = s
238242
.event_tx
239243
.send(McpServerActorEvent::Initialized {
244+
server_name,
240245
serve_duration: launch_md.serve_time_taken,
241246
list_tools_duration: launch_md.list_tools_duration,
242247
list_prompts_duration: launch_md.list_prompts_duration,
@@ -246,7 +251,10 @@ impl McpServerActor {
246251
},
247252
Err(err) => {
248253
let _ = event_tx
249-
.send(McpServerActorEvent::InitializeError(err.to_string()))
254+
.send(McpServerActorEvent::InitializeError {
255+
server_name,
256+
error: err.to_string(),
257+
})
250258
.await;
251259
},
252260
}
@@ -331,6 +339,18 @@ impl McpServerActor {
331339
);
332340
},
333341
},
342+
McpMessage::OauthRequest { oauth_url } => {
343+
if let Err(err) = self
344+
.event_tx
345+
.send(McpServerActorEvent::OauthRequest {
346+
server_name: self.server_name.clone(),
347+
oauth_url,
348+
})
349+
.await
350+
{
351+
error!(?self.server_name, ?err, "failed to send oauth request");
352+
}
353+
},
334354
}
335355
}
336356

@@ -340,7 +360,7 @@ impl McpServerActor {
340360
let service_handle = self.service_handle.clone();
341361
let tx = self.message_tx.clone();
342362
tokio::spawn(async move {
343-
let res = service_handle.list_tools().await;
363+
let res = service_handle.list_all_tools().await;
344364
let _ = tx.send(McpMessage::Tools(res)).await;
345365
});
346366
}
@@ -351,7 +371,7 @@ impl McpServerActor {
351371
let service_handle = self.service_handle.clone();
352372
let tx = self.message_tx.clone();
353373
tokio::spawn(async move {
354-
let res = service_handle.list_prompts().await;
374+
let res = service_handle.list_all_prompts().await;
355375
let _ = tx.send(McpMessage::Prompts(res)).await;
356376
});
357377
}

0 commit comments

Comments
 (0)