Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 69 additions & 2 deletions poem-mcpserver/src/streamable_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,22 @@ use crate::{
tool::Tools,
};

const SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 5);
const DEFAULT_SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 5);

/// Configuration options for streamable HTTP sessions.
#[derive(Clone, Copy, Debug)]
pub struct Config {
/// Session idle timeout. Use `None` to disable expiration.
pub session_timeout: Option<Duration>,
}

impl Default for Config {
fn default() -> Self {
Self {
session_timeout: Some(DEFAULT_SESSION_TIMEOUT),
}
}
}

type ServerFactoryFn<ToolsType, PromptsType> =
Box<dyn Fn(&Request) -> McpServer<ToolsType, PromptsType> + Send + Sync>;
Expand Down Expand Up @@ -150,6 +165,10 @@ where
let (server, sender) = {
let mut sessions = data.0.sessions.lock().unwrap();
let Some(session) = sessions.get_mut(&session_id) else {
tracing::warn!(
session_id = session_id,
"session not found (expired or invalid)"
);
return StatusCode::NOT_FOUND.into_response();
};
session.last_active = Instant::now();
Expand Down Expand Up @@ -260,7 +279,40 @@ where
}

/// A streamable http endpoint that can be used to handle MCP requests.
///
/// Uses the default configuration (5-minute idle timeout).
pub fn endpoint<F, ToolsType, PromptsType>(server_factory: F) -> impl IntoEndpoint
where
F: Fn(&Request) -> McpServer<ToolsType, PromptsType> + Send + Sync + 'static,
ToolsType: Tools + Send + Sync + 'static,
PromptsType: Prompts + Send + Sync + 'static,
{
endpoint_with_config(server_factory, Config::default())
}

/// A streamable http endpoint with configurable session behavior.
///
/// Set `Config::session_timeout` to `None` to disable session expiration.
///
/// # Example
/// ```rust,no_run
/// use poem::Route;
/// use poem_mcpserver::{McpServer, streamable_http};
///
/// let app = Route::new().at(
/// "/",
/// streamable_http::endpoint_with_config(
/// |_| McpServer::new(),
/// streamable_http::Config {
/// session_timeout: None,
/// },
/// ),
/// );
/// ```
pub fn endpoint_with_config<F, ToolsType, PromptsType>(
server_factory: F,
config: Config,
) -> impl IntoEndpoint
where
F: Fn(&Request) -> McpServer<ToolsType, PromptsType> + Send + Sync + 'static,
ToolsType: Tools + Send + Sync + 'static,
Expand All @@ -271,14 +323,29 @@ where
sessions: Default::default(),
});

let session_timeout = config.session_timeout;
tokio::spawn({
let state = state.clone();
async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
let now = interval.tick().await;
let mut sessions = state.sessions.lock().unwrap();
sessions.retain(|_, session| (now - session.last_active) < SESSION_TIMEOUT);
sessions.retain(|session_id, session| {
let Some(timeout) = session_timeout else {
return true;
};
let expired = (now - session.last_active) >= timeout;
if expired {
tracing::info!(
session_id = session_id,
timeout_seconds = timeout.as_secs(),
last_active = ?session.last_active,
"expired session"
);
}
!expired
});
}
}
});
Expand Down