diff --git a/README.md b/README.md index 2dbae56..8707e6c 100644 --- a/README.md +++ b/README.md @@ -524,7 +524,8 @@ $transport = new StreamableHttpServerTransport( host: '127.0.0.1', // MCP protocol prohibits 0.0.0.0 port: 8080, mcpPathPrefix: 'mcp', - enableJsonResponse: false // Use SSE streaming (default) + enableJsonResponse: false, // Use SSE streaming (default) + stateless: false // Enable stateless mode for session-less clients ); $server->listen($transport); @@ -546,12 +547,27 @@ $transport = new StreamableHttpServerTransport( ); ``` +**Stateless Mode:** + +For clients that have issues with session management, enable stateless mode: + +```php +$transport = new StreamableHttpServerTransport( + host: '127.0.0.1', + port: 8080, + stateless: true // Each request is independent +); +``` + +In stateless mode, session IDs are generated internally but not exposed to clients, and each request is treated as independent without persistent session state. + **Features:** - **Resumable connections** - clients can reconnect and replay missed events - **Event sourcing** - all events are stored for replay - **JSON mode** - optional JSON-only responses for fast tools - **Enhanced session management** - persistent session state - **Multiple client support** - designed for concurrent clients +- **Stateless mode** - session-less operation for simple clients ## 📋 Schema Generation and Validation diff --git a/src/Protocol.php b/src/Protocol.php index daf2194..4ca20f2 100644 --- a/src/Protocol.php +++ b/src/Protocol.php @@ -134,6 +134,12 @@ public function processMessage(Request|Notification|BatchRequest $message, strin return; } + if ($context['stateless'] ?? false) { + $session->set('initialized', true); + $session->set('protocol_version', self::LATEST_PROTOCOL_VERSION); + $session->set('client_info', ['name' => 'stateless-client', 'version' => '1.0.0']); + } + $response = null; if ($message instanceof BatchRequest) { diff --git a/src/Transports/StreamableHttpServerTransport.php b/src/Transports/StreamableHttpServerTransport.php index 75eee9f..ae29267 100644 --- a/src/Transports/StreamableHttpServerTransport.php +++ b/src/Transports/StreamableHttpServerTransport.php @@ -75,6 +75,7 @@ public function __construct( private string $mcpPath = '/mcp', private ?array $sslContext = null, private readonly bool $enableJsonResponse = true, + private readonly bool $stateless = false, ?EventStoreInterface $eventStore = null ) { $this->logger = new NullLogger(); @@ -171,9 +172,9 @@ private function createRequestHandler(): callable try { return match ($method) { - 'GET' => $this->handleGetRequest($request)->then($addCors, fn ($e) => $addCors($this->handleRequestError($e, $request))), - 'POST' => $this->handlePostRequest($request)->then($addCors, fn ($e) => $addCors($this->handleRequestError($e, $request))), - 'DELETE' => $this->handleDeleteRequest($request)->then($addCors, fn ($e) => $addCors($this->handleRequestError($e, $request))), + 'GET' => $this->handleGetRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))), + 'POST' => $this->handlePostRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))), + 'DELETE' => $this->handleDeleteRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))), default => $addCors($this->handleUnsupportedRequest($request)), }; } catch (Throwable $e) { @@ -184,6 +185,11 @@ private function createRequestHandler(): callable private function handleGetRequest(ServerRequestInterface $request): PromiseInterface { + if ($this->stateless) { + $error = Error::forInvalidRequest("GET requests (SSE streaming) are not supported in stateless mode."); + return resolve(new HttpResponse(405, ['Content-Type' => 'application/json'], json_encode($error))); + } + $acceptHeader = $request->getHeaderLine('Accept'); if (!str_contains($acceptHeader, 'text/event-stream')) { $error = Error::forInvalidRequest("Not Acceptable: Client must accept text/event-stream for GET requests."); @@ -264,24 +270,29 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte $isInitializeRequest = ($message instanceof Request && $message->method === 'initialize'); $sessionId = null; - if ($isInitializeRequest) { - if ($request->hasHeader('Mcp-Session-Id')) { - $this->logger->warning("Client sent Mcp-Session-Id with InitializeRequest. Ignoring.", ['clientSentId' => $request->getHeaderLine('Mcp-Session-Id')]); - $error = Error::forInvalidRequest("Invalid request: Session already initialized. Mcp-Session-Id header not allowed with InitializeRequest.", $message->getId()); - $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); - return $deferred->promise(); - } - + if ($this->stateless) { $sessionId = $this->generateId(); $this->emit('client_connected', [$sessionId]); } else { - $sessionId = $request->getHeaderLine('Mcp-Session-Id'); + if ($isInitializeRequest) { + if ($request->hasHeader('Mcp-Session-Id')) { + $this->logger->warning("Client sent Mcp-Session-Id with InitializeRequest. Ignoring.", ['clientSentId' => $request->getHeaderLine('Mcp-Session-Id')]); + $error = Error::forInvalidRequest("Invalid request: Session already initialized. Mcp-Session-Id header not allowed with InitializeRequest.", $message->getId()); + $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); + return $deferred->promise(); + } + + $sessionId = $this->generateId(); + $this->emit('client_connected', [$sessionId]); + } else { + $sessionId = $request->getHeaderLine('Mcp-Session-Id'); - if (empty($sessionId)) { - $this->logger->warning("POST request without Mcp-Session-Id."); - $error = Error::forInvalidRequest("Mcp-Session-Id header required for POST requests.", $message->getId()); - $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); - return $deferred->promise(); + if (empty($sessionId)) { + $this->logger->warning("POST request without Mcp-Session-Id."); + $error = Error::forInvalidRequest("Mcp-Session-Id header required for POST requests.", $message->getId()); + $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); + return $deferred->promise(); + } } } @@ -344,7 +355,7 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte 'X-Accel-Buffering' => 'no', ]; - if (!empty($sessionId)) { + if (!empty($sessionId) && !$this->stateless) { $headers['Mcp-Session-Id'] = $sessionId; } @@ -355,6 +366,8 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte } } + $context['stateless'] = $this->stateless; + $this->loop->futureTick(function () use ($message, $sessionId, $context) { $this->emit('message', [$message, $sessionId, $context]); }); @@ -364,6 +377,10 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte private function handleDeleteRequest(ServerRequestInterface $request): PromiseInterface { + if ($this->stateless) { + return resolve(new HttpResponse(204)); + } + $sessionId = $request->getHeaderLine('Mcp-Session-Id'); if (empty($sessionId)) { $this->logger->warning("DELETE request without Mcp-Session-Id."); @@ -466,6 +483,12 @@ public function sendMessage(Message $message, string $sessionId, array $context if ($this->activeSseStreams[$streamId]['context']['nResponses'] >= $this->activeSseStreams[$streamId]['context']['nRequests']) { $this->logger->info("All expected responses sent for POST SSE stream. Closing.", ['streamId' => $streamId, 'sessionId' => $sessionId]); $stream->end(); // Will trigger 'close' event. + + if ($context['stateless'] ?? false) { + $this->loop->futureTick(function () use ($sessionId) { + $this->emit('client_disconnected', [$sessionId, 'Stateless request completed']); + }); + } } } @@ -483,12 +506,19 @@ public function sendMessage(Message $message, string $sessionId, array $context $responseBody = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); $headers = ['Content-Type' => 'application/json']; - if ($isInitializeResponse) { + if ($isInitializeResponse && !$this->stateless) { $headers['Mcp-Session-Id'] = $sessionId; } $statusCode = $context['status_code'] ?? 200; $deferred->resolve(new HttpResponse($statusCode, $headers, $responseBody . "\n")); + + if ($context['stateless'] ?? false) { + $this->loop->futureTick(function () use ($sessionId) { + $this->emit('client_disconnected', [$sessionId, 'Stateless request completed']); + }); + } + return resolve(null); default: diff --git a/tests/Fixtures/ServerScripts/StreamableHttpTestServer.php b/tests/Fixtures/ServerScripts/StreamableHttpTestServer.php index b6c91a1..07fba5c 100755 --- a/tests/Fixtures/ServerScripts/StreamableHttpTestServer.php +++ b/tests/Fixtures/ServerScripts/StreamableHttpTestServer.php @@ -27,10 +27,11 @@ public function log($level, \Stringable|string $message, array $context = []): v $mcpPath = $argv[3] ?? 'mcp_streamable_test'; $enableJsonResponse = filter_var($argv[4] ?? 'true', FILTER_VALIDATE_BOOLEAN); $useEventStore = filter_var($argv[5] ?? 'false', FILTER_VALIDATE_BOOLEAN); +$stateless = filter_var($argv[6] ?? 'false', FILTER_VALIDATE_BOOLEAN); try { $logger = new NullLogger(); - $logger->info("Starting StreamableHttpTestServer on {$host}:{$port}/{$mcpPath}, JSON Mode: " . ($enableJsonResponse ? 'ON' : 'OFF')); + $logger->info("Starting StreamableHttpTestServer on {$host}:{$port}/{$mcpPath}, JSON Mode: " . ($enableJsonResponse ? 'ON' : 'OFF') . ", Stateless: " . ($stateless ? 'ON' : 'OFF')); $eventStore = $useEventStore ? new InMemoryEventStore() : null; @@ -48,6 +49,7 @@ public function log($level, \Stringable|string $message, array $context = []): v port: $port, mcpPath: $mcpPath, enableJsonResponse: $enableJsonResponse, + stateless: $stateless, eventStore: $eventStore ); diff --git a/tests/Integration/StreamableHttpServerTransportTest.php b/tests/Integration/StreamableHttpServerTransportTest.php index 90e296b..fd0f0c3 100644 --- a/tests/Integration/StreamableHttpServerTransportTest.php +++ b/tests/Integration/StreamableHttpServerTransportTest.php @@ -45,6 +45,16 @@ ]; $this->streamModeCommand = $commandPhpPath . ' ' . $commandScriptPath . ' ' . implode(' ', $streamModeCommandArgs); + $statelessModeCommandArgs = [ + escapeshellarg(STREAMABLE_HTTP_HOST), + escapeshellarg((string)$this->port), + escapeshellarg(STREAMABLE_MCP_PATH), + escapeshellarg('true'), // enableJsonResponse = true + escapeshellarg('false'), // useEventStore = false + escapeshellarg('true'), // stateless = true + ]; + $this->statelessModeCommand = $commandPhpPath . ' ' . $commandScriptPath . ' ' . implode(' ', $statelessModeCommandArgs); + $this->process = null; }); @@ -480,6 +490,234 @@ })->group('integration', 'streamable_http_stream'); }); +/** + * STATELESS MODE TESTS + * + * Tests for the stateless mode of StreamableHttpServerTransport, which: + * - Generates session IDs internally but doesn't expose them to clients + * - Doesn't require session IDs in requests after initialization + * - Doesn't include session IDs in response headers + * - Disables GET requests (SSE streaming) + * - Makes DELETE requests meaningless (but returns 204) + * - Treats each request as independent (no persistent session state) + * + * This mode is designed to work with clients like OpenAI's MCP implementation + * that have issues with session management in "never require approval" mode. + */ +describe('STATELESS MODE', function () { + beforeEach(function () { + $this->process = new Process($this->statelessModeCommand, getcwd() ?: null, null, []); + $this->process->start(); + $this->statelessClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); + await(delay(0.2)); + }); + + it('allows tool calls without having to send initialized notification', function () { + // 1. Initialize Request + $initResult = await($this->statelessClient->sendRequest('initialize', [ + 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, + 'clientInfo' => ['name' => 'StatelessModeClient', 'version' => '1.0'], + 'capabilities' => [] + ], 'init-stateless-1')); + + expect($initResult['statusCode'])->toBe(200); + expect($initResult['body']['id'])->toBe('init-stateless-1'); + expect($initResult['body'])->not->toHaveKey('error'); + expect($initResult['body']['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION); + expect($initResult['body']['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer'); + expect($this->statelessClient->sessionId)->toBeString()->toBeEmpty(); + + // 2. Call a tool + $toolResult = await($this->statelessClient->sendRequest('tools/call', [ + 'name' => 'greet_streamable_tool', + 'arguments' => ['name' => 'Stateless Mode User'] + ], 'tool-stateless-1')); + + expect($toolResult['statusCode'])->toBe(200); + expect($toolResult['body']['id'])->toBe('tool-stateless-1'); + expect($toolResult['body'])->not->toHaveKey('error'); + expect($toolResult['body']['result']['content'][0]['text'])->toBe('Hello, Stateless Mode User!'); + })->group('integration', 'streamable_http_stateless'); + + it('return HTTP 400 error response for invalid JSON in POST request', function () { + $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-stateless-1", "method": "tools/list", "params": {"broken"}'; + + $postPromise = $this->statelessClient->browser->post( + $this->statelessClient->baseUrl, + ['Content-Type' => 'application/json', 'Accept' => 'application/json'], + $malformedJson + ); + + try { + await(timeout($postPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); + } catch (ResponseException $e) { + $httpResponse = $e->getResponse(); + $bodyContent = (string) $httpResponse->getBody(); + $decodedBody = json_decode($bodyContent, true); + + expect($httpResponse->getStatusCode())->toBe(400); + expect($decodedBody['jsonrpc'])->toBe('2.0'); + expect($decodedBody['id'])->toBe(''); + expect($decodedBody['error']['code'])->toBe(-32700); + expect($decodedBody['error']['message'])->toContain('Invalid JSON'); + } + })->group('integration', 'streamable_http_stateless'); + + it('returns JSON-RPC error result for request for non-existent method', function () { + $errorResult = await($this->statelessClient->sendRequest('non/existentToolViaStateless', [], 'err-meth-stateless-1')); + + expect($errorResult['statusCode'])->toBe(200); + expect($errorResult['body']['id'])->toBe('err-meth-stateless-1'); + expect($errorResult['body']['error']['code'])->toBe(-32601); + expect($errorResult['body']['error']['message'])->toContain("Method 'non/existentToolViaStateless' not found"); + })->group('integration', 'streamable_http_stateless'); + + it('can handle batch requests correctly', function () { + $batchRequests = [ + ['jsonrpc' => '2.0', 'id' => 'batch-req-1', 'method' => 'tools/call', 'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']]], + ['jsonrpc' => '2.0', 'method' => 'notifications/something'], + ['jsonrpc' => '2.0', 'id' => 'batch-req-2', 'method' => 'tools/call', 'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]]], + ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method'] + ]; + + $batchResponse = await($this->statelessClient->sendBatchRequest($batchRequests)); + + $findResponseById = function (array $batch, $id) { + foreach ($batch as $item) { + if (isset($item['id']) && $item['id'] === $id) { + return $item; + } + } + return null; + }; + + expect($batchResponse['statusCode'])->toBe(200); + expect($batchResponse['body'])->toBeArray()->toHaveCount(3); + + $response1 = $findResponseById($batchResponse['body'], 'batch-req-1'); + $response2 = $findResponseById($batchResponse['body'], 'batch-req-2'); + $response3 = $findResponseById($batchResponse['body'], 'batch-req-3'); + + expect($response1['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!'); + expect($response2['result']['content'][0]['text'])->toBe('30'); + expect($response3['error']['code'])->toBe(-32601); + expect($response3['error']['message'])->toContain("Method 'nonexistent/method' not found"); + })->group('integration', 'streamable_http_stateless'); + + it('can handle tool list request', function () { + $toolListResult = await($this->statelessClient->sendRequest('tools/list', [], 'tool-list-stateless-1')); + + expect($toolListResult['statusCode'])->toBe(200); + expect($toolListResult['body']['id'])->toBe('tool-list-stateless-1'); + expect($toolListResult['body'])->not->toHaveKey('error'); + expect($toolListResult['body']['result']['tools'])->toBeArray(); + expect(count($toolListResult['body']['result']['tools']))->toBe(2); + expect($toolListResult['body']['result']['tools'][0]['name'])->toBe('greet_streamable_tool'); + expect($toolListResult['body']['result']['tools'][1]['name'])->toBe('sum_streamable_tool'); + })->group('integration', 'streamable_http_stateless'); + + it('can read a registered resource', function () { + $resourceResult = await($this->statelessClient->sendRequest('resources/read', ['uri' => 'test://streamable/static'], 'res-read-stateless-1')); + + expect($resourceResult['statusCode'])->toBe(200); + expect($resourceResult['body']['id'])->toBe('res-read-stateless-1'); + $contents = $resourceResult['body']['result']['contents']; + expect($contents[0]['uri'])->toBe('test://streamable/static'); + expect($contents[0]['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent); + })->group('integration', 'streamable_http_stateless'); + + it('can get a registered prompt', function () { + $promptResult = await($this->statelessClient->sendRequest('prompts/get', [ + 'name' => 'simple_streamable_prompt', + 'arguments' => ['name' => 'StatelessPromptUser', 'style' => 'formal'] + ], 'prompt-get-stateless-1')); + + expect($promptResult['statusCode'])->toBe(200); + expect($promptResult['body']['id'])->toBe('prompt-get-stateless-1'); + $messages = $promptResult['body']['result']['messages']; + expect($messages[0]['content']['text'])->toBe('Craft a formal greeting for StatelessPromptUser.'); + })->group('integration', 'streamable_http_stateless'); + + it('does not return session ID in response headers in stateless mode', function () { + $promise = $this->statelessClient->browser->post( + $this->statelessClient->baseUrl, + ['Content-Type' => 'application/json', 'Accept' => 'application/json'], + json_encode([ + 'jsonrpc' => '2.0', + 'id' => 'init-header-test', + 'method' => 'initialize', + 'params' => [ + 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, + 'clientInfo' => ['name' => 'StatelessHeaderTest', 'version' => '1.0'], + 'capabilities' => [] + ] + ]) + ); + + $response = await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); + + expect($response->getStatusCode())->toBe(200); + expect($response->hasHeader('Mcp-Session-Id'))->toBeFalse(); + + $body = json_decode((string) $response->getBody(), true); + expect($body['id'])->toBe('init-header-test'); + expect($body)->not->toHaveKey('error'); + })->group('integration', 'streamable_http_stateless'); + + it('returns HTTP 405 for GET requests (SSE disabled) in stateless mode', function () { + try { + $getPromise = $this->statelessClient->browser->get( + $this->statelessClient->baseUrl, + ['Accept' => 'text/event-stream'] + ); + await(timeout($getPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); + $this->fail("Expected GET request to fail with 405, but it succeeded."); + } catch (ResponseException $e) { + expect($e->getResponse()->getStatusCode())->toBe(405); + $bodyContent = (string) $e->getResponse()->getBody(); + $decodedBody = json_decode($bodyContent, true); + expect($decodedBody['error']['message'])->toContain('GET requests (SSE streaming) are not supported in stateless mode'); + } + })->group('integration', 'streamable_http_stateless'); + + it('returns 204 for DELETE requests in stateless mode (but they are meaningless)', function () { + $deletePromise = $this->statelessClient->browser->delete($this->statelessClient->baseUrl); + $response = await(timeout($deletePromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); + + expect($response->getStatusCode())->toBe(204); + expect((string) $response->getBody())->toBeEmpty(); + })->group('integration', 'streamable_http_stateless'); + + it('handles multiple independent tool calls in stateless mode', function () { + $toolResult1 = await($this->statelessClient->sendRequest('tools/call', [ + 'name' => 'greet_streamable_tool', + 'arguments' => ['name' => 'User 1'] + ], 'tool-multi-1')); + + $toolResult2 = await($this->statelessClient->sendRequest('tools/call', [ + 'name' => 'sum_streamable_tool', + 'arguments' => ['a' => 5, 'b' => 10] + ], 'tool-multi-2')); + + $toolResult3 = await($this->statelessClient->sendRequest('tools/call', [ + 'name' => 'greet_streamable_tool', + 'arguments' => ['name' => 'User 3'] + ], 'tool-multi-3')); + + expect($toolResult1['statusCode'])->toBe(200); + expect($toolResult1['body']['id'])->toBe('tool-multi-1'); + expect($toolResult1['body']['result']['content'][0]['text'])->toBe('Hello, User 1!'); + + expect($toolResult2['statusCode'])->toBe(200); + expect($toolResult2['body']['id'])->toBe('tool-multi-2'); + expect($toolResult2['body']['result']['content'][0]['text'])->toBe('15'); + + expect($toolResult3['statusCode'])->toBe(200); + expect($toolResult3['body']['id'])->toBe('tool-multi-3'); + expect($toolResult3['body']['result']['content'][0]['text'])->toBe('Hello, User 3!'); + })->group('integration', 'streamable_http_stateless'); +}); + it('responds to OPTIONS request with CORS headers', function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start();