Skip to content

Commit 05c1878

Browse files
sessionId= should not be required
1 parent dca96d2 commit 05c1878

File tree

3 files changed

+120
-27
lines changed

3 files changed

+120
-27
lines changed

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,28 @@ Example:
141141

142142
This server implements the Model Context Protocol (MCP) which allows it to be easily integrated with LLM clients that support the protocol. For more information about MCP, visit [the MCP repository](https://github.com/modelcontextprotocol/mcp).
143143

144+
### VScode MCP, RooCode example
145+
146+
```json
147+
// Roo Code, use bunx or npx, sessionId=
148+
{
149+
"mcpServers":{
150+
"rust-crate-docs": {
151+
"command": "bunx",
152+
"args": [
153+
"-y",
154+
"mcp-remote@latest",
155+
"http://127.0.0.1:3000/sse?sessionId=",
156+
"--allow-http",
157+
"--transport sse-only",
158+
"--debug"
159+
]
160+
}
161+
}
162+
}
163+
```
164+
165+
144166
## License
145167

146168
MIT License

justfile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
run:
2+
cargo run --bin cratedocs http --address 0.0.0.0:3000 --debug
3+
4+
debug-mcp-remote:
5+
# use bunx or npx to see how the mcp-remote proxy connects
6+
bunx mcp-remote@latest "http://127.0.0.1:3000/sse" --allow-http --transport sse-only --debug

src/transport/http_sse_server/http_sse_server.rs

Lines changed: 92 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,47 +52,112 @@ fn session_id() -> SessionId {
5252
#[derive(Debug, serde::Deserialize)]
5353
#[serde(rename_all = "camelCase")]
5454
pub struct PostEventQuery {
55-
pub session_id: String,
55+
#[serde(default)] // Use None if session_id is not present in query
56+
pub session_id: Option<String>,
5657
}
5758

5859
async fn post_event_handler(
5960
State(app): State<App>,
60-
Query(PostEventQuery { session_id }): Query<PostEventQuery>,
61+
Query(query_params): Query<PostEventQuery>,
6162
body: Body,
6263
) -> Result<StatusCode, StatusCode> {
64+
tracing::debug!(?query_params, "Received POST request");
6365
const BODY_BYTES_LIMIT: usize = 1 << 22;
64-
let write_stream = {
65-
let rg = app.txs.read().await;
66-
rg.get(session_id.as_str())
67-
.ok_or(StatusCode::NOT_FOUND)?
68-
.clone()
69-
};
70-
let mut write_stream = write_stream.lock().await;
71-
let mut body = body.into_data_stream();
72-
if let (_, Some(size)) = body.size_hint() {
73-
if size > BODY_BYTES_LIMIT {
66+
const BUFFER_SIZE: usize = 1 << 12; // For new sessions
67+
68+
let (session_id_arc, c2s_writer_for_body): (SessionId, C2SWriter) =
69+
match query_params.session_id {
70+
Some(id_str) => {
71+
tracing::debug!(session_id = %id_str, "sessionId provided in query");
72+
// Convert String to Arc<str> for map lookup
73+
let session_arc: SessionId = Arc::from(id_str.as_str());
74+
let rg = app.txs.read().await;
75+
match rg.get(&session_arc) {
76+
Some(writer) => {
77+
tracing::debug!(session_id = %session_arc, "Found existing session writer");
78+
(session_arc, writer.clone())
79+
}
80+
None => {
81+
tracing::warn!(session_id = %session_arc, "sessionId provided but not found in active sessions");
82+
return Err(StatusCode::NOT_FOUND);
83+
}
84+
}
85+
}
86+
None => {
87+
tracing::info!("sessionId not provided, creating new session for POST request");
88+
let new_session_id_arc = session_id(); // fn session_id() -> Arc<str>
89+
tracing::info!(new_session_id = %new_session_id_arc, "Generated new session ID");
90+
91+
let (c2s_read, c2s_write_half) = tokio::io::simplex(BUFFER_SIZE);
92+
// s2c_read/write are also needed for the ByteTransport and Server::run
93+
// _s2c_read is not directly used by this POST handler but needed for the spawned server task.
94+
let (_s2c_read, s2c_write_half) = tokio::io::simplex(BUFFER_SIZE);
95+
96+
let new_c2s_writer_for_map = Arc::new(Mutex::new(c2s_write_half));
97+
app.txs
98+
.write()
99+
.await
100+
.insert(new_session_id_arc.clone(), new_c2s_writer_for_map.clone());
101+
tracing::info!(session_id = %new_session_id_arc, "Inserted new session writer into app.txs");
102+
103+
// Spawn the server task for the new session
104+
let app_clone = app.clone();
105+
let task_session_id = new_session_id_arc.clone();
106+
tokio::spawn(async move {
107+
let router = RouterService(DocRouter::new());
108+
let server = Server::new(router);
109+
let bytes_transport = ByteTransport::new(c2s_read, s2c_write_half);
110+
tracing::info!(session_id = %task_session_id, "Spawning server task for new POST session");
111+
let _result = server
112+
.run(bytes_transport)
113+
.await
114+
.inspect_err(|e| {
115+
tracing::error!(?e, session_id = %task_session_id, "Server run error for new POST session")
116+
});
117+
app_clone.txs.write().await.remove(&task_session_id);
118+
tracing::info!(session_id = %task_session_id, "Cleaned up new POST session from app.txs after server task completion");
119+
});
120+
(new_session_id_arc, new_c2s_writer_for_map)
121+
}
122+
};
123+
124+
// Process the request body using c2s_writer_for_body
125+
let mut write_stream_locked = c2s_writer_for_body.lock().await;
126+
let mut body_data_stream = body.into_data_stream();
127+
128+
if let (_, Some(size_hint)) = body_data_stream.size_hint() {
129+
if size_hint > BODY_BYTES_LIMIT {
130+
tracing::warn!(%session_id_arc, body_size_hint = size_hint, limit = BODY_BYTES_LIMIT, "Payload too large based on hint");
74131
return Err(StatusCode::PAYLOAD_TOO_LARGE);
75132
}
76133
}
77-
// calculate the body size
78-
let mut size = 0;
79-
while let Some(chunk) = body.next().await {
80-
let Ok(chunk) = chunk else {
81-
return Err(StatusCode::BAD_REQUEST);
134+
135+
let mut actual_size = 0;
136+
while let Some(chunk_result) = body_data_stream.next().await {
137+
let chunk = match chunk_result {
138+
Ok(c) => c,
139+
Err(e) => {
140+
tracing::error!(%session_id_arc, ?e, "Error reading chunk from body stream");
141+
return Err(StatusCode::BAD_REQUEST);
142+
}
82143
};
83-
size += chunk.len();
84-
if size > BODY_BYTES_LIMIT {
144+
actual_size += chunk.len();
145+
if actual_size > BODY_BYTES_LIMIT {
146+
tracing::warn!(%session_id_arc, actual_body_size = actual_size, limit = BODY_BYTES_LIMIT, "Payload too large during streaming");
85147
return Err(StatusCode::PAYLOAD_TOO_LARGE);
86148
}
87-
write_stream
88-
.write_all(&chunk)
89-
.await
90-
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
149+
if let Err(e) = write_stream_locked.write_all(&chunk).await {
150+
tracing::error!(%session_id_arc, ?e, "Error writing chunk to session stream");
151+
return Err(StatusCode::INTERNAL_SERVER_ERROR);
152+
}
91153
}
92-
write_stream
93-
.write_u8(b'\n')
94-
.await
95-
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
154+
155+
if let Err(e) = write_stream_locked.write_u8(b'\n').await {
156+
tracing::error!(%session_id_arc, ?e, "Error writing newline to session stream");
157+
return Err(StatusCode::INTERNAL_SERVER_ERROR);
158+
}
159+
160+
tracing::info!(%session_id_arc, "Successfully processed POST request body");
96161
Ok(StatusCode::ACCEPTED)
97162
}
98163

0 commit comments

Comments
 (0)