Skip to content

Commit 4ebf3c4

Browse files
authored
feat(mcpserver): support session timeout config (#1163)
* feat(mcpserver): support session timeout config
1 parent b88e3b8 commit 4ebf3c4

File tree

1 file changed

+69
-2
lines changed

1 file changed

+69
-2
lines changed

poem-mcpserver/src/streamable_http.rs

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,22 @@ use crate::{
2525
tool::Tools,
2626
};
2727

28-
const SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 5);
28+
const DEFAULT_SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 5);
29+
30+
/// Configuration options for streamable HTTP sessions.
31+
#[derive(Clone, Copy, Debug)]
32+
pub struct Config {
33+
/// Session idle timeout. Use `None` to disable expiration.
34+
pub session_timeout: Option<Duration>,
35+
}
36+
37+
impl Default for Config {
38+
fn default() -> Self {
39+
Self {
40+
session_timeout: Some(DEFAULT_SESSION_TIMEOUT),
41+
}
42+
}
43+
}
2944

3045
type ServerFactoryFn<ToolsType, PromptsType> =
3146
Box<dyn Fn(&Request) -> McpServer<ToolsType, PromptsType> + Send + Sync>;
@@ -150,6 +165,10 @@ where
150165
let (server, sender) = {
151166
let mut sessions = data.0.sessions.lock().unwrap();
152167
let Some(session) = sessions.get_mut(&session_id) else {
168+
tracing::warn!(
169+
session_id = session_id,
170+
"session not found (expired or invalid)"
171+
);
153172
return StatusCode::NOT_FOUND.into_response();
154173
};
155174
session.last_active = Instant::now();
@@ -260,7 +279,40 @@ where
260279
}
261280

262281
/// A streamable http endpoint that can be used to handle MCP requests.
282+
///
283+
/// Uses the default configuration (5-minute idle timeout).
263284
pub fn endpoint<F, ToolsType, PromptsType>(server_factory: F) -> impl IntoEndpoint
285+
where
286+
F: Fn(&Request) -> McpServer<ToolsType, PromptsType> + Send + Sync + 'static,
287+
ToolsType: Tools + Send + Sync + 'static,
288+
PromptsType: Prompts + Send + Sync + 'static,
289+
{
290+
endpoint_with_config(server_factory, Config::default())
291+
}
292+
293+
/// A streamable http endpoint with configurable session behavior.
294+
///
295+
/// Set `Config::session_timeout` to `None` to disable session expiration.
296+
///
297+
/// # Example
298+
/// ```rust,no_run
299+
/// use poem::Route;
300+
/// use poem_mcpserver::{McpServer, streamable_http};
301+
///
302+
/// let app = Route::new().at(
303+
/// "/",
304+
/// streamable_http::endpoint_with_config(
305+
/// |_| McpServer::new(),
306+
/// streamable_http::Config {
307+
/// session_timeout: None,
308+
/// },
309+
/// ),
310+
/// );
311+
/// ```
312+
pub fn endpoint_with_config<F, ToolsType, PromptsType>(
313+
server_factory: F,
314+
config: Config,
315+
) -> impl IntoEndpoint
264316
where
265317
F: Fn(&Request) -> McpServer<ToolsType, PromptsType> + Send + Sync + 'static,
266318
ToolsType: Tools + Send + Sync + 'static,
@@ -271,14 +323,29 @@ where
271323
sessions: Default::default(),
272324
});
273325

326+
let session_timeout = config.session_timeout;
274327
tokio::spawn({
275328
let state = state.clone();
276329
async move {
277330
let mut interval = tokio::time::interval(Duration::from_secs(5));
278331
loop {
279332
let now = interval.tick().await;
280333
let mut sessions = state.sessions.lock().unwrap();
281-
sessions.retain(|_, session| (now - session.last_active) < SESSION_TIMEOUT);
334+
sessions.retain(|session_id, session| {
335+
let Some(timeout) = session_timeout else {
336+
return true;
337+
};
338+
let expired = (now - session.last_active) >= timeout;
339+
if expired {
340+
tracing::info!(
341+
session_id = session_id,
342+
timeout_seconds = timeout.as_secs(),
343+
last_active = ?session.last_active,
344+
"expired session"
345+
);
346+
}
347+
!expired
348+
});
282349
}
283350
}
284351
});

0 commit comments

Comments
 (0)