Skip to content

Add stateless feature to Streamable HTTP protocol #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 49 additions & 13 deletions lib/mcp/server/transports/streamable_http_transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ module MCP
class Server
module Transports
class StreamableHTTPTransport < Transport
def initialize(server)
super
def initialize(server, stateless: false)
super(server)
# { session_id => { stream: stream_object }
@sessions = {}
@mutex = Mutex.new

@stateless = stateless
end

def handle_request(request)
Expand All @@ -24,7 +26,7 @@ def handle_request(request)
when "DELETE"
handle_delete(request)
else
[405, { "Content-Type" => "application/json" }, [{ error: "Method not allowed" }.to_json]]
method_not_allowed_response
end
end

Expand All @@ -35,6 +37,9 @@ def close
end

def send_notification(method, params = nil, session_id: nil)
# Stateless mode doesn't support notifications
raise "Stateless mode does not support notifications" if @stateless

notification = {
jsonrpc: "2.0",
method:,
Expand Down Expand Up @@ -117,6 +122,10 @@ def handle_post(request)
end

def handle_get(request)
if @stateless
return method_not_allowed_response
end

session_id = extract_session_id(request)

return missing_session_id_response unless session_id
Expand All @@ -126,6 +135,13 @@ def handle_get(request)
end

def handle_delete(request)
success_response = [200, { "Content-Type" => "application/json" }, [{ success: true }.to_json]]

if @stateless
# Stateless mode doesn't support sessions, so we can just return a success response
return success_response
end

session_id = request.env["HTTP_MCP_SESSION_ID"]

return [
Expand All @@ -135,7 +151,7 @@ def handle_delete(request)
] unless session_id

cleanup_session(session_id)
[200, { "Content-Type" => "application/json" }, [{ success: true }.to_json]]
success_response
end

def cleanup_session(session_id)
Expand Down Expand Up @@ -167,40 +183,56 @@ def parse_request_body(body_string)
end

def handle_initialization(body_string, body)
session_id = SecureRandom.uuid
session_id = nil

@mutex.synchronize do
@sessions[session_id] = {
stream: nil,
}
unless @stateless
session_id = SecureRandom.uuid

@mutex.synchronize do
@sessions[session_id] = {
stream: nil,
}
end
end

response = @server.handle_json(body_string)

headers = {
"Content-Type" => "application/json",
"Mcp-Session-Id" => session_id,
}

headers["Mcp-Session-Id"] = session_id if session_id

[200, headers, [response]]
end

def handle_regular_request(body_string, session_id)
# If session ID is provided, but not in the sessions hash, return an error
if session_id && [email protected]?(session_id)
return [400, { "Content-Type" => "application/json" }, [{ error: "Invalid session ID" }.to_json]]
unless @stateless
# If session ID is provided, but not in the sessions hash, return an error
if session_id && [email protected]?(session_id)
return [400, { "Content-Type" => "application/json" }, [{ error: "Invalid session ID" }.to_json]]
end
end

response = @server.handle_json(body_string)

# Stream can be nil since stateless mode doesn't retain streams
stream = get_session_stream(session_id) if session_id

if stream
send_response_to_stream(stream, response, session_id)
elsif response.nil? && notification_request?(body_string)
[202, { "Content-Type" => "application/json" }, [response]]
else
[200, { "Content-Type" => "application/json" }, [response]]
end
end

def notification_request?(body_string)
body = parse_request_body(body_string)
body.is_a?(Hash) && body["method"].start_with?("notifications/")
end

def get_session_stream(session_id)
@mutex.synchronize { @sessions[session_id]&.fetch(:stream, nil) }
end
Expand All @@ -222,6 +254,10 @@ def session_exists?(session_id)
@mutex.synchronize { @sessions.key?(session_id) }
end

def method_not_allowed_response
[405, { "Content-Type" => "application/json" }, [{ error: "Method not allowed" }.to_json]]
end

def missing_session_id_response
[400, { "Content-Type" => "application/json" }, [{ error: "Missing session ID" }.to_json]]
end
Expand Down
122 changes: 122 additions & 0 deletions test/mcp/server/transports/streamable_http_transport_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,128 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
assert_equal "Method not allowed", body["error"]
end

test "stateless mode allows requests without session IDs, responding with no session ID" do
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)

init_request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json,
)
init_response = stateless_transport.handle_request(init_request)
assert_nil init_response[1]["Mcp-Session-Id"]
end

test "stateless mode responds without any session ID when session ID is present" do
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)

request = create_rack_request(
"POST",
"/",
{
"CONTENT_TYPE" => "application/json",
"HTTP_MCP_SESSION_ID" => "unseen_session_id",
},
{ jsonrpc: "2.0", method: "ping", id: "123" }.to_json,
)

response = stateless_transport.handle_request(request)
assert_equal 200, response[0]
assert_equal(
{
"Content-Type" => "application/json",
},
response[1],
)

body = JSON.parse(response[2][0])
assert_equal "2.0", body["jsonrpc"]
assert_equal "123", body["id"]
end

test "stateless mode responds with 405 when SSE is requested" do
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)

get_request = create_rack_request(
"GET",
"/",
{
"CONTENT_TYPE" => "application/json,text/event-stream",
},
)
response = stateless_transport.handle_request(get_request)
assert_equal 405, response[0]
assert_equal({ "Content-Type" => "application/json" }, response[1])

body = JSON.parse(response[2][0])
assert_equal "Method not allowed", body["error"]
end

test "stateless mode silently responds with success to session DELETE when session ID is not present" do
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)

delete_request = create_rack_request(
"DELETE",
"/",
{},
)
response = stateless_transport.handle_request(delete_request)
assert_equal 200, response[0]
assert_equal({ "Content-Type" => "application/json" }, response[1])

body = JSON.parse(response[2][0])
assert body["success"]
end

test "stateless mode silently responds with success to session DELETE when session ID is provided" do
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)

delete_request = create_rack_request(
"DELETE",
"/",
{ "HTTP_MCP_SESSION_ID" => "session_id" },
)
response = stateless_transport.handle_request(delete_request)
assert_equal 200, response[0]
assert_equal({ "Content-Type" => "application/json" }, response[1])

body = JSON.parse(response[2][0])
assert body["success"]
end

test "stateless mode does not support server-sent events" do
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)

e = assert_raises(RuntimeError) do
stateless_transport.send_notification(
"test_notification",
{ message: "Hello" },
session_id: "some_session_id",
)
end

assert_equal("Stateless mode does not support notifications", e.message)
end

test "stateless mode responds with 202 when client sends a notification/initialized request" do
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)

request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "2.0", method: "notifications/initialized" }.to_json,
)

response = stateless_transport.handle_request(request)
assert_equal 202, response[0]
assert_equal({ "Content-Type" => "application/json" }, response[1])

body = response[2][0]
assert_nil(body)
end

test "handle post request with a standard error" do
request = create_rack_request(
"POST",
Expand Down
Loading