Skip to content

Commit df7dae4

Browse files
committed
refactor(sacp): add lifetime-safe session proxying API
ActiveSession now has a 'responder lifetime that encodes whether responders are still active. This prevents calling proxy_remaining_messages() on sessions from run_session() where responders would die on return. New APIs: - ActiveSession::response() - builds NewSessionResponse for forwarding - ActiveSession::proxy_remaining_messages() - only on 'static sessions - SessionBuilder::spawn_session_proxy() - convenience for inject + proxy Removed SessionBuilder::proxy_session() which was an anti-pattern that combined too many concerns. Updated cookbook with both simple (spawn_session_proxy) and advanced (spawn_session + proxy_remaining_messages) patterns.
1 parent f26a71e commit df7dae4

File tree

3 files changed

+162
-74
lines changed

3 files changed

+162
-74
lines changed

src/sacp/src/cookbook.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -327,17 +327,19 @@ pub mod per_session_mcp_server {
327327
//! - You want to customize the MCP server based on session parameters
328328
//! - Tools need to send notifications back to a specific session
329329
//!
330-
//! # Example
330+
//! # Simple example: proxy everything
331+
//!
332+
//! Use [`spawn_session_proxy`] when you just want to inject an MCP server
333+
//! and proxy all messages without any additional processing:
331334
//!
332335
//! ```
333336
//! use sacp::mcp_server::McpServer;
334337
//! use sacp::schema::NewSessionRequest;
335-
//! use sacp::{Agent, Client, Component, JrResponder, ProxyToConductor};
338+
//! use sacp::{Agent, Client, Component, ProxyToConductor};
336339
//!
337340
//! async fn run_proxy(transport: impl Component) -> Result<(), sacp::Error> {
338341
//! ProxyToConductor::builder()
339342
//! .on_receive_request_from(Client, async |request: NewSessionRequest, request_cx, cx| {
340-
//! // Create an MCP server for this session
341343
//! let cwd = request.cwd.clone();
342344
//! let mcp_server = McpServer::builder("session-tools")
343345
//! .tool_fn("get_cwd", "Returns session working directory",
@@ -346,17 +348,54 @@ pub mod per_session_mcp_server {
346348
//! }, sacp::tool_fn!())
347349
//! .build();
348350
//!
349-
//! // Build the session with the MCP server attached and proxy it
350351
//! cx.build_session_from(request)
351352
//! .with_mcp_server(mcp_server)?
352-
//! .proxy_session(request_cx, JrResponder::run)
353+
//! .spawn_session_proxy(request_cx)
353354
//! .await
354355
//! }, sacp::on_receive_request!())
355356
//! .serve(transport)
356357
//! .await
357358
//! }
358359
//! ```
359360
//!
361+
//! # Advanced example: intercept before proxying
362+
//!
363+
//! Use [`spawn_session`] + [`proxy_remaining_messages`] when you need to
364+
//! do something with the session before handing off to proxy mode:
365+
//!
366+
//! ```
367+
//! use sacp::mcp_server::McpServer;
368+
//! use sacp::schema::NewSessionRequest;
369+
//! use sacp::{Agent, Client, Component, ProxyToConductor};
370+
//!
371+
//! async fn run_proxy(transport: impl Component) -> Result<(), sacp::Error> {
372+
//! ProxyToConductor::builder()
373+
//! .on_receive_request_from(Client, async |request: NewSessionRequest, request_cx, cx| {
374+
//! let cwd = request.cwd.clone();
375+
//! let mcp_server = McpServer::builder("session-tools")
376+
//! .tool_fn("get_cwd", "Returns session working directory",
377+
//! async move |_params: (), _cx| {
378+
//! Ok(cwd.display().to_string())
379+
//! }, sacp::tool_fn!())
380+
//! .build();
381+
//!
382+
//! let active_session = cx.build_session_from(request)
383+
//! .with_mcp_server(mcp_server)?
384+
//! .spawn_session()
385+
//! .await?;
386+
//!
387+
//! // Do something with the session before proxying...
388+
//! tracing::info!(session_id = %active_session.session_id(), "Session created");
389+
//!
390+
//! // Respond to the client and proxy remaining messages
391+
//! request_cx.respond(active_session.response())?;
392+
//! active_session.proxy_remaining_messages()
393+
//! }, sacp::on_receive_request!())
394+
//! .serve(transport)
395+
//! .await
396+
//! }
397+
//! ```
398+
//!
360399
//! # How it works
361400
//!
362401
//! When you call [`SessionBuilder::with_mcp_server`]:
@@ -366,5 +405,8 @@ pub mod per_session_mcp_server {
366405
//! 3. The MCP server's URL is added to the `NewSessionRequest`
367406
//! 4. The handler lives as long as the session (dropped when `run_session` completes)
368407
//!
408+
//! [`spawn_session_proxy`]: crate::SessionBuilder::spawn_session_proxy
409+
//! [`spawn_session`]: crate::SessionBuilder::spawn_session
410+
//! [`proxy_remaining_messages`]: crate::ActiveSession::proxy_remaining_messages
369411
//! [`SessionBuilder::with_mcp_server`]: crate::SessionBuilder::with_mcp_server
370412
}

src/sacp/src/session.rs

Lines changed: 113 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
use std::path::Path;
1+
use std::{marker::PhantomData, path::Path};
22

33
use agent_client_protocol_schema::{
44
ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest,
55
PromptResponse, SessionModeState, SessionNotification, SessionUpdate, StopReason,
66
};
77
use futures::channel::mpsc;
8+
use tokio::sync::oneshot;
89

910
use crate::{
1011
Agent, Client, Handled, HasEndpoint, JrConnectionCx, JrMessageHandler, JrRequestCx, JrRole,
@@ -51,16 +52,16 @@ where
5152
/// and let you access them.
5253
///
5354
/// Normally you would not use this method directly but would
54-
/// instead use [`Self::build_session`] and then [`SessionBuilder::send_request`].
55+
/// instead use [`Self::build_session`] and then [`SessionBuilder::spawn_session`].
5556
///
5657
/// The vector `dynamic_handler_registrations` contains any dynamic
5758
/// handle registrations associated with this session (e.g., from MCP servers).
5859
/// You can simply pass `Default::default()` if not applicable.
59-
pub fn attach_session(
60+
pub fn attach_session<'responder>(
6061
&self,
6162
response: NewSessionResponse,
6263
mut dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Role>>,
63-
) -> Result<ActiveSession<Role>, crate::Error> {
64+
) -> Result<ActiveSession<'responder, Role>, crate::Error> {
6465
let NewSessionResponse {
6566
session_id,
6667
modes,
@@ -81,6 +82,7 @@ where
8182
update_tx,
8283
connection: self.clone(),
8384
dynamic_handler_registrations,
85+
_responder: PhantomData,
8486
})
8587
}
8688
}
@@ -136,10 +138,17 @@ where
136138
})
137139
}
138140

139-
/// Send the request to create the session.
141+
/// Run this session synchronously. The current task will be blocked
142+
/// and `op` will be executed with the active session information.
143+
/// This is useful when you have MCP servers that are borrowed from your local
144+
/// stack frame.
145+
///
146+
/// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which
147+
/// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the
148+
/// responders would terminate when `op` returns).
140149
pub async fn run_session<R>(
141150
self,
142-
op: impl AsyncFnOnce(ActiveSession<Role>) -> Result<R, crate::Error>,
151+
op: impl for<'responder> AsyncFnOnce(ActiveSession<'responder, Role>) -> Result<R, crate::Error>,
143152
) -> Result<R, crate::Error> {
144153
let response = self
145154
.connection
@@ -163,81 +172,74 @@ where
163172
/// drift but at the cost of requiring MCP servers that are `Send` and
164173
/// don't access data from the surrounding scope.
165174
///
166-
/// # Parameters
167-
///
168-
/// * `run_responder`: this is typically just `Responder::run`;
169-
/// the need for this parameter is a workaround for Rust limitations.
170-
pub async fn send_request<F>(
171-
self,
172-
run_responder: impl FnOnce(Responder, JrConnectionCx<Role>) -> F,
173-
) -> Result<ActiveSession<Role>, crate::Error>
175+
/// Returns an `ActiveSession<'static, _>` because responders are spawned
176+
/// into background tasks that live for the connection lifetime.
177+
pub async fn spawn_session(self) -> Result<ActiveSession<'static, Role>, crate::Error>
174178
where
175-
F: Future<Output = Result<(), crate::Error>> + Send + 'static,
179+
Responder: 'static,
176180
{
177-
let response = self
178-
.connection
179-
.send_request_to(Agent, self.request)
180-
.block_task()
181-
.await?;
181+
let (active_session_tx, active_session_rx) = oneshot::channel();
182182

183-
let cx = self.connection.clone();
184-
self.connection.spawn(run_responder(self.responder, cx))?;
183+
let connection = self.connection.clone();
184+
connection.spawn(async move {
185+
let response = self
186+
.connection
187+
.send_request_to(Agent, self.request)
188+
.block_task()
189+
.await?;
185190

186-
self.connection
187-
.attach_session(response, self.dynamic_handler_registrations)
191+
let cx = self.connection.clone();
192+
self.connection.spawn(self.responder.run(cx))?;
193+
194+
let active_session = self
195+
.connection
196+
.attach_session(response, self.dynamic_handler_registrations)?;
197+
198+
active_session_tx
199+
.send(active_session)
200+
.map_err(|_| crate::Error::internal_error())?;
201+
202+
Ok(())
203+
})?;
204+
205+
active_session_rx
206+
.await
207+
.map_err(|_| crate::Error::internal_error())
188208
}
189209

190-
/// Forward the session request to the agent and proxy all messages.
210+
/// Spawn a session and proxy all messages between client and agent.
191211
///
212+
/// This is a convenience method that combines [`spawn_session`](Self::spawn_session),
213+
/// responding to the client, and [`ActiveSession::proxy_remaining_messages`].
192214
/// Use this when you want to inject MCP servers into a session but don't need
193-
/// to actively interact with it. The session messages will be proxied between
194-
/// client and agent automatically.
215+
/// to actively interact with it.
195216
///
196-
/// # Parameters
197-
///
198-
/// * `request_cx`: The request context from the intercepted `session.new` request,
199-
/// used to send the response back to the client.
200-
/// * `run_responder`: this is typically just `Responder::run`;
201-
/// the need for this parameter is a workaround for Rust limitations.
202-
pub async fn proxy_session<F>(
217+
/// For more control (e.g., to send some messages before proxying), use
218+
/// [`spawn_session`](Self::spawn_session) instead and call
219+
/// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually.
220+
pub async fn spawn_session_proxy(
203221
self,
204222
request_cx: JrRequestCx<NewSessionResponse>,
205-
run_responder: impl FnOnce(Responder, JrConnectionCx<Role>) -> F,
206223
) -> Result<(), crate::Error>
207224
where
208225
Role: HasEndpoint<Client>,
209-
F: Future<Output = Result<(), crate::Error>> + Send + 'static,
226+
Responder: 'static,
210227
{
211-
let response = self
212-
.connection
213-
.send_request_to(Agent, self.request)
214-
.block_task()
215-
.await?;
216-
217-
// Add dynamic handler to proxy session messages
218-
let session_id = response.session_id.clone();
219-
self.connection
220-
.add_dynamic_handler(ProxySessionMessages::new(session_id))?
221-
.run_indefinitely();
222-
223-
// Keep MCP server handlers alive
224-
for registration in self.dynamic_handler_registrations {
225-
registration.run_indefinitely();
226-
}
227-
228-
// Spawn the responder
229-
let cx = self.connection.clone();
230-
self.connection.spawn(run_responder(self.responder, cx))?;
231-
232-
// Send response back to client
233-
request_cx.respond(response)?;
234-
235-
Ok(())
228+
let active_session = self.spawn_session().await?;
229+
request_cx.respond(active_session.response())?;
230+
active_session.proxy_remaining_messages()
236231
}
237232
}
238233

239234
/// Active session struct that lets you send prompts and receive updates.
240-
pub struct ActiveSession<Role>
235+
///
236+
/// The `'responder` lifetime represents the span during which responders
237+
/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::spawn_session`],
238+
/// this is `'static` because responders are spawned into background tasks.
239+
/// When created via [`SessionBuilder::run_session`], this is tied to the
240+
/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called
241+
/// (since the responders would die when the closure returns).
242+
pub struct ActiveSession<'responder, Role>
241243
where
242244
Role: HasEndpoint<Agent>,
243245
{
@@ -251,8 +253,10 @@ where
251253
/// Collect registrations from dynamic handlers for MCP servers etc.
252254
/// These will be dropped once the active-session struct is dropped
253255
/// which will cause them to be deregistered.
254-
#[expect(dead_code)]
255256
dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Role>>,
257+
258+
/// Phantom lifetime representing the responder lifetime.
259+
_responder: PhantomData<&'responder ()>,
256260
}
257261

258262
/// Incoming message from the agent
@@ -267,7 +271,7 @@ pub enum SessionMessage {
267271
StopReason(StopReason),
268272
}
269273

270-
impl<R> ActiveSession<R>
274+
impl<'responder, R> ActiveSession<'responder, R>
271275
where
272276
R: HasEndpoint<Agent>,
273277
{
@@ -286,6 +290,18 @@ where
286290
&self.meta
287291
}
288292

293+
/// Build a `NewSessionResponse` from the session information.
294+
///
295+
/// Useful when you need to forward the session response to a client
296+
/// after doing some processing.
297+
pub fn response(&self) -> NewSessionResponse {
298+
NewSessionResponse {
299+
session_id: self.session_id.clone(),
300+
modes: self.modes.clone(),
301+
meta: self.meta.clone(),
302+
}
303+
}
304+
289305
/// Access the underlying connection context used to communicate with the agent.
290306
pub fn connection_cx(&self) -> JrConnectionCx<R> {
291307
self.connection.clone()
@@ -356,6 +372,39 @@ where
356372
}
357373
}
358374

375+
impl<R> ActiveSession<'static, R>
376+
where
377+
R: HasEndpoint<Agent>,
378+
{
379+
/// Proxy all remaining messages for this session between client and agent.
380+
///
381+
/// Use this when you want to inject MCP servers into a session but don't need
382+
/// to actively interact with it after setup. The session messages will be proxied
383+
/// between client and agent automatically.
384+
///
385+
/// This consumes the `ActiveSession` since you're giving up active control.
386+
///
387+
/// This method is only available on `ActiveSession<'static, _>` (from
388+
/// [`SessionBuilder::spawn_session`]) because it requires responders to
389+
/// outlive the method call.
390+
pub fn proxy_remaining_messages(self) -> Result<(), crate::Error>
391+
where
392+
R: HasEndpoint<Client>,
393+
{
394+
// Add dynamic handler to proxy session messages
395+
self.connection
396+
.add_dynamic_handler(ProxySessionMessages::new(self.session_id))?
397+
.run_indefinitely();
398+
399+
// Keep MCP server handlers alive
400+
for registration in self.dynamic_handler_registrations {
401+
registration.run_indefinitely();
402+
}
403+
404+
Ok(())
405+
}
406+
}
407+
359408
struct ActiveSessionHandler<Role>
360409
where
361410
Role: HasEndpoint<Agent>,

src/yopo/src/lib.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
//!
33
//! Provides a convenient API for running one-shot prompts against SACP components.
44
5+
use sacp::ClientToAgent;
56
use sacp::schema::{
67
AudioContent, ContentBlock, EmbeddedResourceResource, ImageContent, InitializeRequest,
78
RequestPermissionOutcome, RequestPermissionRequest, RequestPermissionResponse,
89
SessionNotification, TextContent, VERSION as PROTOCOL_VERSION,
910
};
1011
use sacp::util::MatchMessage;
11-
use sacp::{ClientToAgent, JrResponder};
1212
use sacp::{Component, Handled, MessageCx, UntypedMessage};
1313
use std::path::PathBuf;
1414

@@ -113,10 +113,7 @@ pub async fn prompt_with_callback(
113113
.block_task()
114114
.await?;
115115

116-
let mut session = cx
117-
.build_session(PathBuf::from("."))
118-
.send_request(JrResponder::run)
119-
.await?;
116+
let mut session = cx.build_session(PathBuf::from(".")).spawn_session().await?;
120117

121118
session.send_prompt(prompt_text)?;
122119

0 commit comments

Comments
 (0)