Skip to content

Commit 160c9cc

Browse files
committed
adds remote mcp support in agent crate
1 parent 4e7974b commit 160c9cc

File tree

11 files changed

+1260
-258
lines changed

11 files changed

+1260
-258
lines changed

crates/agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ r2d2_sqlite.workspace = true
5151
rand.workspace = true
5252
regex.workspace = true
5353
reqwest.workspace = true
54-
rmcp = { version = "0.8.0", features = ["client", "transport-async-rw", "transport-child-process", "transport-io"] }
54+
rmcp.workspace = true
5555
rusqlite.workspace = true
5656
rustls.workspace = true
5757
rustls-native-certs.workspace = true

crates/agent/src/agent/agent_config/definitions.rs

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use serde::{
1212
use super::types::ResourcePath;
1313
use crate::agent::consts::DEFAULT_AGENT_NAME;
1414
use crate::agent::tools::BuiltInToolName;
15+
use crate::mcp::oauth_util::OAuthConfig;
1516

1617
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1718
#[serde(untagged)]
@@ -215,13 +216,16 @@ pub struct McpServers {
215216
}
216217

217218
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
218-
#[serde(untagged)]
219+
#[serde(tag = "type")]
219220
pub enum McpServerConfig {
221+
#[serde(rename = "stdio")]
220222
Local(LocalMcpServerConfig),
221-
StreamableHTTP(StreamableHTTPMcpServerConfig),
223+
#[serde(rename = "http")]
224+
Remote(RemoteMcpServerConfig),
222225
}
223226

224227
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
228+
#[serde(rename_all = "camelCase")]
225229
pub struct LocalMcpServerConfig {
226230
/// The command string used to initialize the mcp server
227231
pub command: String,
@@ -241,7 +245,8 @@ pub struct LocalMcpServerConfig {
241245
}
242246

243247
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
244-
pub struct StreamableHTTPMcpServerConfig {
248+
#[serde(rename_all = "camelCase")]
249+
pub struct RemoteMcpServerConfig {
245250
/// The URL endpoint for HTTP-based MCP servers
246251
pub url: String,
247252
/// HTTP headers to include when communicating with HTTP-based MCP servers
@@ -251,6 +256,12 @@ pub struct StreamableHTTPMcpServerConfig {
251256
#[serde(alias = "timeout")]
252257
#[serde(default = "default_timeout")]
253258
pub timeout_ms: u64,
259+
/// OAuth scopes required for authentication with the remote MCP server
260+
#[serde(default)]
261+
pub oauth_scopes: Vec<String>,
262+
/// OAuth configuration for this server
263+
#[serde(skip_serializing_if = "Option::is_none")]
264+
pub oauth: Option<OAuthConfig>,
254265
}
255266

256267
pub fn default_timeout() -> u64 {
@@ -392,4 +403,94 @@ mod tests {
392403

393404
let _: AgentConfig = serde_json::from_value(agent).unwrap();
394405
}
406+
407+
#[test]
408+
fn test_mcp_server_config_http_deser() {
409+
// Test HTTP server without oauth scopes
410+
let config = serde_json::json!({
411+
"type": "http",
412+
"url": "https://mcp.api.coingecko.com/sse"
413+
});
414+
let result: McpServerConfig = serde_json::from_value(config).unwrap();
415+
match result {
416+
McpServerConfig::Remote(remote) => {
417+
assert_eq!(remote.url, "https://mcp.api.coingecko.com/sse");
418+
assert!(remote.oauth_scopes.is_empty());
419+
}
420+
_ => panic!("Expected Remote variant"),
421+
}
422+
423+
// Test HTTP server with oauth scopes
424+
let config = serde_json::json!({
425+
"type": "http",
426+
"url": "https://mcp.datadoghq.com/api/unstable/mcp-server/mcp",
427+
"oauthScopes": ["mcp", "profile", "email"]
428+
});
429+
let result: McpServerConfig = serde_json::from_value(config).unwrap();
430+
match result {
431+
McpServerConfig::Remote(remote) => {
432+
assert_eq!(remote.url, "https://mcp.datadoghq.com/api/unstable/mcp-server/mcp");
433+
assert_eq!(remote.oauth_scopes, vec!["mcp", "profile", "email"]);
434+
}
435+
_ => panic!("Expected Remote variant"),
436+
}
437+
438+
// Test HTTP server with empty oauth scopes
439+
let config = serde_json::json!({
440+
"type": "http",
441+
"url": "https://example-server.modelcontextprotocol.io/mcp",
442+
"oauthScopes": []
443+
});
444+
let result: McpServerConfig = serde_json::from_value(config).unwrap();
445+
match result {
446+
McpServerConfig::Remote(remote) => {
447+
assert_eq!(remote.url, "https://example-server.modelcontextprotocol.io/mcp");
448+
assert!(remote.oauth_scopes.is_empty());
449+
}
450+
_ => panic!("Expected Remote variant"),
451+
}
452+
}
453+
454+
#[test]
455+
fn test_mcp_server_config_stdio_deser() {
456+
let config = serde_json::json!({
457+
"type": "stdio",
458+
"command": "node",
459+
"args": ["server.js"]
460+
});
461+
let result: McpServerConfig = serde_json::from_value(config).unwrap();
462+
match result {
463+
McpServerConfig::Local(local) => {
464+
assert_eq!(local.command, "node");
465+
assert_eq!(local.args, vec!["server.js"]);
466+
}
467+
_ => panic!("Expected Local variant"),
468+
}
469+
}
470+
471+
#[test]
472+
fn test_mcp_servers_map_deser() {
473+
let servers = serde_json::json!({
474+
"coin-gecko": {
475+
"type": "http",
476+
"url": "https://mcp.api.coingecko.com/sse"
477+
},
478+
"datadog": {
479+
"type": "http",
480+
"url": "https://mcp.datadoghq.com/api/unstable/mcp-server/mcp",
481+
"oauthScopes": ["mcp", "profile", "email"]
482+
},
483+
"local-server": {
484+
"type": "stdio",
485+
"command": "npx",
486+
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
487+
}
488+
});
489+
490+
let result: HashMap<String, McpServerConfig> = serde_json::from_value(servers).unwrap();
491+
assert_eq!(result.len(), 3);
492+
assert!(result.contains_key("coin-gecko"));
493+
assert!(result.contains_key("datadog"));
494+
assert!(result.contains_key("local-server"));
495+
}
395496
}

crates/agent/src/agent/consts.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ pub const MAX_TOOL_NAME_LEN: usize = 64;
1313

1414
pub const MAX_TOOL_SPEC_DESCRIPTION_LEN: usize = 10_004;
1515

16+
pub const DEFAULT_MCP_CREDENTIAL_PATH: &str = "~/.aws/sso/cache";
17+
1618
/// 10 MB
1719
pub const MAX_IMAGE_SIZE_BYTES: u64 = 10 * 1024 * 1024;
1820

crates/agent/src/agent/mcp/actor.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::collections::HashMap;
2+
use std::path::PathBuf;
23
use std::sync::Arc;
34
use std::time::Duration;
45

@@ -50,14 +51,9 @@ pub enum McpMessage {
5051
pub struct McpServerActorHandle {
5152
_server_name: String,
5253
sender: RequestSender<McpServerActorRequest, McpServerActorResponse, McpServerActorError>,
53-
event_rx: mpsc::Receiver<McpServerActorEvent>,
5454
}
5555

5656
impl McpServerActorHandle {
57-
pub async fn recv(&mut self) -> Option<McpServerActorEvent> {
58-
self.event_rx.recv().await
59-
}
60-
6157
pub async fn get_tool_specs(&self) -> Result<Vec<ToolSpec>, McpServerActorError> {
6258
match self
6359
.sender
@@ -153,6 +149,7 @@ impl From<ServiceError> for McpServerActorError {
153149
pub enum McpServerActorEvent {
154150
/// The MCP server has launched successfully
155151
Initialized {
152+
server_name: String,
156153
/// Time taken to launch the server
157154
serve_duration: Duration,
158155
/// Time taken to list all tools.
@@ -165,7 +162,9 @@ pub enum McpServerActorEvent {
165162
list_prompts_duration: Option<Duration>,
166163
},
167164
/// The MCP server failed to initialize successfully
168-
InitializeError(String),
165+
InitializeError { server_name: String, error: String },
166+
/// An OAuth authentication request from the MCP server
167+
OauthRequest { server_name: String, oauth_url: String },
169168
}
170169

171170
#[derive(Debug)]
@@ -195,34 +194,38 @@ pub struct McpServerActor {
195194

196195
impl McpServerActor {
197196
/// Spawns an actor to manage the MCP server, returning a [McpServerActorHandle].
198-
pub fn spawn(server_name: String, config: McpServerConfig) -> McpServerActorHandle {
199-
let (event_tx, event_rx) = mpsc::channel(32);
197+
pub fn spawn(
198+
server_name: String,
199+
config: McpServerConfig,
200+
cred_path: PathBuf,
201+
event_tx: mpsc::Sender<McpServerActorEvent>,
202+
) -> McpServerActorHandle {
200203
let (req_tx, req_rx) = new_request_channel();
201204

202205
let server_name_clone = server_name.clone();
203-
tokio::spawn(async move { Self::launch(server_name_clone, config, req_rx, event_tx).await });
206+
tokio::spawn(async move { Self::launch(server_name_clone, config, cred_path, req_rx, event_tx).await });
204207

205208
McpServerActorHandle {
206209
_server_name: server_name,
207210
sender: req_tx,
208-
event_rx,
209211
}
210212
}
211213

212214
async fn launch(
213215
server_name: String,
214216
config: McpServerConfig,
217+
cred_path: PathBuf,
215218
req_rx: RequestReceiver<McpServerActorRequest, McpServerActorResponse, McpServerActorError>,
216219
event_tx: mpsc::Sender<McpServerActorEvent>,
217220
) {
218221
let (message_tx, message_rx) = mpsc::channel(32);
219-
match McpService::new(server_name.clone(), config.clone(), message_tx.clone())
220-
.launch()
222+
match McpService::new(server_name.clone(), config.clone(), cred_path, message_tx.clone())
223+
.launch(&event_tx)
221224
.await
222225
{
223226
Ok((service_handle, launch_md)) => {
224227
let s = Self {
225-
server_name,
228+
server_name: server_name.clone(),
226229
_config: config,
227230
tools: launch_md.tools.unwrap_or_default(),
228231
prompts: launch_md.prompts.unwrap_or_default(),
@@ -237,6 +240,7 @@ impl McpServerActor {
237240
let _ = s
238241
.event_tx
239242
.send(McpServerActorEvent::Initialized {
243+
server_name,
240244
serve_duration: launch_md.serve_time_taken,
241245
list_tools_duration: launch_md.list_tools_duration,
242246
list_prompts_duration: launch_md.list_prompts_duration,
@@ -246,7 +250,10 @@ impl McpServerActor {
246250
},
247251
Err(err) => {
248252
let _ = event_tx
249-
.send(McpServerActorEvent::InitializeError(err.to_string()))
253+
.send(McpServerActorEvent::InitializeError {
254+
server_name,
255+
error: err.to_string(),
256+
})
250257
.await;
251258
},
252259
}
@@ -340,7 +347,7 @@ impl McpServerActor {
340347
let service_handle = self.service_handle.clone();
341348
let tx = self.message_tx.clone();
342349
tokio::spawn(async move {
343-
let res = service_handle.list_tools().await;
350+
let res = service_handle.list_all_tools().await;
344351
let _ = tx.send(McpMessage::Tools(res)).await;
345352
});
346353
}
@@ -351,7 +358,7 @@ impl McpServerActor {
351358
let service_handle = self.service_handle.clone();
352359
let tx = self.message_tx.clone();
353360
tokio::spawn(async move {
354-
let res = service_handle.list_prompts().await;
361+
let res = service_handle.list_all_prompts().await;
355362
let _ = tx.send(McpMessage::Prompts(res)).await;
356363
});
357364
}

0 commit comments

Comments
 (0)