Skip to content

Commit 57db771

Browse files
authored
fix: cancellable initialization process (#280)
1 parent 1f7f4d3 commit 57db771

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

crates/rmcp/src/service/client.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ pub enum ClientInitializeError<E> {
3939
error: E,
4040
context: Cow<'static, str>,
4141
},
42+
43+
#[error("Cancelled")]
44+
Cancelled,
4245
}
4346

4447
/// Helper function to get the next message from the stream
@@ -121,6 +124,24 @@ pub async fn serve_client_with_ct<S, T, E, A>(
121124
transport: T,
122125
ct: CancellationToken,
123126
) -> Result<RunningService<RoleClient, S>, ClientInitializeError<E>>
127+
where
128+
S: Service<RoleClient>,
129+
T: IntoTransport<RoleClient, E, A>,
130+
E: std::error::Error + Send + Sync + 'static,
131+
{
132+
tokio::select! {
133+
result = serve_client_with_ct_inner(service, transport, ct.clone()) => { result }
134+
_ = ct.cancelled() => {
135+
Err(ClientInitializeError::Cancelled)
136+
}
137+
}
138+
}
139+
140+
async fn serve_client_with_ct_inner<S, T, E, A>(
141+
service: S,
142+
transport: T,
143+
ct: CancellationToken,
144+
) -> Result<RunningService<RoleClient, S>, ClientInitializeError<E>>
124145
where
125146
S: Service<RoleClient>,
126147
T: IntoTransport<RoleClient, E, A>,

crates/rmcp/src/service/server.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ pub enum ServerInitializeError<E> {
5858
error: E,
5959
context: Cow<'static, str>,
6060
},
61+
62+
#[error("Cancelled")]
63+
Cancelled,
6164
}
6265

6366
pub type ClientSink = Peer<RoleServer>;
@@ -140,6 +143,24 @@ pub async fn serve_server_with_ct<S, T, E, A>(
140143
transport: T,
141144
ct: CancellationToken,
142145
) -> Result<RunningService<RoleServer, S>, ServerInitializeError<E>>
146+
where
147+
S: Service<RoleServer>,
148+
T: IntoTransport<RoleServer, E, A>,
149+
E: std::error::Error + Send + Sync + 'static,
150+
{
151+
tokio::select! {
152+
result = serve_server_with_ct_inner(service, transport, ct.clone()) => { result }
153+
_ = ct.cancelled() => {
154+
Err(ServerInitializeError::Cancelled)
155+
}
156+
}
157+
}
158+
159+
async fn serve_server_with_ct_inner<S, T, E, A>(
160+
service: S,
161+
transport: T,
162+
ct: CancellationToken,
163+
) -> Result<RunningService<RoleServer, S>, ServerInitializeError<E>>
143164
where
144165
S: Service<RoleServer>,
145166
T: IntoTransport<RoleServer, E, A>,

0 commit comments

Comments
 (0)