1616use PhpMcp \Schema \JsonRpc \BatchRequest ;
1717use PhpMcp \Schema \JsonRpc \BatchResponse ;
1818use PhpMcp \Schema \JsonRpc \Error ;
19- use PhpMcp \Schema \JsonRpc \Notification ;
19+ use PhpMcp \Schema \JsonRpc \Parser ;
2020use PhpMcp \Schema \JsonRpc \Request ;
2121use PhpMcp \Schema \JsonRpc \Response ;
2222use PhpMcp \Server \Support \RandomIdGenerator ;
@@ -56,7 +56,7 @@ class StreamableHttpServerTransport implements ServerTransportInterface, LoggerA
5656 * Keyed by a unique pendingRequestId.
5757 * @var array<string, Deferred>
5858 */
59- private array $ pendingDirectPostResponses = [];
59+ private array $ pendingRequests = [];
6060
6161 /**
6262 * Stores active SSE streams.
@@ -167,9 +167,9 @@ private function createRequestHandler(): callable
167167
168168 try {
169169 return match ($ method ) {
170- 'GET ' => $ this ->handleGetRequest ($ request )->then ($ addCors , fn ($ e ) => $ addCors ($ this ->handleRequestError ($ e , $ request ))),
171- 'POST ' => $ this ->handlePostRequest ($ request )->then ($ addCors , fn ($ e ) => $ addCors ($ this ->handleRequestError ($ e , $ request ))),
172- 'DELETE ' => $ this ->handleDeleteRequest ($ request )->then ($ addCors , fn ($ e ) => $ addCors ($ this ->handleRequestError ($ e , $ request ))),
170+ 'GET ' => $ this ->handleGetRequest ($ request )->then ($ addCors , fn ($ e ) => $ addCors ($ this ->handleRequestError ($ e , $ request ))),
171+ 'POST ' => $ this ->handlePostRequest ($ request )->then ($ addCors , fn ($ e ) => $ addCors ($ this ->handleRequestError ($ e , $ request ))),
172+ 'DELETE ' => $ this ->handleDeleteRequest ($ request )->then ($ addCors , fn ($ e ) => $ addCors ($ this ->handleRequestError ($ e , $ request ))),
173173 default => $ addCors ($ this ->handleUnsupportedRequest ($ request )),
174174 };
175175 } catch (Throwable $ e ) {
@@ -249,7 +249,7 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
249249 }
250250
251251 try {
252- $ message = self :: parseRequest ($ body );
252+ $ message = Parser:: parse ($ body );
253253 } catch (Throwable $ e ) {
254254 $ this ->logger ->error ("Failed to parse MCP message from POST body " , ['error ' => $ e ->getMessage ()]);
255255 $ error = Error::forParseError ("Invalid JSON: " . $ e ->getMessage ());
@@ -263,7 +263,7 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
263263 if ($ isInitializeRequest ) {
264264 if ($ request ->hasHeader ('Mcp-Session-Id ' )) {
265265 $ this ->logger ->warning ("Client sent Mcp-Session-Id with InitializeRequest. Ignoring. " , ['clientSentId ' => $ request ->getHeaderLine ('Mcp-Session-Id ' )]);
266- $ error = Error::forInvalidRequest ("Invalid request: Session already initialized. Mcp-Session-Id header not allowed with InitializeRequest. " , $ message ->id );
266+ $ error = Error::forInvalidRequest ("Invalid request: Session already initialized. Mcp-Session-Id header not allowed with InitializeRequest. " , $ message ->getId () );
267267 $ deferred ->resolve (new HttpResponse (400 , ['Content-Type ' => 'application/json ' ], json_encode ($ error )));
268268 return $ deferred ->promise ();
269269 }
@@ -275,7 +275,7 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
275275
276276 if (empty ($ sessionId )) {
277277 $ this ->logger ->warning ("POST request without Mcp-Session-Id. " );
278- $ error = Error::forInvalidRequest ("Mcp-Session-Id header required for POST requests. " , $ message ->id );
278+ $ error = Error::forInvalidRequest ("Mcp-Session-Id header required for POST requests. " , $ message ->getId () );
279279 $ deferred ->resolve (new HttpResponse (400 , ['Content-Type ' => 'application/json ' ], json_encode ($ error )));
280280 return $ deferred ->promise ();
281281 }
@@ -285,17 +285,13 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
285285 'is_initialize_request ' => $ isInitializeRequest ,
286286 ];
287287
288- $ hasRequests = false ;
289- $ nRequests = 0 ;
290- if ($ message instanceof Request) {
291- $ hasRequests = true ;
292- $ nRequests = 1 ;
293- } elseif ($ message instanceof BatchRequest) {
294- $ hasRequests = $ message ->hasRequests ();
295- $ nRequests = count ($ message ->getRequests ());
296- }
288+ $ nRequests = match (true ) {
289+ $ message instanceof Request => 1 ,
290+ $ message instanceof BatchRequest => $ message ->nRequests (),
291+ default => 0 ,
292+ };
297293
298- if (! $ hasRequests ) {
294+ if ($ nRequests === 0 ) {
299295 $ deferred ->resolve (new HttpResponse (202 ));
300296 $ context ['type ' ] = 'post_202_sent ' ;
301297 } else {
@@ -338,19 +334,19 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
338334 $ context ['nRequests ' ] = $ nRequests ;
339335 } else {
340336 $ pendingRequestId = $ this ->idGenerator ->generateId ();
341- $ this ->pendingDirectPostResponses [$ pendingRequestId ] = $ deferred ;
337+ $ this ->pendingRequests [$ pendingRequestId ] = $ deferred ;
342338
343339 $ timeoutTimer = $ this ->loop ->addTimer (30 , function () use ($ pendingRequestId , $ sessionId ) {
344- if (isset ($ this ->pendingDirectPostResponses [$ pendingRequestId ])) {
345- $ deferred = $ this ->pendingDirectPostResponses [$ pendingRequestId ];
346- unset($ this ->pendingDirectPostResponses [$ pendingRequestId ]);
340+ if (isset ($ this ->pendingRequests [$ pendingRequestId ])) {
341+ $ deferred = $ this ->pendingRequests [$ pendingRequestId ];
342+ unset($ this ->pendingRequests [$ pendingRequestId ]);
347343 $ this ->logger ->warning ("Timeout waiting for direct JSON response processing. " , ['pending_request_id ' => $ pendingRequestId , 'session_id ' => $ sessionId ]);
348344 $ errorResponse = McpServerException::internalError ("Request processing timed out. " )->toJsonRpcError ($ pendingRequestId );
349345 $ deferred ->resolve (new HttpResponse (500 , ['Content-Type ' => 'application/json ' ], json_encode ($ errorResponse ->toArray ())));
350346 }
351347 });
352348
353- $ this ->pendingDirectPostResponses [$ pendingRequestId ]->promise ()->finally (function () use ($ timeoutTimer ) {
349+ $ this ->pendingRequests [$ pendingRequestId ]->promise ()->finally (function () use ($ timeoutTimer ) {
354350 $ this ->loop ->cancelTimer ($ timeoutTimer );
355351 });
356352
@@ -417,25 +413,6 @@ private function handleRequestError(Throwable $e, ServerRequestInterface $reques
417413 return new HttpResponse (500 , ['Content-Type ' => 'application/json ' ], json_encode ($ error ));
418414 }
419415
420- public static function parseRequest (string $ message ): Request |Notification |BatchRequest
421- {
422- $ messageData = json_decode ($ message , true , 512 , JSON_THROW_ON_ERROR );
423-
424- $ isBatch = array_is_list ($ messageData ) && count ($ messageData ) > 0 && is_array ($ messageData [0 ] ?? null );
425-
426- if ($ isBatch ) {
427- return BatchRequest::fromArray ($ messageData );
428- } elseif (isset ($ messageData ['method ' ])) {
429- if (isset ($ messageData ['id ' ]) && $ messageData ['id ' ] !== null ) {
430- return Request::fromArray ($ messageData );
431- } else {
432- return Notification::fromArray ($ messageData );
433- }
434- }
435-
436- throw new McpServerException ('Invalid JSON-RPC message ' );
437- }
438-
439416 public function sendMessage (Message $ message , string $ sessionId , array $ context = []): PromiseInterface
440417 {
441418 if ($ this ->closing ) {
@@ -489,13 +466,13 @@ public function sendMessage(Message $message, string $sessionId, array $context
489466
490467 case 'post_json ' :
491468 $ pendingRequestId = $ context ['pending_request_id ' ];
492- if (!isset ($ this ->pendingDirectPostResponses [$ pendingRequestId ])) {
469+ if (!isset ($ this ->pendingRequests [$ pendingRequestId ])) {
493470 $ this ->logger ->error ("Pending direct JSON request not found. " , ['pending_request_id ' => $ pendingRequestId , 'session_id ' => $ sessionId ]);
494471 return reject (new TransportException ("Pending request {$ pendingRequestId } not found. " ));
495472 }
496473
497- $ deferred = $ this ->pendingDirectPostResponses [$ pendingRequestId ];
498- unset($ this ->pendingDirectPostResponses [$ pendingRequestId ]);
474+ $ deferred = $ this ->pendingRequests [$ pendingRequestId ];
475+ unset($ this ->pendingRequests [$ pendingRequestId ]);
499476
500477 $ responseBody = json_encode ($ message , JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE );
501478 $ headers = ['Content-Type ' => 'application/json ' ];
@@ -596,12 +573,12 @@ public function close(): void
596573 $ this ->getStream = null ;
597574 }
598575
599- foreach ($ this ->pendingDirectPostResponses as $ pendingRequestId => $ deferred ) {
576+ foreach ($ this ->pendingRequests as $ pendingRequestId => $ deferred ) {
600577 $ deferred ->reject (new TransportException ('Transport is closing. ' ));
601578 }
602579
603580 $ this ->activeSseStreams = [];
604- $ this ->pendingDirectPostResponses = [];
581+ $ this ->pendingRequests = [];
605582
606583 $ this ->emit ('close ' , ['Transport closed. ' ]);
607584 $ this ->removeAllListeners ();
0 commit comments