Skip to content

Commit 93d654e

Browse files
committed
test(cli): incraese the wait durations for Windows
Windows needs more time for listeners to be ready due to slower process startup.
1 parent f4c49bc commit 93d654e

File tree

2 files changed

+42
-33
lines changed

2 files changed

+42
-33
lines changed

testsuite/src/mcp_client.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::pin::Pin;
22

3-
use anyhow::{Context as _, Result};
3+
use anyhow::Context as _;
44
use serde::{Deserialize, Serialize};
55
use tokio::io::{AsyncBufReadExt as _, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
66

@@ -160,23 +160,23 @@ impl McpClient {
160160
}
161161

162162
/// Connect to the MCP server and perform initialization handshake using the configured settings.
163-
pub async fn connect(&mut self) -> Result<InitializeResult> {
163+
pub async fn connect(&mut self) -> anyhow::Result<InitializeResult> {
164164
self.initialize().await
165165
}
166166

167167
/// Send a raw JSON-RPC request and get response
168-
async fn send_request(&mut self, request: JsonRpcRequest) -> Result<JsonRpcResponse> {
168+
async fn send_request(&mut self, request: JsonRpcRequest) -> anyhow::Result<JsonRpcResponse> {
169169
// Serialize request to JSON
170170
let mut request = serde_json::to_string(&request)?;
171171
request.push('\n');
172172

173173
// Write request as line.
174-
self.writer.write_all(request.as_bytes()).await?;
175-
self.writer.flush().await?;
174+
self.writer.write_all(request.as_bytes()).await.context("write")?;
175+
self.writer.flush().await.context("flush")?;
176176

177177
// Read response line.
178178
let mut response_line = String::new();
179-
self.reader.read_line(&mut response_line).await?;
179+
self.reader.read_line(&mut response_line).await.context("read")?;
180180

181181
if response_line.trim().is_empty() {
182182
anyhow::bail!("empty response");
@@ -189,7 +189,7 @@ impl McpClient {
189189
}
190190

191191
/// Internal helper to send an initialize request.
192-
async fn initialize(&mut self) -> Result<InitializeResult> {
192+
async fn initialize(&mut self) -> anyhow::Result<InitializeResult> {
193193
let request = JsonRpcRequest {
194194
jsonrpc: "2.0".to_owned(),
195195
id: Some(self.next_id()),
@@ -210,42 +210,38 @@ impl McpClient {
210210
}
211211

212212
/// List available tools.
213-
pub async fn list_tools(&mut self) -> Result<ToolsListResult> {
213+
pub async fn list_tools(&mut self) -> anyhow::Result<ToolsListResult> {
214214
let request = JsonRpcRequest {
215215
jsonrpc: "2.0".to_owned(),
216216
id: Some(self.next_id()),
217217
method: "tools/list".to_owned(),
218218
params: None,
219219
};
220220

221-
let response = self.send_request(request).await?;
221+
let response = self.send_request(request).await.context("send request")?;
222222
if let Some(error) = response.error {
223223
anyhow::bail!("JSON-RPC error {}: {}", error.code, error.message);
224224
}
225225

226-
let result = response
227-
.result
228-
.ok_or_else(|| anyhow::anyhow!("missing result in response"))?;
226+
let result = response.result.context("missing result in response")?;
229227
Ok(serde_json::from_value(result)?)
230228
}
231229

232230
/// Call a tool.
233-
pub async fn call_tool(&mut self, params: ToolCallParams) -> Result<ToolCallResult> {
231+
pub async fn call_tool(&mut self, params: ToolCallParams) -> anyhow::Result<ToolCallResult> {
234232
let request = JsonRpcRequest {
235233
jsonrpc: "2.0".to_owned(),
236234
id: Some(self.next_id()),
237235
method: "tools/call".to_owned(),
238236
params: Some(serde_json::to_value(params)?),
239237
};
240238

241-
let response = self.send_request(request).await?;
239+
let response = self.send_request(request).await.context("send request")?;
242240
if let Some(error) = response.error {
243241
anyhow::bail!("JSON-RPC error {}: {}", error.code, error.message);
244242
}
245243

246-
let result = response
247-
.result
248-
.ok_or_else(|| anyhow::anyhow!("Missing result in response"))?;
244+
let result = response.result.context("missing result in response")?;
249245
Ok(serde_json::from_value(result)?)
250246
}
251247

@@ -254,7 +250,7 @@ impl McpClient {
254250
&mut self,
255251
method: impl Into<String>,
256252
params: Option<serde_json::Value>,
257-
) -> Result<()> {
253+
) -> anyhow::Result<()> {
258254
let request = JsonRpcRequest {
259255
jsonrpc: "2.0".to_owned(),
260256
id: None, // Notifications have no ID.

testsuite/tests/cli/jetsocat.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,17 @@ use rstest::rstest;
77
use test_utils::find_unused_ports;
88
use testsuite::cli::{assert_stderr_eq, jetsocat_assert_cmd, jetsocat_cmd, jetsocat_tokio_cmd};
99

10-
const LISTENER_WAIT_DURATION: Duration = Duration::from_millis(50);
11-
const COMMAND_TIMEOUT: Duration = Duration::from_millis(150);
10+
// NOTE: Windows needs more time for listeners to be ready due to slower process startup.
11+
12+
#[cfg(not(windows))]
13+
const LISTENER_WAIT_DURATION: Duration = Duration::from_millis(150);
14+
#[cfg(windows)]
15+
const LISTENER_WAIT_DURATION: Duration = Duration::from_millis(300);
16+
17+
#[cfg(not(windows))]
18+
const COMMAND_TIMEOUT: Duration = Duration::from_millis(300);
19+
#[cfg(windows)]
20+
const COMMAND_TIMEOUT: Duration = Duration::from_millis(650);
1221

1322
#[test]
1423
fn no_args_shows_help() {
@@ -872,7 +881,7 @@ async fn mcp_proxy_notification(#[values(true, false)] http_transport: bool) {
872881
mcp_client.list_tools().await.expect("list tools");
873882

874883
// Wait for the handler to be called.
875-
tokio::time::sleep(Duration::from_millis(50)).await;
884+
tokio::time::sleep(Duration::from_millis(75)).await;
876885

877886
// Check the probe.
878887
assert!(probe.load(std::sync::atomic::Ordering::SeqCst));
@@ -981,6 +990,7 @@ async fn mcp_proxy_http_error() {
981990
async fn mcp_proxy_terminated_on_broken_pipe() {
982991
use testsuite::mcp_client::McpClient;
983992
use testsuite::mcp_server::{DynMcpTransport, McpServer, NamedPipeTransport};
993+
// use tokio::io::AsyncReadExt as _; // TODO
984994

985995
// Configure MCP server transport (named pipe only).
986996
let np_transport = NamedPipeTransport::bind().unwrap();
@@ -996,16 +1006,18 @@ async fn mcp_proxy_terminated_on_broken_pipe() {
9961006

9971007
// Start jetsocat mcp-proxy with stdio pipe.
9981008
let mut jetsocat_process = jetsocat_tokio_cmd()
999-
.args(["mcp-proxy", "stdio", &pipe])
1009+
.args(["mcp-proxy", "stdio", &pipe]) // TODO: add "--log-term"
10001010
.stdin(std::process::Stdio::piped())
10011011
.stdout(std::process::Stdio::piped())
1012+
// .stderr(std::process::Stdio::piped()) // TODO: Once Jetsocat logs to stderr.
10021013
.kill_on_drop(true)
10031014
.spawn()
10041015
.expect("start jetsocat mcp-proxy");
10051016

10061017
// Get stdin/stdout handles for MCP client.
10071018
let stdin = jetsocat_process.stdin.take().expect("get stdin");
10081019
let stdout = jetsocat_process.stdout.take().expect("get stdout");
1020+
// let mut stderr = jetsocat_process.stderr.take().expect("get stderr"); // TODO
10091021

10101022
// Initialize MCP client with jetsocat's stdin/stdout.
10111023
let mut mcp_client = McpClient::new(Box::pin(stdout), Box::pin(stdin));
@@ -1023,18 +1035,19 @@ async fn mcp_proxy_terminated_on_broken_pipe() {
10231035
// The proxy will detect this and send an error response, then close.
10241036
let result = mcp_client.list_tools().await;
10251037

1026-
// The proxy should detect the pipe as broken, and terminates with no response.
1027-
// Ideally, the proxy should fail when writing the request, but merely
1028-
// writing is typically not enough to detect a broken pipe with named pipes.
1038+
// Since Jetsocat is continuously reading on the pipe, it quickly detects the pipe is broken and stops itself with an error.
1039+
// Our MCP client in turns try to write from stdout / read to stdin, and this fails with a BrokenPipe on our side.
10291040
let error = result.unwrap_err();
1030-
expect![[r#"
1031-
"empty response"
1032-
"#]]
1033-
.assert_debug_eq(&error);
1034-
1035-
// FIXME: Jetsocat needs to be modified to print the logs into stderr.
1036-
// Once we have that, we can retrieve stderr and search for this string:
1037-
// Fatal error reading from peer, stopping proxy error="connection closed"
1041+
let error_debug_fmt = format!("{error:?}");
1042+
#[cfg(windows)]
1043+
assert!(error_debug_fmt.contains("The pipe is being closed"));
1044+
#[cfg(not(windows))]
1045+
assert!(error_debug_fmt.contains("Broken pipe (os error 32)"));
1046+
1047+
// TODO: Once Jetsocat print the logs to stderr.
1048+
// let mut stderr_str = String::new();
1049+
// stderr.read_to_string(&mut stderr_str).await.expect("read_to_string");
1050+
// stderr_str.contains(r#"Fatal error reading from peer, stopping proxy error="connection closed""#);
10381051

10391052
// The jetsocat process should exit gracefully after detecting broken pipe.
10401053
let exit_status = tokio::time::timeout(Duration::from_secs(2), jetsocat_process.wait()).await;

0 commit comments

Comments
 (0)