Skip to content

Commit 85afe24

Browse files
refactor: use Parser for JSON-RPC message parsing across transports
- Replaced custom `parseRequest` methods in `HttpServerTransport`, `StdioServerTransport`, and `StreamableHttpServerTransport` with a unified `Parser::parse` method from the `php-mcp/schema` package. - Removed redundant code for request parsing, enhancing maintainability and consistency across transport implementations. - Updated error handling to utilize the new message parsing approach, improving logging and response management.
1 parent 0c01c4a commit 85afe24

File tree

4 files changed

+28
-98
lines changed

4 files changed

+28
-98
lines changed

src/Registry.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,6 @@ public function registerCompletionProvider(string $refType, string $identifier,
346346
}
347347

348348
$this->completionProviders[$refType][$identifier][$argument] = $providerClass;
349-
$this->logger->debug("Registered completion provider for {$refType} '{$identifier}', argument '{$argument}'", ['provider' => $providerClass]);
350349
}
351350

352351
public function enableNotifications(): void

src/Transports/HttpServerTransport.php

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,14 @@
55
namespace PhpMcp\Server\Transports;
66

77
use Evenement\EventEmitterTrait;
8-
use PhpMcp\Schema\Constants;
98
use PhpMcp\Server\Contracts\IdGeneratorInterface;
109
use PhpMcp\Server\Contracts\LoggerAwareInterface;
1110
use PhpMcp\Server\Contracts\LoopAwareInterface;
1211
use PhpMcp\Server\Contracts\ServerTransportInterface;
1312
use PhpMcp\Server\Exception\TransportException;
1413
use PhpMcp\Schema\JsonRpc\Message;
15-
use PhpMcp\Schema\JsonRpc\Request;
16-
use PhpMcp\Schema\JsonRpc\Notification;
17-
use PhpMcp\Schema\JsonRpc\BatchRequest;
1814
use PhpMcp\Schema\JsonRpc\Error;
19-
use PhpMcp\Server\Exception\McpServerException;
15+
use PhpMcp\Schema\JsonRpc\Parser;
2016
use PhpMcp\Server\Support\RandomIdGenerator;
2117
use Psr\Http\Message\ServerRequestInterface;
2218
use Psr\Log\LoggerInterface;
@@ -257,7 +253,7 @@ protected function handleMessagePostRequest(ServerRequestInterface $request): Re
257253
}
258254

259255
try {
260-
$message = self::parseRequest($body);
256+
$message = Parser::parse($body);
261257
} catch (Throwable $e) {
262258
$this->logger->error('Error parsing message', ['sessionId' => $sessionId, 'exception' => $e]);
263259

@@ -271,25 +267,6 @@ protected function handleMessagePostRequest(ServerRequestInterface $request): Re
271267
return new Response(202, ['Content-Type' => 'text/plain'], 'Accepted');
272268
}
273269

274-
public static function parseRequest(string $message): Request|Notification|BatchRequest
275-
{
276-
$messageData = json_decode($message, true, 512, JSON_THROW_ON_ERROR);
277-
278-
$isBatch = array_is_list($messageData) && count($messageData) > 0 && is_array($messageData[0] ?? null);
279-
280-
if ($isBatch) {
281-
return BatchRequest::fromArray($messageData);
282-
} elseif (isset($messageData['method'])) {
283-
if (isset($messageData['id']) && $messageData['id'] !== null) {
284-
return Request::fromArray($messageData);
285-
} else {
286-
return Notification::fromArray($messageData);
287-
}
288-
}
289-
290-
throw new McpServerException('Invalid JSON-RPC message');
291-
}
292-
293270

294271
/**
295272
* Sends a raw JSON-RPC message frame to a specific client via SSE.

src/Transports/StdioServerTransport.php

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,13 @@
55
namespace PhpMcp\Server\Transports;
66

77
use Evenement\EventEmitterTrait;
8-
use PhpMcp\Schema\Constants;
9-
use PhpMcp\Schema\JsonRpc\BatchRequest;
8+
use PhpMcp\Schema\JsonRpc\Parser;
109
use PhpMcp\Server\Contracts\LoggerAwareInterface;
1110
use PhpMcp\Server\Contracts\LoopAwareInterface;
1211
use PhpMcp\Server\Contracts\ServerTransportInterface;
1312
use PhpMcp\Server\Exception\TransportException;
14-
use PhpMcp\Schema\JsonRpc\Notification;
15-
use PhpMcp\Schema\JsonRpc\Request;
1613
use PhpMcp\Schema\JsonRpc\Error;
1714
use PhpMcp\Schema\JsonRpc\Message;
18-
use PhpMcp\Server\Exception\McpServerException;
1915
use Psr\Log\LoggerInterface;
2016
use Psr\Log\NullLogger;
2117
use React\ChildProcess\Process;
@@ -181,7 +177,7 @@ private function processBuffer(): void
181177
}
182178

183179
try {
184-
$message = self::parseRequest($trimmedLine);
180+
$message = Parser::parse($trimmedLine);
185181
} catch (Throwable $e) {
186182
$this->logger->error('Error parsing message', ['exception' => $e]);
187183
$error = Error::forParseError("Invalid JSON: " . $e->getMessage());
@@ -193,25 +189,6 @@ private function processBuffer(): void
193189
}
194190
}
195191

196-
public static function parseRequest(string $message): Request|Notification|BatchRequest
197-
{
198-
$messageData = json_decode($message, true, 512, JSON_THROW_ON_ERROR);
199-
200-
$isBatch = array_is_list($messageData) && count($messageData) > 0 && is_array($messageData[0] ?? null);
201-
202-
if ($isBatch) {
203-
return BatchRequest::fromArray($messageData);
204-
} elseif (isset($messageData['method'])) {
205-
if (isset($messageData['id']) && $messageData['id'] !== null) {
206-
return Request::fromArray($messageData);
207-
} else {
208-
return Notification::fromArray($messageData);
209-
}
210-
}
211-
212-
throw new McpServerException('Invalid JSON-RPC message');
213-
}
214-
215192
/**
216193
* Sends a raw, framed message to STDOUT.
217194
*/

src/Transports/StreamableHttpServerTransport.php

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use PhpMcp\Schema\JsonRpc\BatchRequest;
1717
use PhpMcp\Schema\JsonRpc\BatchResponse;
1818
use PhpMcp\Schema\JsonRpc\Error;
19-
use PhpMcp\Schema\JsonRpc\Notification;
19+
use PhpMcp\Schema\JsonRpc\Parser;
2020
use PhpMcp\Schema\JsonRpc\Request;
2121
use PhpMcp\Schema\JsonRpc\Response;
2222
use PhpMcp\Server\Support\RandomIdGenerator;
@@ -56,7 +56,7 @@ class StreamableHttpServerTransport implements ServerTransportInterface, LoggerA
5656
* Keyed by a unique pendingRequestId.
5757
* @var array<string, Deferred>
5858
*/
59-
private array $pendingDirectPostResponses = [];
59+
private array $pendingRequests = [];
6060

6161
/**
6262
* Stores active SSE streams.
@@ -167,9 +167,9 @@ private function createRequestHandler(): callable
167167

168168
try {
169169
return match ($method) {
170-
'GET' => $this->handleGetRequest($request)->then($addCors, fn ($e) => $addCors($this->handleRequestError($e, $request))),
171-
'POST' => $this->handlePostRequest($request)->then($addCors, fn ($e) => $addCors($this->handleRequestError($e, $request))),
172-
'DELETE' => $this->handleDeleteRequest($request)->then($addCors, fn ($e) => $addCors($this->handleRequestError($e, $request))),
170+
'GET' => $this->handleGetRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))),
171+
'POST' => $this->handlePostRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))),
172+
'DELETE' => $this->handleDeleteRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))),
173173
default => $addCors($this->handleUnsupportedRequest($request)),
174174
};
175175
} catch (Throwable $e) {
@@ -249,7 +249,7 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
249249
}
250250

251251
try {
252-
$message = self::parseRequest($body);
252+
$message = Parser::parse($body);
253253
} catch (Throwable $e) {
254254
$this->logger->error("Failed to parse MCP message from POST body", ['error' => $e->getMessage()]);
255255
$error = Error::forParseError("Invalid JSON: " . $e->getMessage());
@@ -263,7 +263,7 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
263263
if ($isInitializeRequest) {
264264
if ($request->hasHeader('Mcp-Session-Id')) {
265265
$this->logger->warning("Client sent Mcp-Session-Id with InitializeRequest. Ignoring.", ['clientSentId' => $request->getHeaderLine('Mcp-Session-Id')]);
266-
$error = Error::forInvalidRequest("Invalid request: Session already initialized. Mcp-Session-Id header not allowed with InitializeRequest.", $message->id);
266+
$error = Error::forInvalidRequest("Invalid request: Session already initialized. Mcp-Session-Id header not allowed with InitializeRequest.", $message->getId());
267267
$deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error)));
268268
return $deferred->promise();
269269
}
@@ -275,7 +275,7 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
275275

276276
if (empty($sessionId)) {
277277
$this->logger->warning("POST request without Mcp-Session-Id.");
278-
$error = Error::forInvalidRequest("Mcp-Session-Id header required for POST requests.", $message->id);
278+
$error = Error::forInvalidRequest("Mcp-Session-Id header required for POST requests.", $message->getId());
279279
$deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error)));
280280
return $deferred->promise();
281281
}
@@ -285,17 +285,13 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
285285
'is_initialize_request' => $isInitializeRequest,
286286
];
287287

288-
$hasRequests = false;
289-
$nRequests = 0;
290-
if ($message instanceof Request) {
291-
$hasRequests = true;
292-
$nRequests = 1;
293-
} elseif ($message instanceof BatchRequest) {
294-
$hasRequests = $message->hasRequests();
295-
$nRequests = count($message->getRequests());
296-
}
288+
$nRequests = match (true) {
289+
$message instanceof Request => 1,
290+
$message instanceof BatchRequest => $message->nRequests(),
291+
default => 0,
292+
};
297293

298-
if (!$hasRequests) {
294+
if ($nRequests === 0) {
299295
$deferred->resolve(new HttpResponse(202));
300296
$context['type'] = 'post_202_sent';
301297
} else {
@@ -338,19 +334,19 @@ private function handlePostRequest(ServerRequestInterface $request): PromiseInte
338334
$context['nRequests'] = $nRequests;
339335
} else {
340336
$pendingRequestId = $this->idGenerator->generateId();
341-
$this->pendingDirectPostResponses[$pendingRequestId] = $deferred;
337+
$this->pendingRequests[$pendingRequestId] = $deferred;
342338

343339
$timeoutTimer = $this->loop->addTimer(30, function () use ($pendingRequestId, $sessionId) {
344-
if (isset($this->pendingDirectPostResponses[$pendingRequestId])) {
345-
$deferred = $this->pendingDirectPostResponses[$pendingRequestId];
346-
unset($this->pendingDirectPostResponses[$pendingRequestId]);
340+
if (isset($this->pendingRequests[$pendingRequestId])) {
341+
$deferred = $this->pendingRequests[$pendingRequestId];
342+
unset($this->pendingRequests[$pendingRequestId]);
347343
$this->logger->warning("Timeout waiting for direct JSON response processing.", ['pending_request_id' => $pendingRequestId, 'session_id' => $sessionId]);
348344
$errorResponse = McpServerException::internalError("Request processing timed out.")->toJsonRpcError($pendingRequestId);
349345
$deferred->resolve(new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($errorResponse->toArray())));
350346
}
351347
});
352348

353-
$this->pendingDirectPostResponses[$pendingRequestId]->promise()->finally(function () use ($timeoutTimer) {
349+
$this->pendingRequests[$pendingRequestId]->promise()->finally(function () use ($timeoutTimer) {
354350
$this->loop->cancelTimer($timeoutTimer);
355351
});
356352

@@ -417,25 +413,6 @@ private function handleRequestError(Throwable $e, ServerRequestInterface $reques
417413
return new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($error));
418414
}
419415

420-
public static function parseRequest(string $message): Request|Notification|BatchRequest
421-
{
422-
$messageData = json_decode($message, true, 512, JSON_THROW_ON_ERROR);
423-
424-
$isBatch = array_is_list($messageData) && count($messageData) > 0 && is_array($messageData[0] ?? null);
425-
426-
if ($isBatch) {
427-
return BatchRequest::fromArray($messageData);
428-
} elseif (isset($messageData['method'])) {
429-
if (isset($messageData['id']) && $messageData['id'] !== null) {
430-
return Request::fromArray($messageData);
431-
} else {
432-
return Notification::fromArray($messageData);
433-
}
434-
}
435-
436-
throw new McpServerException('Invalid JSON-RPC message');
437-
}
438-
439416
public function sendMessage(Message $message, string $sessionId, array $context = []): PromiseInterface
440417
{
441418
if ($this->closing) {
@@ -489,13 +466,13 @@ public function sendMessage(Message $message, string $sessionId, array $context
489466

490467
case 'post_json':
491468
$pendingRequestId = $context['pending_request_id'];
492-
if (!isset($this->pendingDirectPostResponses[$pendingRequestId])) {
469+
if (!isset($this->pendingRequests[$pendingRequestId])) {
493470
$this->logger->error("Pending direct JSON request not found.", ['pending_request_id' => $pendingRequestId, 'session_id' => $sessionId]);
494471
return reject(new TransportException("Pending request {$pendingRequestId} not found."));
495472
}
496473

497-
$deferred = $this->pendingDirectPostResponses[$pendingRequestId];
498-
unset($this->pendingDirectPostResponses[$pendingRequestId]);
474+
$deferred = $this->pendingRequests[$pendingRequestId];
475+
unset($this->pendingRequests[$pendingRequestId]);
499476

500477
$responseBody = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
501478
$headers = ['Content-Type' => 'application/json'];
@@ -596,12 +573,12 @@ public function close(): void
596573
$this->getStream = null;
597574
}
598575

599-
foreach ($this->pendingDirectPostResponses as $pendingRequestId => $deferred) {
576+
foreach ($this->pendingRequests as $pendingRequestId => $deferred) {
600577
$deferred->reject(new TransportException('Transport is closing.'));
601578
}
602579

603580
$this->activeSseStreams = [];
604-
$this->pendingDirectPostResponses = [];
581+
$this->pendingRequests = [];
605582

606583
$this->emit('close', ['Transport closed.']);
607584
$this->removeAllListeners();

0 commit comments

Comments
 (0)