Skip to content

Commit 67480bf

Browse files
committed
adds mechanism for checking if server is alive
1 parent 87ba32c commit 67480bf

File tree

7 files changed

+170
-87
lines changed

7 files changed

+170
-87
lines changed

crates/chat-cli/src/cli/chat/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ mod message;
77
mod parse;
88
use std::path::MAIN_SEPARATOR;
99
mod line_tracker;
10-
mod new_server_messenger;
1110
mod parser;
1211
mod prompt;
1312
mod prompt_parser;
13+
mod server_messenger;
1414
#[cfg(unix)]
1515
mod skim_integration;
1616
mod token_counter;

crates/chat-cli/src/cli/chat/new_server_messenger.rs renamed to crates/chat-cli/src/cli/chat/server_messenger.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@ use rmcp::model::{
44
ListResourcesResult,
55
ListToolsResult,
66
};
7+
use rmcp::{
8+
Peer,
9+
RoleClient,
10+
};
711
use tokio::sync::mpsc::{
812
Receiver,
913
Sender,
1014
channel,
1115
};
1216

13-
use crate::mcp_client::new_messenger::{
17+
use crate::mcp_client::messenger::{
1418
Messenger,
1519
MessengerError,
1620
MessengerResult,
@@ -23,18 +27,22 @@ pub enum UpdateEventMessage {
2327
ListToolsResult {
2428
server_name: String,
2529
result: Result<ListToolsResult>,
30+
peer: Option<Peer<RoleClient>>,
2631
},
2732
ListPromptsResult {
2833
server_name: String,
2934
result: Result<ListPromptsResult>,
35+
peer: Option<Peer<RoleClient>>,
3036
},
3137
ListResourcesResult {
3238
server_name: String,
3339
result: Result<ListResourcesResult>,
40+
peer: Option<Peer<RoleClient>>,
3441
},
3542
ResourceTemplatesListResult {
3643
server_name: String,
3744
result: Result<ListResourceTemplatesResult>,
45+
peer: Option<Peer<RoleClient>>,
3846
},
3947
InitStart {
4048
server_name: String,
@@ -74,34 +82,49 @@ pub struct ServerMessenger {
7482

7583
#[async_trait::async_trait]
7684
impl Messenger for ServerMessenger {
77-
async fn send_tools_list_result(&self, result: Result<ListToolsResult>) -> MessengerResult {
85+
async fn send_tools_list_result(
86+
&self,
87+
result: Result<ListToolsResult>,
88+
peer: Option<Peer<RoleClient>>,
89+
) -> MessengerResult {
7890
Ok(self
7991
.update_event_sender
8092
.send(UpdateEventMessage::ListToolsResult {
8193
server_name: self.server_name.clone(),
8294
result,
95+
peer,
8396
})
8497
.await
8598
.map_err(|e| MessengerError::Custom(e.to_string()))?)
8699
}
87100

88-
async fn send_prompts_list_result(&self, result: Result<ListPromptsResult>) -> MessengerResult {
101+
async fn send_prompts_list_result(
102+
&self,
103+
result: Result<ListPromptsResult>,
104+
peer: Option<Peer<RoleClient>>,
105+
) -> MessengerResult {
89106
Ok(self
90107
.update_event_sender
91108
.send(UpdateEventMessage::ListPromptsResult {
92109
server_name: self.server_name.clone(),
93110
result,
111+
peer,
94112
})
95113
.await
96114
.map_err(|e| MessengerError::Custom(e.to_string()))?)
97115
}
98116

99-
async fn send_resources_list_result(&self, result: Result<ListResourcesResult>) -> MessengerResult {
117+
async fn send_resources_list_result(
118+
&self,
119+
result: Result<ListResourcesResult>,
120+
peer: Option<Peer<RoleClient>>,
121+
) -> MessengerResult {
100122
Ok(self
101123
.update_event_sender
102124
.send(UpdateEventMessage::ListResourcesResult {
103125
server_name: self.server_name.clone(),
104126
result,
127+
peer,
105128
})
106129
.await
107130
.map_err(|e| MessengerError::Custom(e.to_string()))?)
@@ -110,12 +133,14 @@ impl Messenger for ServerMessenger {
110133
async fn send_resource_templates_list_result(
111134
&self,
112135
result: Result<ListResourceTemplatesResult>,
136+
peer: Option<Peer<RoleClient>>,
113137
) -> MessengerResult {
114138
Ok(self
115139
.update_event_sender
116140
.send(UpdateEventMessage::ResourceTemplatesListResult {
117141
server_name: self.server_name.clone(),
118142
result,
143+
peer,
119144
})
120145
.await
121146
.map_err(|e| MessengerError::Custom(e.to_string()))?)

crates/chat-cli/src/cli/chat/tool_manager.rs

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ use crate::cli::agent::{
6565
use crate::cli::chat::cli::prompts::GetPromptError;
6666
use crate::cli::chat::consts::DUMMY_TOOL_NAME;
6767
use crate::cli::chat::message::AssistantToolUse;
68-
use crate::cli::chat::new_server_messenger::{
68+
use crate::cli::chat::server_messenger::{
6969
ServerMessenger,
7070
ServerMessengerBuilder,
7171
UpdateEventMessage,
@@ -85,7 +85,7 @@ use crate::cli::chat::tools::{
8585
};
8686
use crate::database::Database;
8787
use crate::database::settings::Setting;
88-
use crate::mcp_client::new_messenger::Messenger;
88+
use crate::mcp_client::messenger::Messenger;
8989
use crate::mcp_client::{
9090
McpClient,
9191
RunningClient,
@@ -397,7 +397,7 @@ impl ToolManagerBuilder {
397397

398398
let temp_messenger = messenger_builder.build_with_name(name);
399399
let _ = temp_messenger
400-
.send_tools_list_result(Err(ServiceError::UnexpectedResponse))
400+
.send_tools_list_result(Err(ServiceError::UnexpectedResponse), None)
401401
.await;
402402
},
403403
}
@@ -1277,7 +1277,11 @@ fn spawn_orchestrator_task(
12771277
// request method on the mcp client no longer buffers all the pages from
12781278
// list calls.
12791279
match msg {
1280-
UpdateEventMessage::ListToolsResult { server_name, result } => {
1280+
UpdateEventMessage::ListToolsResult {
1281+
server_name,
1282+
result,
1283+
peer,
1284+
} => {
12811285
let time_taken = loading_servers
12821286
.remove(&server_name)
12831287
.map_or("0.0".to_owned(), |init_time| {
@@ -1343,6 +1347,18 @@ fn spawn_orchestrator_task(
13431347

13441348
match result {
13451349
Ok(result) => {
1350+
if let Some(peer) = peer {
1351+
if peer.is_transport_closed() {
1352+
error!(
1353+
"Received tool list result from {server_name} but transport has been closed. Ignoring."
1354+
);
1355+
return;
1356+
}
1357+
} else {
1358+
error!("Received tool list result from {server_name} without a peer. Ignoring.");
1359+
return;
1360+
}
1361+
13461362
let mut specs = result
13471363
.tools
13481364
.into_iter()
@@ -1367,6 +1383,7 @@ fn spawn_orchestrator_task(
13671383
&result_tools,
13681384
)
13691385
.await;
1386+
13701387
if let Some(sender) = &loading_status_sender {
13711388
// Anomalies here are not considered fatal, thus we shall give
13721389
// warnings.
@@ -1469,8 +1486,23 @@ fn spawn_orchestrator_task(
14691486
}
14701487
}
14711488
},
1472-
UpdateEventMessage::ListPromptsResult { server_name, result } => match result {
1489+
UpdateEventMessage::ListPromptsResult {
1490+
server_name,
1491+
result,
1492+
peer,
1493+
} => match result {
14731494
Ok(prompt_list_result) => {
1495+
if let Some(peer) = peer {
1496+
if peer.is_transport_closed() {
1497+
error!(
1498+
"Received prompt list result from {server_name} but transport has been closed. Ignoring."
1499+
);
1500+
return;
1501+
}
1502+
} else {
1503+
error!("Received prompt list result from {server_name} without a peer. Ignoring.");
1504+
return;
1505+
}
14741506
// We first need to clear all the PromptGets that are associated with
14751507
// this server because PromptsListResult is declaring what is available
14761508
// (and not the diff)
@@ -1515,14 +1547,8 @@ fn spawn_orchestrator_task(
15151547
.or_insert(vec![record]);
15161548
},
15171549
},
1518-
UpdateEventMessage::ListResourcesResult {
1519-
server_name: _,
1520-
result: _,
1521-
} => {},
1522-
UpdateEventMessage::ResourceTemplatesListResult {
1523-
server_name: _,
1524-
result: _,
1525-
} => {},
1550+
UpdateEventMessage::ListResourcesResult { .. } => {},
1551+
UpdateEventMessage::ResourceTemplatesListResult { .. } => {},
15261552
UpdateEventMessage::InitStart { server_name, .. } => {
15271553
pending.write().await.insert(server_name.clone());
15281554
loading_servers.insert(server_name, std::time::Instant::now());

crates/chat-cli/src/cli/chat/tools/custom_tool.rs

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -159,58 +159,3 @@ impl CustomTool {
159159
PermissionEvalResult::Ask
160160
}
161161
}
162-
163-
#[cfg(test)]
164-
mod tests {
165-
use super::*;
166-
167-
#[tokio::test]
168-
async fn test_substitute_env_vars() {
169-
// Set a test environment variable
170-
let os = Os::new().await.unwrap();
171-
unsafe {
172-
os.env.set_var("TEST_VAR", "test_value");
173-
}
174-
175-
// Test basic substitution
176-
assert_eq!(
177-
substitute_env_vars("Value is ${env:TEST_VAR}", &os.env),
178-
"Value is test_value"
179-
);
180-
181-
// Test multiple substitutions
182-
assert_eq!(
183-
substitute_env_vars("${env:TEST_VAR} and ${env:TEST_VAR}", &os.env),
184-
"test_value and test_value"
185-
);
186-
187-
// Test non-existent variable
188-
assert_eq!(
189-
substitute_env_vars("${env:NON_EXISTENT_VAR}", &os.env),
190-
"${NON_EXISTENT_VAR}"
191-
);
192-
193-
// Test mixed content
194-
assert_eq!(
195-
substitute_env_vars("Prefix ${env:TEST_VAR} suffix", &os.env),
196-
"Prefix test_value suffix"
197-
);
198-
}
199-
200-
#[tokio::test]
201-
async fn test_process_env_vars() {
202-
let os = Os::new().await.unwrap();
203-
unsafe {
204-
os.env.set_var("TEST_VAR", "test_value");
205-
}
206-
207-
let mut env_vars = HashMap::new();
208-
env_vars.insert("KEY1".to_string(), "Value is ${env:TEST_VAR}".to_string());
209-
env_vars.insert("KEY2".to_string(), "No substitution".to_string());
210-
211-
process_env_vars(&mut env_vars, &os.env);
212-
213-
assert_eq!(env_vars.get("KEY1").unwrap(), "Value is test_value");
214-
assert_eq!(env_vars.get("KEY2").unwrap(), "No substitution");
215-
}
216-
}

crates/chat-cli/src/mcp_client/new_client.rs renamed to crates/chat-cli/src/mcp_client/client.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use rmcp::{
2222
use tokio::process::Command;
2323
use tracing::error;
2424

25-
use super::new_messenger::Messenger;
25+
use super::messenger::Messenger;
2626
use crate::cli::chat::tools::custom_tool::CustomToolConfig;
2727
use crate::os::Os;
2828

@@ -68,7 +68,7 @@ macro_rules! paginated_fetch {
6868
final_result.$result_field.append(&mut content);
6969
}
7070

71-
if let Err(e) = $messenger.$messenger_method(final_result).await {
71+
if let Err(e) = $messenger.$messenger_method(final_result, Some($service)).await {
7272
error!(target: "mcp", "Initial {} result failed to send for server {}: {}",
7373
stringify!($result_field), $server_name, e);
7474
}
@@ -226,3 +226,58 @@ where
226226
};
227227
}
228228
}
229+
230+
#[cfg(test)]
231+
mod tests {
232+
use super::*;
233+
234+
#[tokio::test]
235+
async fn test_substitute_env_vars() {
236+
// Set a test environment variable
237+
let os = Os::new().await.unwrap();
238+
unsafe {
239+
os.env.set_var("TEST_VAR", "test_value");
240+
}
241+
242+
// Test basic substitution
243+
assert_eq!(
244+
substitute_env_vars("Value is ${env:TEST_VAR}", &os.env),
245+
"Value is test_value"
246+
);
247+
248+
// Test multiple substitutions
249+
assert_eq!(
250+
substitute_env_vars("${env:TEST_VAR} and ${env:TEST_VAR}", &os.env),
251+
"test_value and test_value"
252+
);
253+
254+
// Test non-existent variable
255+
assert_eq!(
256+
substitute_env_vars("${env:NON_EXISTENT_VAR}", &os.env),
257+
"${NON_EXISTENT_VAR}"
258+
);
259+
260+
// Test mixed content
261+
assert_eq!(
262+
substitute_env_vars("Prefix ${env:TEST_VAR} suffix", &os.env),
263+
"Prefix test_value suffix"
264+
);
265+
}
266+
267+
#[tokio::test]
268+
async fn test_process_env_vars() {
269+
let os = Os::new().await.unwrap();
270+
unsafe {
271+
os.env.set_var("TEST_VAR", "test_value");
272+
}
273+
274+
let mut env_vars = HashMap::new();
275+
env_vars.insert("KEY1".to_string(), "Value is ${env:TEST_VAR}".to_string());
276+
env_vars.insert("KEY2".to_string(), "No substitution".to_string());
277+
278+
process_env_vars(&mut env_vars, &os.env);
279+
280+
assert_eq!(env_vars.get("KEY1").unwrap(), "Value is test_value");
281+
assert_eq!(env_vars.get("KEY2").unwrap(), "No substitution");
282+
}
283+
}

0 commit comments

Comments
 (0)