Skip to content

Commit ca3537e

Browse files
committed
Introduce stateless mode to allow for HA
- Create failing tests for `stateless` mode - Implement `stateless` mode by turning of SSE in Streamable HTTP, making the interaction a standard HTTP Req/Resp Implement `stateless` mode
1 parent 122bbd0 commit ca3537e

File tree

2 files changed

+142
-11
lines changed

2 files changed

+142
-11
lines changed

lib/mcp/server/transports/streamable_http_transport.rb

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ module MCP
88
class Server
99
module Transports
1010
class StreamableHTTPTransport < Transport
11-
def initialize(server)
12-
super
11+
def initialize(server, stateless: false)
12+
super(server)
1313
# { session_id => { stream: stream_object }
1414
@sessions = {}
1515
@mutex = Mutex.new
16+
17+
@stateless = stateless
1618
end
1719

1820
def handle_request(request)
@@ -24,7 +26,7 @@ def handle_request(request)
2426
when "DELETE"
2527
handle_delete(request)
2628
else
27-
[405, { "Content-Type" => "application/json" }, [{ error: "Method not allowed" }.to_json]]
29+
method_not_allowed_response
2830
end
2931
end
3032

@@ -35,6 +37,9 @@ def close
3537
end
3638

3739
def send_notification(method, params = nil, session_id: nil)
40+
# Stateless mode doesn't support notifications
41+
raise "Stateless mode does not support notifications" if @stateless
42+
3843
notification = {
3944
jsonrpc: "2.0",
4045
method:,
@@ -117,6 +122,10 @@ def handle_post(request)
117122
end
118123

119124
def handle_get(request)
125+
if @stateless
126+
return method_not_allowed_response
127+
end
128+
120129
session_id = extract_session_id(request)
121130

122131
return missing_session_id_response unless session_id
@@ -126,6 +135,13 @@ def handle_get(request)
126135
end
127136

128137
def handle_delete(request)
138+
success_response = [200, { "Content-Type" => "application/json" }, [{ success: true }.to_json]]
139+
140+
if @stateless
141+
# Stateless mode doesn't support sessions, so we can just return a success response
142+
return success_response
143+
end
144+
129145
session_id = request.env["HTTP_MCP_SESSION_ID"]
130146

131147
return [
@@ -135,7 +151,7 @@ def handle_delete(request)
135151
] unless session_id
136152

137153
cleanup_session(session_id)
138-
[200, { "Content-Type" => "application/json" }, [{ success: true }.to_json]]
154+
success_response
139155
end
140156

141157
def cleanup_session(session_id)
@@ -169,10 +185,12 @@ def parse_request_body(body_string)
169185
def handle_initialization(body_string, body)
170186
session_id = SecureRandom.uuid
171187

172-
@mutex.synchronize do
173-
@sessions[session_id] = {
174-
stream: nil,
175-
}
188+
unless @stateless
189+
@mutex.synchronize do
190+
@sessions[session_id] = {
191+
stream: nil,
192+
}
193+
end
176194
end
177195

178196
response = @server.handle_json(body_string)
@@ -186,12 +204,16 @@ def handle_initialization(body_string, body)
186204
end
187205

188206
def handle_regular_request(body_string, session_id)
189-
# If session ID is provided, but not in the sessions hash, return an error
190-
if session_id && !@sessions.key?(session_id)
191-
return [400, { "Content-Type" => "application/json" }, [{ error: "Invalid session ID" }.to_json]]
207+
unless @stateless
208+
# If session ID is provided, but not in the sessions hash, return an error
209+
if session_id && !@sessions.key?(session_id)
210+
return [400, { "Content-Type" => "application/json" }, [{ error: "Invalid session ID" }.to_json]]
211+
end
192212
end
193213

194214
response = @server.handle_json(body_string)
215+
216+
# Stream can be nil since stateless mode doesn't retain streams
195217
stream = get_session_stream(session_id) if session_id
196218

197219
if stream
@@ -222,6 +244,10 @@ def session_exists?(session_id)
222244
@mutex.synchronize { @sessions.key?(session_id) }
223245
end
224246

247+
def method_not_allowed_response
248+
[405, { "Content-Type" => "application/json" }, [{ error: "Method not allowed" }.to_json]]
249+
end
250+
225251
def missing_session_id_response
226252
[400, { "Content-Type" => "application/json" }, [{ error: "Missing session ID" }.to_json]]
227253
end

test/mcp/server/transports/streamable_http_transport_test.rb

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,111 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
575575
assert_equal "Method not allowed", body["error"]
576576
end
577577

578+
test "stateless mode allows requests without session IDs, responding with a new session ID" do
579+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
580+
581+
init_request = create_rack_request(
582+
"POST",
583+
"/",
584+
{ "CONTENT_TYPE" => "application/json" },
585+
{ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json,
586+
)
587+
init_response = stateless_transport.handle_request(init_request)
588+
session_id = init_response[1]["Mcp-Session-Id"]
589+
590+
assert_not_nil session_id
591+
end
592+
593+
test "stateless mode responds without any session ID when session ID is present" do
594+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
595+
596+
request = create_rack_request(
597+
"POST",
598+
"/",
599+
{
600+
"CONTENT_TYPE" => "application/json",
601+
"HTTP_MCP_SESSION_ID" => "unseen_session_id",
602+
},
603+
{ jsonrpc: "2.0", method: "ping", id: "123" }.to_json,
604+
)
605+
606+
response = stateless_transport.handle_request(request)
607+
assert_equal 200, response[0]
608+
assert_equal(
609+
{
610+
"Content-Type" => "application/json",
611+
},
612+
response[1],
613+
)
614+
615+
body = JSON.parse(response[2][0])
616+
assert_equal "2.0", body["jsonrpc"]
617+
assert_equal "123", body["id"]
618+
end
619+
620+
test "stateless mode responds with 405 when SSE is requested" do
621+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
622+
623+
get_request = create_rack_request(
624+
"GET",
625+
"/",
626+
{
627+
"CONTENT_TYPE" => "application/json,text/event-stream",
628+
},
629+
)
630+
response = stateless_transport.handle_request(get_request)
631+
assert_equal 405, response[0]
632+
assert_equal({ "Content-Type" => "application/json" }, response[1])
633+
634+
body = JSON.parse(response[2][0])
635+
assert_equal "Method not allowed", body["error"]
636+
end
637+
638+
test "stateless mode silently responds with success to session DELETE when session ID is not present" do
639+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
640+
641+
delete_request = create_rack_request(
642+
"DELETE",
643+
"/",
644+
{},
645+
)
646+
response = stateless_transport.handle_request(delete_request)
647+
assert_equal 200, response[0]
648+
assert_equal({ "Content-Type" => "application/json" }, response[1])
649+
650+
body = JSON.parse(response[2][0])
651+
assert body["success"]
652+
end
653+
654+
test "stateless mode silently responds with success to session DELETE when session ID is provided" do
655+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
656+
657+
delete_request = create_rack_request(
658+
"DELETE",
659+
"/",
660+
{ "HTTP_MCP_SESSION_ID" => "session_id" },
661+
)
662+
response = stateless_transport.handle_request(delete_request)
663+
assert_equal 200, response[0]
664+
assert_equal({ "Content-Type" => "application/json" }, response[1])
665+
666+
body = JSON.parse(response[2][0])
667+
assert body["success"]
668+
end
669+
670+
test "stateless mode does not support notifications" do
671+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
672+
673+
# There are no sessions, so this should fail
674+
assert_raises(RuntimeError, "Stateless mode does not support notifications") do
675+
stateless_transport.send_notification(
676+
"test_notification",
677+
{ message: "Hello" },
678+
session_id: "some_session_id",
679+
)
680+
end
681+
end
682+
578683
test "handle post request with a standard error" do
579684
request = create_rack_request(
580685
"POST",

0 commit comments

Comments
 (0)