Skip to content

Commit 1256e24

Browse files
committed
adds remote mcp support in agent crate
1 parent ecb83d3 commit 1256e24

File tree

11 files changed

+1260
-257
lines changed

11 files changed

+1260
-257
lines changed

crates/agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ r2d2_sqlite.workspace = true
5151
rand.workspace = true
5252
regex.workspace = true
5353
reqwest.workspace = true
54-
rmcp = { version = "0.8.0", features = ["client", "transport-async-rw", "transport-child-process", "transport-io"] }
54+
rmcp.workspace = true
5555
rusqlite.workspace = true
5656
rustls.workspace = true
5757
rustls-native-certs.workspace = true

crates/agent/src/agent/agent_config/definitions.rs

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use serde::{
1212
use super::types::ResourcePath;
1313
use crate::agent::consts::DEFAULT_AGENT_NAME;
1414
use crate::agent::tools::BuiltInToolName;
15+
use crate::mcp::oauth_util::OAuthConfig;
1516

1617
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1718
#[serde(untagged)]
@@ -215,13 +216,16 @@ pub struct McpServers {
215216
}
216217

217218
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
218-
#[serde(untagged)]
219+
#[serde(tag = "type")]
219220
pub enum McpServerConfig {
221+
#[serde(rename = "stdio")]
220222
Local(LocalMcpServerConfig),
221-
StreamableHTTP(StreamableHTTPMcpServerConfig),
223+
#[serde(rename = "http")]
224+
Remote(RemoteMcpServerConfig),
222225
}
223226

224227
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
228+
#[serde(rename_all = "camelCase")]
225229
pub struct LocalMcpServerConfig {
226230
/// The command string used to initialize the mcp server
227231
pub command: String,
@@ -241,7 +245,8 @@ pub struct LocalMcpServerConfig {
241245
}
242246

243247
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
244-
pub struct StreamableHTTPMcpServerConfig {
248+
#[serde(rename_all = "camelCase")]
249+
pub struct RemoteMcpServerConfig {
245250
/// The URL endpoint for HTTP-based MCP servers
246251
pub url: String,
247252
/// HTTP headers to include when communicating with HTTP-based MCP servers
@@ -251,6 +256,12 @@ pub struct StreamableHTTPMcpServerConfig {
251256
#[serde(alias = "timeout")]
252257
#[serde(default = "default_timeout")]
253258
pub timeout_ms: u64,
259+
/// OAuth scopes required for authentication with the remote MCP server
260+
#[serde(default)]
261+
pub oauth_scopes: Vec<String>,
262+
/// OAuth configuration for this server
263+
#[serde(skip_serializing_if = "Option::is_none")]
264+
pub oauth: Option<OAuthConfig>,
254265
}
255266

256267
pub fn default_timeout() -> u64 {
@@ -392,4 +403,94 @@ mod tests {
392403

393404
let _: AgentConfig = serde_json::from_value(agent).unwrap();
394405
}
406+
407+
#[test]
408+
fn test_mcp_server_config_http_deser() {
409+
// Test HTTP server without oauth scopes
410+
let config = serde_json::json!({
411+
"type": "http",
412+
"url": "https://mcp.api.coingecko.com/sse"
413+
});
414+
let result: McpServerConfig = serde_json::from_value(config).unwrap();
415+
match result {
416+
McpServerConfig::Remote(remote) => {
417+
assert_eq!(remote.url, "https://mcp.api.coingecko.com/sse");
418+
assert!(remote.oauth_scopes.is_empty());
419+
},
420+
_ => panic!("Expected Remote variant"),
421+
}
422+
423+
// Test HTTP server with oauth scopes
424+
let config = serde_json::json!({
425+
"type": "http",
426+
"url": "https://mcp.datadoghq.com/api/unstable/mcp-server/mcp",
427+
"oauthScopes": ["mcp", "profile", "email"]
428+
});
429+
let result: McpServerConfig = serde_json::from_value(config).unwrap();
430+
match result {
431+
McpServerConfig::Remote(remote) => {
432+
assert_eq!(remote.url, "https://mcp.datadoghq.com/api/unstable/mcp-server/mcp");
433+
assert_eq!(remote.oauth_scopes, vec!["mcp", "profile", "email"]);
434+
},
435+
_ => panic!("Expected Remote variant"),
436+
}
437+
438+
// Test HTTP server with empty oauth scopes
439+
let config = serde_json::json!({
440+
"type": "http",
441+
"url": "https://example-server.modelcontextprotocol.io/mcp",
442+
"oauthScopes": []
443+
});
444+
let result: McpServerConfig = serde_json::from_value(config).unwrap();
445+
match result {
446+
McpServerConfig::Remote(remote) => {
447+
assert_eq!(remote.url, "https://example-server.modelcontextprotocol.io/mcp");
448+
assert!(remote.oauth_scopes.is_empty());
449+
},
450+
_ => panic!("Expected Remote variant"),
451+
}
452+
}
453+
454+
#[test]
455+
fn test_mcp_server_config_stdio_deser() {
456+
let config = serde_json::json!({
457+
"type": "stdio",
458+
"command": "node",
459+
"args": ["server.js"]
460+
});
461+
let result: McpServerConfig = serde_json::from_value(config).unwrap();
462+
match result {
463+
McpServerConfig::Local(local) => {
464+
assert_eq!(local.command, "node");
465+
assert_eq!(local.args, vec!["server.js"]);
466+
},
467+
_ => panic!("Expected Local variant"),
468+
}
469+
}
470+
471+
#[test]
472+
fn test_mcp_servers_map_deser() {
473+
let servers = serde_json::json!({
474+
"coin-gecko": {
475+
"type": "http",
476+
"url": "https://mcp.api.coingecko.com/sse"
477+
},
478+
"datadog": {
479+
"type": "http",
480+
"url": "https://mcp.datadoghq.com/api/unstable/mcp-server/mcp",
481+
"oauthScopes": ["mcp", "profile", "email"]
482+
},
483+
"local-server": {
484+
"type": "stdio",
485+
"command": "npx",
486+
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
487+
}
488+
});
489+
490+
let result: HashMap<String, McpServerConfig> = serde_json::from_value(servers).unwrap();
491+
assert_eq!(result.len(), 3);
492+
assert!(result.contains_key("coin-gecko"));
493+
assert!(result.contains_key("datadog"));
494+
assert!(result.contains_key("local-server"));
495+
}
395496
}

crates/agent/src/agent/consts.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ pub const MAX_TOOL_NAME_LEN: usize = 64;
1212

1313
pub const MAX_TOOL_SPEC_DESCRIPTION_LEN: usize = 10_004;
1414

15+
pub const DEFAULT_MCP_CREDENTIAL_PATH: &str = "~/.aws/sso/cache";
16+
1517
/// 10 MB
1618
pub const MAX_IMAGE_SIZE_BYTES: u64 = 10 * 1024 * 1024;
1719

crates/agent/src/agent/mcp/actor.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::collections::HashMap;
2+
use std::path::PathBuf;
23
use std::sync::Arc;
34
use std::time::Duration;
45

@@ -50,14 +51,9 @@ pub enum McpMessage {
5051
pub struct McpServerActorHandle {
5152
_server_name: String,
5253
sender: RequestSender<McpServerActorRequest, McpServerActorResponse, McpServerActorError>,
53-
event_rx: mpsc::Receiver<McpServerActorEvent>,
5454
}
5555

5656
impl McpServerActorHandle {
57-
pub async fn recv(&mut self) -> Option<McpServerActorEvent> {
58-
self.event_rx.recv().await
59-
}
60-
6157
pub async fn get_tool_specs(&self) -> Result<Vec<ToolSpec>, McpServerActorError> {
6258
match self
6359
.sender
@@ -150,6 +146,7 @@ impl From<ServiceError> for McpServerActorError {
150146
pub enum McpServerActorEvent {
151147
/// The MCP server has launched successfully
152148
Initialized {
149+
server_name: String,
153150
/// Time taken to launch the server
154151
serve_duration: Duration,
155152
/// Time taken to list all tools.
@@ -162,7 +159,9 @@ pub enum McpServerActorEvent {
162159
list_prompts_duration: Option<Duration>,
163160
},
164161
/// The MCP server failed to initialize successfully
165-
InitializeError(String),
162+
InitializeError { server_name: String, error: String },
163+
/// An OAuth authentication request from the MCP server
164+
OauthRequest { server_name: String, oauth_url: String },
166165
}
167166

168167
#[derive(Debug)]
@@ -192,34 +191,38 @@ pub struct McpServerActor {
192191

193192
impl McpServerActor {
194193
/// Spawns an actor to manage the MCP server, returning a [McpServerActorHandle].
195-
pub fn spawn(server_name: String, config: McpServerConfig) -> McpServerActorHandle {
196-
let (event_tx, event_rx) = mpsc::channel(32);
194+
pub fn spawn(
195+
server_name: String,
196+
config: McpServerConfig,
197+
cred_path: PathBuf,
198+
event_tx: mpsc::Sender<McpServerActorEvent>,
199+
) -> McpServerActorHandle {
197200
let (req_tx, req_rx) = new_request_channel();
198201

199202
let server_name_clone = server_name.clone();
200-
tokio::spawn(async move { Self::launch(server_name_clone, config, req_rx, event_tx).await });
203+
tokio::spawn(async move { Self::launch(server_name_clone, config, cred_path, req_rx, event_tx).await });
201204

202205
McpServerActorHandle {
203206
_server_name: server_name,
204207
sender: req_tx,
205-
event_rx,
206208
}
207209
}
208210

209211
async fn launch(
210212
server_name: String,
211213
config: McpServerConfig,
214+
cred_path: PathBuf,
212215
req_rx: RequestReceiver<McpServerActorRequest, McpServerActorResponse, McpServerActorError>,
213216
event_tx: mpsc::Sender<McpServerActorEvent>,
214217
) {
215218
let (message_tx, message_rx) = mpsc::channel(32);
216-
match McpService::new(server_name.clone(), config.clone(), message_tx.clone())
217-
.launch()
219+
match McpService::new(server_name.clone(), config.clone(), cred_path, message_tx.clone())
220+
.launch(&event_tx)
218221
.await
219222
{
220223
Ok((service_handle, launch_md)) => {
221224
let s = Self {
222-
server_name,
225+
server_name: server_name.clone(),
223226
_config: config,
224227
tools: launch_md.tools.unwrap_or_default(),
225228
prompts: launch_md.prompts.unwrap_or_default(),
@@ -234,6 +237,7 @@ impl McpServerActor {
234237
let _ = s
235238
.event_tx
236239
.send(McpServerActorEvent::Initialized {
240+
server_name,
237241
serve_duration: launch_md.serve_time_taken,
238242
list_tools_duration: launch_md.list_tools_duration,
239243
list_prompts_duration: launch_md.list_prompts_duration,
@@ -243,7 +247,10 @@ impl McpServerActor {
243247
},
244248
Err(err) => {
245249
let _ = event_tx
246-
.send(McpServerActorEvent::InitializeError(err.to_string()))
250+
.send(McpServerActorEvent::InitializeError {
251+
server_name,
252+
error: err.to_string(),
253+
})
247254
.await;
248255
},
249256
}
@@ -337,7 +344,7 @@ impl McpServerActor {
337344
let service_handle = self.service_handle.clone();
338345
let tx = self.message_tx.clone();
339346
tokio::spawn(async move {
340-
let res = service_handle.list_tools().await;
347+
let res = service_handle.list_all_tools().await;
341348
let _ = tx.send(McpMessage::Tools(res)).await;
342349
});
343350
}
@@ -348,7 +355,7 @@ impl McpServerActor {
348355
let service_handle = self.service_handle.clone();
349356
let tx = self.message_tx.clone();
350357
tokio::spawn(async move {
351-
let res = service_handle.list_prompts().await;
358+
let res = service_handle.list_all_prompts().await;
352359
let _ = tx.send(McpMessage::Prompts(res)).await;
353360
});
354361
}

0 commit comments

Comments
 (0)