Skip to content

Commit d328157

Browse files
authored
fix: enhance transport graceful shutdown with proper writer closure (#392)
- Add TransportWriter type alias for cleaner type definitions - Wrap transport writer in Option to enable proper closure - Implement close() method to drop writer and signal end of communication - Update graceful_shutdown to close transport before waiting for process exit - Improve error handling for closed transport state Addresses the graceful shutdown improvements discussed in #347 and #364
1 parent a84a3eb commit d328157

File tree

2 files changed

+19
-5
lines changed

2 files changed

+19
-5
lines changed

crates/rmcp/src/transport/async_rw.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ where
4242
}
4343
}
4444

45+
pub type TransportWriter<Role, W> = FramedWrite<W, JsonRpcMessageCodec<TxJsonRpcMessage<Role>>>;
46+
4547
pub struct AsyncRwTransport<Role: ServiceRole, R: AsyncRead, W: AsyncWrite> {
4648
read: FramedRead<R, JsonRpcMessageCodec<RxJsonRpcMessage<Role>>>,
47-
write: Arc<Mutex<FramedWrite<W, JsonRpcMessageCodec<TxJsonRpcMessage<Role>>>>>,
49+
write: Arc<Mutex<Option<TransportWriter<Role, W>>>>,
4850
}
4951

5052
impl<Role: ServiceRole, R, W> AsyncRwTransport<Role, R, W>
@@ -57,10 +59,10 @@ where
5759
read,
5860
JsonRpcMessageCodec::<RxJsonRpcMessage<Role>>::default(),
5961
);
60-
let write = Arc::new(Mutex::new(FramedWrite::new(
62+
let write = Arc::new(Mutex::new(Some(FramedWrite::new(
6163
write,
6264
JsonRpcMessageCodec::<TxJsonRpcMessage<Role>>::default(),
63-
)));
65+
))));
6466
Self { read, write }
6567
}
6668
}
@@ -103,7 +105,14 @@ where
103105
let lock = self.write.clone();
104106
async move {
105107
let mut write = lock.lock().await;
106-
write.send(item).await.map_err(Into::into)
108+
if let Some(ref mut write) = *write {
109+
write.send(item).await.map_err(Into::into)
110+
} else {
111+
Err(std::io::Error::new(
112+
std::io::ErrorKind::NotConnected,
113+
"Transport is closed",
114+
))
115+
}
107116
}
108117
}
109118

@@ -120,6 +129,8 @@ where
120129
}
121130

122131
async fn close(&mut self) -> Result<(), Self::Error> {
132+
let mut write = self.write.lock().await;
133+
drop(write.take());
123134
Ok(())
124135
}
125136
}

crates/rmcp/src/transport/child_process.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,13 @@ impl TokioChildProcess {
104104

105105
/// Gracefully shutdown the child process
106106
///
107-
/// This will first wait for the child process to exit normally with a timeout.
107+
/// This will first close the transport to the child process (the server),
108+
/// and wait for the child process to exit normally with a timeout.
108109
/// If the child process doesn't exit within the timeout, it will be killed.
109110
pub async fn graceful_shutdown(&mut self) -> std::io::Result<()> {
110111
if let Some(mut child) = self.child.inner.take() {
112+
self.transport.close().await?;
113+
111114
let wait_fut = Box::into_pin(child.wait());
112115
tokio::select! {
113116
_ = tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS)) => {

0 commit comments

Comments
 (0)