Skip to content

Commit 28dc642

Browse files
committed
chore: fixing stdio blocking ctrl+c signal
1 parent 45c1691 commit 28dc642

File tree

14 files changed

+216
-102
lines changed

14 files changed

+216
-102
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ tokio = { version = "1.45.0", features = [
5050
"time",
5151
] }
5252
tokio-stream = "0.1"
53+
tokio-util = "0.7.15"
5354
tracing = "0.1.41"
5455
tracing-core = "0.1.33"
5556
tracing-subscriber = { version = "0.3.19", features = ["json"] }

crates/apollo-mcp-proxy/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ version.workspace = true
88
axum.workspace = true
99
clap = { version = "4.5.40", features = ["derive"] }
1010
tokio.workspace = true
11+
tokio-util.workspace = true
1112
rmcp = { workspace = true, features = [
1213
"client",
1314
"reqwest",
@@ -17,6 +18,7 @@ tracing.workspace = true
1718
tracing-appender = "0.2.3"
1819
tracing-subscriber.workspace = true
1920
serde_json = "1.0.140"
21+
futures = "0.3.31"
2022

2123
[lints]
2224
workspace = true

crates/apollo-mcp-proxy/src/client.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1+
use crate::non_blocking_stdio::NonBlockStdIo;
12
use crate::server::ProxyServer;
2-
use rmcp::transport::stdio;
3+
use rmcp::model::ProtocolVersion;
34
use rmcp::{
45
ServiceExt,
56
model::{ClientCapabilities, ClientInfo, Implementation},
67
transport::StreamableHttpClientTransport,
78
};
89
use std::error::Error;
10+
use tokio_util::sync::CancellationToken;
911
use tracing::{debug, error, info};
1012

11-
pub async fn start_proxy_client(url: &str) -> Result<(), Box<dyn Error>> {
13+
pub async fn start_proxy_client(
14+
url: &str,
15+
cancellation_token: CancellationToken,
16+
) -> Result<(), Box<dyn Error>> {
1217
let transport = StreamableHttpClientTransport::from_uri(url);
1318
let client_info = ClientInfo {
14-
protocol_version: Default::default(),
19+
protocol_version: ProtocolVersion::LATEST,
1520
capabilities: ClientCapabilities::default(),
1621
client_info: Implementation {
1722
name: "mcp remote rust client".to_string(),
@@ -34,9 +39,11 @@ pub async fn start_proxy_client(url: &str) -> Result<(), Box<dyn Error>> {
3439
debug!("{server_info:#?}");
3540

3641
let proxy_server = ProxyServer::new(client.peer().clone(), client.peer_info());
37-
let stdio_transport = stdio();
38-
let server = proxy_server.serve(stdio_transport).await?;
3942

43+
let stdio_transport = NonBlockStdIo::new(cancellation_token.child_token());
44+
let server = proxy_server
45+
.serve_with_ct(stdio_transport, cancellation_token)
46+
.await?;
4047
server.waiting().await?;
4148
Ok(())
4249
}

crates/apollo-mcp-proxy/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
pub mod client;
2-
2+
pub mod non_blocking_stdio;
33
pub mod server;

crates/apollo-mcp-proxy/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
2828

2929
let args = Args::parse();
3030

31-
let _ = start_proxy_client(&args.url).await;
31+
let _ = start_proxy_client(&args.url, Default::default()).await;
3232

3333
Ok(())
3434
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use rmcp::RoleServer;
2+
use rmcp::service::{RxJsonRpcMessage, TxJsonRpcMessage};
3+
use rmcp::transport::Transport;
4+
use std::io::{ErrorKind, Write};
5+
use tokio_util::sync::CancellationToken;
6+
use tracing::{debug, error};
7+
8+
pub struct NonBlockStdIo {
9+
tx_out: tokio::sync::mpsc::Sender<TxJsonRpcMessage<RoleServer>>,
10+
rx_in: tokio::sync::mpsc::Receiver<RxJsonRpcMessage<RoleServer>>,
11+
}
12+
13+
impl NonBlockStdIo {
14+
pub fn new(cancellation_token: CancellationToken) -> Self {
15+
let (tx_in, rx_in) = tokio::sync::mpsc::channel(100);
16+
let (tx_out, mut rx_out) = tokio::sync::mpsc::channel(100);
17+
18+
std::thread::spawn(move || {
19+
for line_result in std::io::stdin().lines() {
20+
let line = match line_result {
21+
Ok(line) => line,
22+
Err(e) => {
23+
error!("[Proxy] Failed to read from stdin: {e:?}");
24+
cancellation_token.cancel();
25+
"".to_string()
26+
}
27+
};
28+
29+
debug!("[Proxy] Stdin received: {line}");
30+
31+
let data = match serde_json::from_slice(line.as_bytes()) {
32+
Ok(data) => data,
33+
Err(e) => {
34+
error!("[Proxy] Failed to deserialize json: {e:?}");
35+
cancellation_token.cancel();
36+
continue;
37+
}
38+
};
39+
40+
match tx_in.blocking_send(data) {
41+
Ok(_) => {}
42+
Err(e) => {
43+
error!("[Proxy] Failed to send data: {e:?}");
44+
}
45+
}
46+
}
47+
});
48+
49+
std::thread::spawn(move || {
50+
loop {
51+
if let Some(data) = rx_out.blocking_recv() {
52+
let mut data = serde_json::to_string(&data).unwrap_or_else(|e| {
53+
error!("[Proxy] Couldn't serialize data: {e:?}");
54+
"".to_string()
55+
});
56+
57+
data.push('\n');
58+
59+
match std::io::stdout().write_all(data.as_bytes()) {
60+
Ok(_) => {}
61+
Err(e) => {
62+
error!("[Proxy] Failed to write data to stdout: {e:?}");
63+
}
64+
}
65+
66+
match std::io::stdout().flush() {
67+
Ok(_) => {}
68+
Err(e) => {
69+
error!("[Proxy] Failed to flush stdout: {e:?}");
70+
}
71+
}
72+
}
73+
}
74+
});
75+
76+
Self { tx_out, rx_in }
77+
}
78+
}
79+
80+
impl Transport<RoleServer> for NonBlockStdIo {
81+
type Error = tokio::io::Error;
82+
83+
fn send(
84+
&mut self,
85+
item: TxJsonRpcMessage<RoleServer>,
86+
) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
87+
let tx = self.tx_out.clone();
88+
89+
async move {
90+
debug!("Sending message to server: {item:?}");
91+
tx.send(item).await.map_err(|e| {
92+
tokio::io::Error::new(
93+
ErrorKind::BrokenPipe,
94+
format!("NonBlockStdIo send error: {e:?}"),
95+
)
96+
})
97+
}
98+
}
99+
100+
#[allow(clippy::manual_async_fn)]
101+
fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<RoleServer>>> + Send {
102+
async move {
103+
let data = self.rx_in.recv().await;
104+
debug!("[NonBlockStdIo receiving] {data:?}");
105+
data
106+
}
107+
}
108+
109+
#[allow(clippy::manual_async_fn)]
110+
fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send {
111+
async move {
112+
debug!("[NonBlockStdIo] Closing connection");
113+
self.rx_in.close();
114+
Ok(())
115+
}
116+
}
117+
}
118+
119+
impl Drop for NonBlockStdIo {
120+
fn drop(&mut self) {
121+
debug!("[NonBlockStdIo] Dropping connection");
122+
}
123+
}

crates/apollo-mcp-proxy/src/server.rs

Lines changed: 13 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ use rmcp::model::{
77
use rmcp::service::{NotificationContext, RequestContext};
88
use rmcp::{Error as McpError, Peer, RoleClient, RoleServer, ServerHandler};
99
use std::sync::Arc;
10-
use tokio::sync::Mutex;
1110
use tracing::{debug, error, info};
1211

1312
pub struct ProxyServer {
14-
client: Arc<Mutex<Peer<RoleClient>>>,
13+
client: Arc<Peer<RoleClient>>,
1514
server_info: Arc<ServerInfo>,
1615
}
1716

@@ -34,7 +33,7 @@ impl ProxyServer {
3433
debug!("[Proxy]server info: {:?}", server_info);
3534

3635
Self {
37-
client: Arc::new(Mutex::new(client_peer)),
36+
client: Arc::new(client_peer),
3837
server_info: Arc::new(server_info),
3938
}
4039
}
@@ -60,10 +59,7 @@ impl ServerHandler for ProxyServer {
6059
request: rmcp::model::CompleteRequestParam,
6160
_context: RequestContext<RoleServer>,
6261
) -> Result<rmcp::model::CompleteResult, McpError> {
63-
let client = self.client.clone();
64-
let guard = client.lock().await;
65-
66-
match guard.complete(request).await {
62+
match self.client.complete(request).await {
6763
Ok(result) => {
6864
debug!("[Proxy] Proxying complete response");
6965
Ok(result)
@@ -91,10 +87,7 @@ impl ServerHandler for ProxyServer {
9187
));
9288
}
9389

94-
let client = self.client.clone();
95-
let guard = client.lock().await;
96-
97-
match guard.get_prompt(request).await {
90+
match self.client.get_prompt(request).await {
9891
Ok(result) => {
9992
debug!("[Proxy] Proxying get_prompt response");
10093
Ok(result)
@@ -122,10 +115,7 @@ impl ServerHandler for ProxyServer {
122115
));
123116
}
124117

125-
let client = self.client.clone();
126-
let guard = client.lock().await;
127-
128-
match guard.list_prompts(request).await {
118+
match self.client.list_prompts(request).await {
129119
Ok(result) => {
130120
debug!("[Proxy] Proxying list_prompts response");
131121
Ok(result)
@@ -150,9 +140,7 @@ impl ServerHandler for ProxyServer {
150140
));
151141
}
152142

153-
let client_guard = self.client.lock().await;
154-
155-
match client_guard.list_resources(request).await {
143+
match self.client.list_resources(request).await {
156144
Ok(list_resources_result) => {
157145
debug!(
158146
"Proxying list_resources response: {:?}",
@@ -180,11 +168,7 @@ impl ServerHandler for ProxyServer {
180168
));
181169
}
182170

183-
let client = self.client.clone();
184-
let guard = client.lock().await;
185-
186-
// TODO: Check if the server has resources capability and forward the request
187-
match guard.list_resource_templates(request).await {
171+
match self.client.list_resource_templates(request).await {
188172
Ok(list_resource_templates_result) => {
189173
debug!(
190174
"Proxying list_resource_templates response: {:?}",
@@ -212,11 +196,8 @@ impl ServerHandler for ProxyServer {
212196
));
213197
}
214198

215-
let client = self.client.clone();
216-
let guard = client.lock().await;
217-
218-
// TODO: Check if the server has resources capability and forward the request
219-
match guard
199+
match self
200+
.client
220201
.read_resource(ReadResourceRequestParam {
221202
uri: request.uri.clone(),
222203
})
@@ -252,10 +233,7 @@ impl ServerHandler for ProxyServer {
252233
));
253234
}
254235

255-
let client = self.client.clone();
256-
let guard = client.lock().await;
257-
258-
match guard.call_tool(request.clone()).await {
236+
match self.client.call_tool(request.clone()).await {
259237
Ok(result) => {
260238
debug!("[Proxy] Tool call succeeded: {:?}", result);
261239
Ok(result)
@@ -282,10 +260,7 @@ impl ServerHandler for ProxyServer {
282260
));
283261
}
284262

285-
let client = self.client.clone();
286-
let guard = client.lock().await;
287-
288-
match guard.list_tools(request).await {
263+
match self.client.list_tools(request).await {
289264
Ok(result) => {
290265
debug!(
291266
"Proxying list_tools response with {} tools: {:?}",
@@ -306,9 +281,7 @@ impl ServerHandler for ProxyServer {
306281
notification: rmcp::model::CancelledNotificationParam,
307282
_context: NotificationContext<RoleServer>,
308283
) {
309-
let client = self.client.clone();
310-
let guard = client.lock().await;
311-
match guard.notify_cancelled(notification).await {
284+
match self.client.notify_cancelled(notification).await {
312285
Ok(_) => {
313286
debug!("[Proxy] Proxying cancelled notification");
314287
}
@@ -323,9 +296,7 @@ impl ServerHandler for ProxyServer {
323296
notification: rmcp::model::ProgressNotificationParam,
324297
_context: NotificationContext<RoleServer>,
325298
) {
326-
let client = self.client.clone();
327-
let guard = client.lock().await;
328-
match guard.notify_progress(notification).await {
299+
match self.client.notify_progress(notification).await {
329300
Ok(_) => {
330301
debug!("[Proxy] Proxying progress notification");
331302
}

crates/apollo-mcp-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ serde.workspace = true
2626
serde_json.workspace = true
2727
thiserror.workspace = true
2828
tokio.workspace = true
29+
tokio-util.workspace = true
2930
tracing.workspace = true
3031
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
31-
tokio-util = "0.7.15"
3232
url.workspace = true
3333

3434
[dev-dependencies]

0 commit comments

Comments
 (0)