Skip to content

Commit d5a43c0

Browse files
[MCP SDK][WIP] Add Server response handling + KeepAlive (ping)
1 parent 06f4cfb commit d5a43c0

File tree

14 files changed

+648
-10
lines changed

14 files changed

+648
-10
lines changed

src/mcp-sdk/composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
"require": {
1717
"php": "^8.2",
1818
"psr/log": "^1.0 || ^2.0 || ^3.0",
19-
"symfony/uid": "^6.4 || ^7.0"
19+
"symfony/uid": "^6.4 || ^7.0",
20+
"symfony/clock": "^6.4 || ^7.0"
2021
},
2122
"require-dev": {
2223
"phpstan/phpstan": "^2.1",

src/mcp-sdk/examples/cli/index.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
require __DIR__.'/vendor/autoload.php';
1313

14+
use Symfony\Component\Clock\Clock;
1415
use Symfony\Component\Console as SymfonyConsole;
1516
use Symfony\Component\Console\Output\OutputInterface;
1617

@@ -30,7 +31,7 @@
3031
);
3132

3233
// Set up the server
33-
$sever = new Symfony\AI\McpSdk\Server($jsonRpcHandler, $logger);
34+
$sever = new Symfony\AI\McpSdk\Server($jsonRpcHandler, new Clock(), logger: $logger);
3435

3536
// Create the transport layer using Symfony Console
3637
$transport = new Symfony\AI\McpSdk\Server\Transport\Stdio\SymfonyConsoleTransport($input, $output);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\AI\McpSdk\Exception;
13+
14+
final class TransportNotConnectedException extends \LogicException implements ExceptionInterface
15+
{
16+
}

src/mcp-sdk/src/Message/Error.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
public const INTERNAL_ERROR = -32603;
2020
public const PARSE_ERROR = -32700;
2121
public const RESOURCE_NOT_FOUND = -32002;
22+
public const REQUEST_TIMEOUT = -32001;
2223

2324
public function __construct(
2425
public string|int $id,
@@ -27,6 +28,14 @@ public function __construct(
2728
) {
2829
}
2930

31+
/**
32+
* @param array{jsonrpc: string, id: string|int, error: array{code: int, message: string}} $data
33+
*/
34+
public static function from(array $data): self
35+
{
36+
return new self($data['id'], $data['error']['code'], $data['error']['message']);
37+
}
38+
3039
public static function invalidRequest(string|int $id, string $message = 'Invalid Request'): self
3140
{
3241
return new self($id, self::INVALID_REQUEST, $message);
@@ -52,6 +61,11 @@ public static function parseError(string|int $id, string $message = 'Parse error
5261
return new self($id, self::PARSE_ERROR, $message);
5362
}
5463

64+
public static function requestTimeout(string|int $id, string $message = 'Request timeout'): self
65+
{
66+
return new self($id, self::REQUEST_TIMEOUT, $message);
67+
}
68+
5569
/**
5670
* @return array{
5771
* jsonrpc: string,

src/mcp-sdk/src/Message/Factory.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
final class Factory
1717
{
1818
/**
19-
* @return iterable<Notification|Request|InvalidInputMessageException>
19+
* @return iterable<Notification|Request|Response|Error|InvalidInputMessageException>
2020
*
2121
* @throws \JsonException When the input string is not valid JSON
2222
*/
@@ -29,6 +29,15 @@ public function create(string $input): iterable
2929
}
3030

3131
foreach ($data as $message) {
32+
if (isset($message['id']) && (\array_key_exists('result', $message) || \array_key_exists('error', $message))) {
33+
if (\array_key_exists('error', $message)) {
34+
yield Error::from($message);
35+
} else {
36+
yield Response::from($message);
37+
}
38+
continue;
39+
}
40+
3241
if (!isset($message['method'])) {
3342
yield new InvalidInputMessageException('Invalid JSON-RPC request, missing "method".');
3443
} elseif (str_starts_with((string) $message['method'], 'notifications/')) {

src/mcp-sdk/src/Message/Response.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ public function __construct(
2222
) {
2323
}
2424

25+
/**
26+
* @param array{jsonrpc: string, id: string|int, result: array<string, mixed>} $data
27+
*/
28+
public static function from(array $data): self
29+
{
30+
return new self($data['id'], $data['result']);
31+
}
32+
2533
/**
2634
* @return array{jsonrpc: string, id: string|int, result: array<string, mixed>}
2735
*/

src/mcp-sdk/src/Server.php

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,42 @@
1313

1414
use Psr\Log\LoggerInterface;
1515
use Psr\Log\NullLogger;
16+
use Symfony\AI\McpSdk\Exception\TransportNotConnectedException;
17+
use Symfony\AI\McpSdk\Message\Error;
18+
use Symfony\AI\McpSdk\Message\Request;
19+
use Symfony\AI\McpSdk\Message\Response;
1620
use Symfony\AI\McpSdk\Server\JsonRpcHandler;
21+
use Symfony\AI\McpSdk\Server\KeepAliveSessionInterface;
22+
use Symfony\AI\McpSdk\Server\PendingResponse;
23+
use Symfony\AI\McpSdk\Server\PendingResponseBag;
1724
use Symfony\AI\McpSdk\Server\TransportInterface;
25+
use Symfony\Component\Clock\ClockInterface;
26+
use Symfony\Component\Uid\Uuid;
1827

19-
final readonly class Server
28+
final class Server
2029
{
30+
private ?TransportInterface $transport = null;
31+
32+
private PendingResponseBag $pendingResponses;
33+
2134
public function __construct(
2235
private JsonRpcHandler $jsonRpcHandler,
36+
private ClockInterface $clock,
37+
private ?KeepAliveSessionInterface $keepAliveSession = null,
2338
private LoggerInterface $logger = new NullLogger(),
2439
) {
40+
$this->pendingResponses = new PendingResponseBag($clock, new \DateInterval('PT30S'));
2541
}
2642

2743
public function connect(TransportInterface $transport): void
2844
{
45+
$this->transport = $transport;
46+
2947
$transport->initialize();
3048
$this->logger->info('Transport initialized');
3149

50+
$this->keepAliveSession?->start();
51+
3252
while ($transport->isConnected()) {
3353
foreach ($transport->receive() as $message) {
3454
if (null === $message) {
@@ -41,6 +61,15 @@ public function connect(TransportInterface $transport): void
4161
continue;
4262
}
4363

64+
if ($response instanceof Response || $response instanceof Error) {
65+
if ($this->pendingResponses->resolve($response)) {
66+
continue;
67+
}
68+
69+
$this->logger->warning(\sprintf('No handler found for response id "%s".', $response->id), ['response' => $response]);
70+
continue;
71+
}
72+
4473
$transport->send($response);
4574
}
4675
} catch (\JsonException $e) {
@@ -52,10 +81,50 @@ public function connect(TransportInterface $transport): void
5281
}
5382
}
5483

55-
usleep(1000);
84+
$this->pendingResponses->gc(function (PendingResponse $pendingResponse, Error $error): void {
85+
$this->logger->warning('Pending response timed out', ['pendingResponse' => $pendingResponse, 'error' => $error]);
86+
});
87+
88+
$this->keepAliveSession?->tick(function (): void {
89+
$id = (string) Uuid::v4();
90+
91+
$this->sendRequest(new Request($id, 'ping', []), function (Response|Error $response): void {
92+
// Per MCP spec, ping errors should terminate the connection, but some clients
93+
// don't handle this correctly. We may want to consider adding a strict mode with
94+
// strict error handling.
95+
if ($response instanceof Error) {
96+
$this->logger->warning('KeepAlive ping returned error response', ['error' => $response]);
97+
}
98+
});
99+
});
100+
101+
$this->clock->sleep(0.001);
56102
}
57103

104+
$this->keepAliveSession?->stop();
58105
$transport->close();
59106
$this->logger->info('Transport closed');
60107
}
108+
109+
/**
110+
* @throws \JsonException When JSON encoding fails
111+
*/
112+
public function sendRequest(Request $request, ?\Closure $callback = null): void
113+
{
114+
if (null === $this->transport) {
115+
throw new TransportNotConnectedException();
116+
}
117+
118+
$this->logger->info('Sending request', ['request' => $request]);
119+
120+
if ([] === $request->params) {
121+
$encodedRequest = json_encode($request, \JSON_THROW_ON_ERROR | \JSON_FORCE_OBJECT);
122+
} else {
123+
$encodedRequest = json_encode($request, \JSON_THROW_ON_ERROR);
124+
}
125+
126+
$this->transport->send($encodedRequest);
127+
128+
$this->pendingResponses->add(new PendingResponse($request->id, $this->clock->now(), $callback));
129+
}
61130
}

src/mcp-sdk/src/Server/JsonRpcHandler.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public function __construct(
5353
}
5454

5555
/**
56-
* @return iterable<string|null>
56+
* @return iterable<string|Response|Error|null>
5757
*
5858
* @throws ExceptionInterface When a handler throws an exception during message processing
5959
* @throws \JsonException When JSON encoding of the response fails
@@ -82,9 +82,13 @@ public function process(string $input): iterable
8282
$this->logger->info('Decoded incoming message', ['message' => $message]);
8383

8484
try {
85-
yield $message instanceof Notification
86-
? $this->handleNotification($message)
87-
: $this->encodeResponse($this->handleRequest($message));
85+
if ($message instanceof Notification) {
86+
yield $this->handleNotification($message);
87+
} elseif ($message instanceof Response || $message instanceof Error) {
88+
yield $message;
89+
} else {
90+
yield $this->encodeResponse($this->handleRequest($message));
91+
}
8892
} catch (\DomainException) {
8993
yield null;
9094
} catch (NotFoundExceptionInterface $e) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\AI\McpSdk\Server\KeepAliveSession;
13+
14+
use Symfony\AI\McpSdk\Server\KeepAliveSessionInterface;
15+
use Symfony\Component\Clock\ClockInterface;
16+
17+
final class KeepAliveSession implements KeepAliveSessionInterface
18+
{
19+
private bool $running = false;
20+
private \DateTimeImmutable $nextPingAt;
21+
22+
public function __construct(
23+
private ClockInterface $clock,
24+
private \DateInterval $interval,
25+
) {
26+
$this->nextPingAt = $this->clock->now()->add($this->interval);
27+
}
28+
29+
public function start(): void
30+
{
31+
$this->running = true;
32+
$this->nextPingAt = $this->clock->now()->add($this->interval);
33+
}
34+
35+
public function stop(): void
36+
{
37+
$this->running = false;
38+
}
39+
40+
public function tick(\Closure $callback): void
41+
{
42+
if (!$this->running) {
43+
return;
44+
}
45+
46+
if ($this->clock->now() < $this->nextPingAt) {
47+
return;
48+
}
49+
50+
$callback();
51+
52+
$this->nextPingAt = $this->clock->now()->add($this->interval);
53+
}
54+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\AI\McpSdk\Server;
13+
14+
interface KeepAliveSessionInterface
15+
{
16+
public function start(): void;
17+
18+
public function stop(): void;
19+
20+
public function tick(\Closure $callback): void;
21+
}

0 commit comments

Comments
 (0)