diff --git a/src/mcp-sdk/composer.json b/src/mcp-sdk/composer.json index e515ae898..b911595f8 100644 --- a/src/mcp-sdk/composer.json +++ b/src/mcp-sdk/composer.json @@ -16,7 +16,8 @@ "require": { "php": "^8.2", "psr/log": "^1.0 || ^2.0 || ^3.0", - "symfony/uid": "^6.4 || ^7.0" + "symfony/uid": "^6.4 || ^7.0", + "symfony/clock": "^6.4 || ^7.0" }, "require-dev": { "phpstan/phpstan": "^2.1", diff --git a/src/mcp-sdk/examples/cli/index.php b/src/mcp-sdk/examples/cli/index.php index 740b953e9..f11d1fc6c 100644 --- a/src/mcp-sdk/examples/cli/index.php +++ b/src/mcp-sdk/examples/cli/index.php @@ -11,6 +11,7 @@ require __DIR__.'/vendor/autoload.php'; +use Symfony\Component\Clock\Clock; use Symfony\Component\Console as SymfonyConsole; use Symfony\Component\Console\Output\OutputInterface; @@ -30,7 +31,7 @@ ); // Set up the server -$sever = new Symfony\AI\McpSdk\Server($jsonRpcHandler, $logger); +$sever = new Symfony\AI\McpSdk\Server($jsonRpcHandler, new Clock(), logger: $logger); // Create the transport layer using Symfony Console $transport = new Symfony\AI\McpSdk\Server\Transport\Stdio\SymfonyConsoleTransport($input, $output); diff --git a/src/mcp-sdk/phpunit.xml.dist b/src/mcp-sdk/phpunit.xml.dist index 5354508bf..7f74b5cb3 100644 --- a/src/mcp-sdk/phpunit.xml.dist +++ b/src/mcp-sdk/phpunit.xml.dist @@ -6,7 +6,8 @@ executionOrder="depends,defects" beStrictAboutOutputDuringTests="true" failOnRisky="true" - failOnWarning="true"> + failOnWarning="true" + bootstrap="tests/bootstrap.php"> diff --git a/src/mcp-sdk/src/Exception/TransportNotConnectedException.php b/src/mcp-sdk/src/Exception/TransportNotConnectedException.php new file mode 100644 index 000000000..fa903d520 --- /dev/null +++ b/src/mcp-sdk/src/Exception/TransportNotConnectedException.php @@ -0,0 +1,16 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\McpSdk\Exception; + +final class TransportNotConnectedException extends \LogicException implements ExceptionInterface +{ +} diff --git a/src/mcp-sdk/src/Message/Error.php b/src/mcp-sdk/src/Message/Error.php index df5ad11d2..e40f67ba0 100644 --- a/src/mcp-sdk/src/Message/Error.php +++ b/src/mcp-sdk/src/Message/Error.php @@ -19,6 +19,7 @@ public const INTERNAL_ERROR = -32603; public const PARSE_ERROR = -32700; public const RESOURCE_NOT_FOUND = -32002; + public const REQUEST_TIMEOUT = -32001; public function __construct( public string|int $id, @@ -27,6 +28,14 @@ public function __construct( ) { } + /** + * @param array{jsonrpc: string, id: string|int, error: array{code: int, message: string}} $data + */ + public static function from(array $data): self + { + return new self($data['id'], $data['error']['code'], $data['error']['message']); + } + public static function invalidRequest(string|int $id, string $message = 'Invalid Request'): self { return new self($id, self::INVALID_REQUEST, $message); @@ -52,6 +61,11 @@ public static function parseError(string|int $id, string $message = 'Parse error return new self($id, self::PARSE_ERROR, $message); } + public static function requestTimeout(string|int $id, string $message = 'Request timeout'): self + { + return new self($id, self::REQUEST_TIMEOUT, $message); + } + /** * @return array{ * jsonrpc: string, diff --git a/src/mcp-sdk/src/Message/Factory.php b/src/mcp-sdk/src/Message/Factory.php index 7531efa77..8decd2fe3 100644 --- a/src/mcp-sdk/src/Message/Factory.php +++ b/src/mcp-sdk/src/Message/Factory.php @@ -16,7 +16,7 @@ final class Factory { /** - * @return iterable + * @return iterable * * @throws \JsonException When the input string is not valid JSON */ @@ -29,6 +29,15 @@ public function create(string $input): iterable } foreach ($data as $message) { + if (isset($message['id']) && (\array_key_exists('result', $message) || \array_key_exists('error', $message))) { + if (\array_key_exists('error', $message)) { + yield Error::from($message); + } else { + yield Response::from($message); + } + continue; + } + if (!isset($message['method'])) { yield new InvalidInputMessageException('Invalid JSON-RPC request, missing "method".'); } elseif (str_starts_with((string) $message['method'], 'notifications/')) { diff --git a/src/mcp-sdk/src/Message/Response.php b/src/mcp-sdk/src/Message/Response.php index 2b26d9d2c..2e125013d 100644 --- a/src/mcp-sdk/src/Message/Response.php +++ b/src/mcp-sdk/src/Message/Response.php @@ -22,6 +22,14 @@ public function __construct( ) { } + /** + * @param array{jsonrpc: string, id: string|int, result: array} $data + */ + public static function from(array $data): self + { + return new self($data['id'], $data['result']); + } + /** * @return array{jsonrpc: string, id: string|int, result: array} */ diff --git a/src/mcp-sdk/src/Server.php b/src/mcp-sdk/src/Server.php index 1b5629b41..1bdc51eb8 100644 --- a/src/mcp-sdk/src/Server.php +++ b/src/mcp-sdk/src/Server.php @@ -13,22 +13,42 @@ use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; +use Symfony\AI\McpSdk\Exception\TransportNotConnectedException; +use Symfony\AI\McpSdk\Message\Error; +use Symfony\AI\McpSdk\Message\Request; +use Symfony\AI\McpSdk\Message\Response; use Symfony\AI\McpSdk\Server\JsonRpcHandler; +use Symfony\AI\McpSdk\Server\KeepAliveSessionInterface; +use Symfony\AI\McpSdk\Server\PendingResponse; +use Symfony\AI\McpSdk\Server\PendingResponseBag; use Symfony\AI\McpSdk\Server\TransportInterface; +use Symfony\Component\Clock\ClockInterface; +use Symfony\Component\Uid\Uuid; -final readonly class Server +final class Server { + private ?TransportInterface $transport = null; + + private PendingResponseBag $pendingResponses; + public function __construct( private JsonRpcHandler $jsonRpcHandler, + private ClockInterface $clock, + private ?KeepAliveSessionInterface $keepAliveSession = null, private LoggerInterface $logger = new NullLogger(), ) { + $this->pendingResponses = new PendingResponseBag($clock, new \DateInterval('PT30S')); } public function connect(TransportInterface $transport): void { + $this->transport = $transport; + $transport->initialize(); $this->logger->info('Transport initialized'); + $this->keepAliveSession?->start(); + while ($transport->isConnected()) { foreach ($transport->receive() as $message) { if (null === $message) { @@ -41,6 +61,15 @@ public function connect(TransportInterface $transport): void continue; } + if ($response instanceof Response || $response instanceof Error) { + if ($this->pendingResponses->resolve($response)) { + continue; + } + + $this->logger->warning(\sprintf('No handler found for response id "%s".', $response->id), ['response' => $response]); + continue; + } + $transport->send($response); } } catch (\JsonException $e) { @@ -52,10 +81,50 @@ public function connect(TransportInterface $transport): void } } - usleep(1000); + $this->pendingResponses->gc(function (PendingResponse $pendingResponse, Error $error): void { + $this->logger->warning('Pending response timed out', ['pendingResponse' => $pendingResponse, 'error' => $error]); + }); + + $this->keepAliveSession?->tick(function (): void { + $id = (string) Uuid::v4(); + + $this->sendRequest(new Request($id, 'ping', []), function (Response|Error $response): void { + // Per MCP spec, ping errors should terminate the connection, but some clients + // don't handle this correctly. We may want to consider adding a strict mode with + // strict error handling. + if ($response instanceof Error) { + $this->logger->warning('KeepAlive ping returned error response', ['error' => $response]); + } + }); + }); + + $this->clock->sleep(0.001); } + $this->keepAliveSession?->stop(); $transport->close(); $this->logger->info('Transport closed'); } + + /** + * @throws \JsonException When JSON encoding fails + */ + public function sendRequest(Request $request, ?\Closure $callback = null): void + { + if (null === $this->transport) { + throw new TransportNotConnectedException(); + } + + $this->logger->info('Sending request', ['request' => $request]); + + if ([] === $request->params) { + $encodedRequest = json_encode($request, \JSON_THROW_ON_ERROR | \JSON_FORCE_OBJECT); + } else { + $encodedRequest = json_encode($request, \JSON_THROW_ON_ERROR); + } + + $this->transport->send($encodedRequest); + + $this->pendingResponses->add(new PendingResponse($request->id, $this->clock->now(), $callback)); + } } diff --git a/src/mcp-sdk/src/Server/JsonRpcHandler.php b/src/mcp-sdk/src/Server/JsonRpcHandler.php index ed925fdf1..967d65e14 100644 --- a/src/mcp-sdk/src/Server/JsonRpcHandler.php +++ b/src/mcp-sdk/src/Server/JsonRpcHandler.php @@ -53,7 +53,7 @@ public function __construct( } /** - * @return iterable + * @return iterable * * @throws ExceptionInterface When a handler throws an exception during message processing * @throws \JsonException When JSON encoding of the response fails @@ -82,9 +82,13 @@ public function process(string $input): iterable $this->logger->info('Decoded incoming message', ['message' => $message]); try { - yield $message instanceof Notification - ? $this->handleNotification($message) - : $this->encodeResponse($this->handleRequest($message)); + if ($message instanceof Notification) { + yield $this->handleNotification($message); + } elseif ($message instanceof Response || $message instanceof Error) { + yield $message; + } else { + yield $this->encodeResponse($this->handleRequest($message)); + } } catch (\DomainException) { yield null; } catch (NotFoundExceptionInterface $e) { diff --git a/src/mcp-sdk/src/Server/KeepAliveSession/KeepAliveSession.php b/src/mcp-sdk/src/Server/KeepAliveSession/KeepAliveSession.php new file mode 100644 index 000000000..ed97f6b03 --- /dev/null +++ b/src/mcp-sdk/src/Server/KeepAliveSession/KeepAliveSession.php @@ -0,0 +1,54 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\McpSdk\Server\KeepAliveSession; + +use Symfony\AI\McpSdk\Server\KeepAliveSessionInterface; +use Symfony\Component\Clock\ClockInterface; + +final class KeepAliveSession implements KeepAliveSessionInterface +{ + private bool $running = false; + private \DateTimeImmutable $nextPingAt; + + public function __construct( + private ClockInterface $clock, + private \DateInterval $interval, + ) { + $this->nextPingAt = $this->clock->now()->add($this->interval); + } + + public function start(): void + { + $this->running = true; + $this->nextPingAt = $this->clock->now()->add($this->interval); + } + + public function stop(): void + { + $this->running = false; + } + + public function tick(\Closure $callback): void + { + if (!$this->running) { + return; + } + + if ($this->clock->now() < $this->nextPingAt) { + return; + } + + $callback(); + + $this->nextPingAt = $this->clock->now()->add($this->interval); + } +} diff --git a/src/mcp-sdk/src/Server/KeepAliveSessionInterface.php b/src/mcp-sdk/src/Server/KeepAliveSessionInterface.php new file mode 100644 index 000000000..fb3419b05 --- /dev/null +++ b/src/mcp-sdk/src/Server/KeepAliveSessionInterface.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\McpSdk\Server; + +interface KeepAliveSessionInterface +{ + public function start(): void; + + public function stop(): void; + + public function tick(\Closure $callback): void; +} diff --git a/src/mcp-sdk/src/Server/PendingResponse.php b/src/mcp-sdk/src/Server/PendingResponse.php new file mode 100644 index 000000000..d735c29e0 --- /dev/null +++ b/src/mcp-sdk/src/Server/PendingResponse.php @@ -0,0 +1,45 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\McpSdk\Server; + +use Symfony\AI\McpSdk\Message\Error; +use Symfony\AI\McpSdk\Message\Response; + +final readonly class PendingResponse +{ + /** + * @param \Closure(Response|Error): void $callback + */ + public function __construct( + private string|int $id, + private \DateTimeImmutable $sentAt, + private ?\Closure $callback = null, + ) { + } + + public function getId(): string|int + { + return $this->id; + } + + public function getSentAt(): \DateTimeImmutable + { + return $this->sentAt; + } + + public function resolve(Response|Error $response): void + { + if (null !== $this->callback) { + ($this->callback)($response); + } + } +} diff --git a/src/mcp-sdk/src/Server/PendingResponseBag.php b/src/mcp-sdk/src/Server/PendingResponseBag.php new file mode 100644 index 000000000..78bb0a576 --- /dev/null +++ b/src/mcp-sdk/src/Server/PendingResponseBag.php @@ -0,0 +1,108 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\McpSdk\Server; + +use Symfony\AI\McpSdk\Message\Error; +use Symfony\AI\McpSdk\Message\Response; +use Symfony\Component\Clock\ClockInterface; + +/** + * @implements \IteratorAggregate + */ +final class PendingResponseBag implements \Countable, \IteratorAggregate +{ + /** @var array */ + private array $responses = []; + + public function __construct( + private ClockInterface $clock, + private \DateInterval $timeout, + ) { + } + + public function add(PendingResponse $pendingResponse): void + { + $this->responses[$pendingResponse->getId()] = $pendingResponse; + } + + public function has(string|int $id): bool + { + return isset($this->responses[$id]); + } + + /** + * Resolve and remove a pending response by incoming response or error. + * + * @return bool true when a pending response was found and resolved + */ + public function resolve(Response|Error $response): bool + { + $id = $response->id; + + if (!isset($this->responses[$id])) { + return false; + } + + $this->responses[$id]->resolve($response); + unset($this->responses[$id]); + + return true; + } + + /** + * Garbage collect timed-out pending responses. + * + * @param (\Closure(PendingResponse, Error): void)|null $onTimeout Optional callback invoked per timed-out response + * + * @return int number of timed-out responses + */ + public function gc(?\Closure $onTimeout = null): int + { + $now = $this->clock->now(); + $timedOut = 0; + + foreach ($this->responses as $id => $pending) { + if ($pending->getSentAt()->add($this->timeout) < $now) { + $error = Error::requestTimeout($pending->getId()); + $pending->resolve($error); + unset($this->responses[$id]); + ++$timedOut; + + if (null !== $onTimeout) { + $onTimeout($pending, $error); + } + } + } + + return $timedOut; + } + + public function remove(string|int $id): void + { + unset($this->responses[$id]); + } + + public function clear(): void + { + $this->responses = []; + } + + public function getIterator(): \Traversable + { + return new \ArrayIterator($this->responses); + } + + public function count(): int + { + return \count($this->responses); + } +} diff --git a/src/mcp-sdk/tests/Fixtures/Sse/connection_aborted_polyfill.php b/src/mcp-sdk/tests/Fixtures/Sse/connection_aborted_polyfill.php new file mode 100644 index 000000000..6094e338d --- /dev/null +++ b/src/mcp-sdk/tests/Fixtures/Sse/connection_aborted_polyfill.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\McpSdk\Server\Transport\Sse; + +use Symfony\AI\McpSdk\Tests\Server\Transport\Sse\StreamTransportTest; + +if (!\function_exists(__NAMESPACE__.'\\connection_aborted')) { + function connection_aborted(): int + { + return StreamTransportTest::$connectionAborted; + } +} diff --git a/src/mcp-sdk/tests/Server/KeepAliveSession/KeepAliveSessionTest.php b/src/mcp-sdk/tests/Server/KeepAliveSession/KeepAliveSessionTest.php new file mode 100644 index 000000000..c35cac45c --- /dev/null +++ b/src/mcp-sdk/tests/Server/KeepAliveSession/KeepAliveSessionTest.php @@ -0,0 +1,95 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\McpSdk\Tests\Server\KeepAliveSession; + +use PHPUnit\Framework\Attributes\CoversClass; +use PHPUnit\Framework\Attributes\Small; +use PHPUnit\Framework\Attributes\TestDox; +use PHPUnit\Framework\TestCase; +use Symfony\AI\McpSdk\Server\KeepAliveSession\KeepAliveSession; +use Symfony\Component\Clock\MockClock; + +#[Small] +#[CoversClass(KeepAliveSession::class)] +class KeepAliveSessionTest extends TestCase +{ + #[TestDox('Does not call the callback before start or before interval elapsed')] + public function testNoTickBeforeStartOrInterval() + { + $session = new KeepAliveSession(new MockClock(), new \DateInterval('PT1S')); + + $called = 0; + $callback = function () use (&$called): void { ++$called; }; + + // Not started yet + $session->tick($callback); + $this->assertSame(0, $called); + + // Start but before interval + $session->start(); + $session->tick($callback); + $this->assertSame(0, $called); + } + + #[TestDox('Calls the callback after the interval has elapsed once started')] + public function testTickAfterInterval() + { + $clock = new MockClock(); + $session = new KeepAliveSession($clock, new \DateInterval('PT1S')); + + $called = 0; + $callback = function () use (&$called): void { ++$called; }; + + $session->start(); + $clock->sleep(1.001); + $session->tick($callback); + + $this->assertSame(1, $called); + } + + #[TestDox('Reschedules correctly and calls the callback on subsequent intervals')] + public function testReschedulesAndTicksMultipleTimes() + { + $clock = new MockClock(); + $session = new KeepAliveSession($clock, new \DateInterval('PT1S')); + + $called = 0; + $callback = function () use (&$called): void { ++$called; }; + + $session->start(); + + $clock->sleep(1.01); + $session->tick($callback); + + $clock->sleep(1.01); + $session->tick($callback); + + $this->assertSame(2, $called); + } + + #[TestDox('Does not call the callback after stop()')] + public function testNoTickAfterStop() + { + $clock = new MockClock(); + $session = new KeepAliveSession($clock, new \DateInterval('PT1S')); + + $called = 0; + $callback = function () use (&$called): void { ++$called; }; + + $session->start(); + $clock->sleep(1.01); + $session->stop(); + $session->tick($callback); + + $this->assertSame(0, $called); + } +} diff --git a/src/mcp-sdk/tests/Server/Transport/Sse/StreamTransportTest.php b/src/mcp-sdk/tests/Server/Transport/Sse/StreamTransportTest.php new file mode 100644 index 000000000..1c485e27d --- /dev/null +++ b/src/mcp-sdk/tests/Server/Transport/Sse/StreamTransportTest.php @@ -0,0 +1,118 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\McpSdk\Tests\Server\Transport\Sse; + +use PHPUnit\Framework\Attributes\CoversClass; +use PHPUnit\Framework\Attributes\Small; +use PHPUnit\Framework\TestCase; +use Symfony\AI\McpSdk\Server\Transport\Sse\StoreInterface; +use Symfony\AI\McpSdk\Server\Transport\Sse\StreamTransport; +use Symfony\Component\Uid\Uuid; + +#[Small] +#[CoversClass(StreamTransport::class)] +class StreamTransportTest extends TestCase +{ + public static int $connectionAborted = 0; + + protected function tearDown(): void + { + parent::tearDown(); + self::$connectionAborted = 0; + } + + public function testInitializeEmitsEndpointEvent() + { + $endpoint = 'https://example.test/mcp/messages'; + $transport = new StreamTransport($endpoint, $this->createMock(StoreInterface::class), Uuid::v7()); + $actual = $this->capture(fn () => $transport->initialize()); + + $this->assertSame('event: endpoint'.\PHP_EOL.'data: '.$endpoint.\PHP_EOL.\PHP_EOL, $actual); + } + + public function testReceiveYieldsSingleMessageFromStore() + { + $id = Uuid::v7(); + $store = $this->createMock(StoreInterface::class); + $store->expects($this->once()) + ->method('pop') + ->with($id) + ->willReturn('hello'); + + $transport = new StreamTransport('x', $store, $id); + $generator = $transport->receive(); + + $this->assertInstanceOf(\Generator::class, $generator); + $this->assertSame('hello', $generator->current()); + $generator->next(); + $this->assertFalse($generator->valid()); + } + + public function testSendEmitsMessageEventAndFlushes() + { + $transport = new StreamTransport('x', $this->createMock(StoreInterface::class), Uuid::v7()); + $actual = $this->capture(fn () => $transport->send('payload')); + + $this->assertSame('event: message'.\PHP_EOL.'data: payload'.\PHP_EOL.\PHP_EOL, $actual); + } + + public function testMultipleSendsProduceMultipleResponses() + { + $transport = new StreamTransport('x', $this->createMock(StoreInterface::class), Uuid::v7()); + $first = $this->capture(fn () => $transport->send('one')); + $second = $this->capture(fn () => $transport->send('two')); + + $this->assertSame([ + 'event: message'.\PHP_EOL.'data: one'.\PHP_EOL.\PHP_EOL, + 'event: message'.\PHP_EOL.'data: two'.\PHP_EOL.\PHP_EOL, + ], [$first, $second]); + } + + public function testCloseRemovesSessionFromStore() + { + $id = Uuid::v7(); + $store = $this->createMock(StoreInterface::class); + $store->expects($this->once()) + ->method('remove') + ->with($id); + + $transport = new StreamTransport('x', $store, $id); + $transport->close(); + } + + public function testIsConnectedRespectsConnectionAbortedPolyfill() + { + $transport = new StreamTransport('x', $this->createMock(StoreInterface::class), Uuid::v7()); + + self::$connectionAborted = 0; + $this->assertTrue($transport->isConnected()); + + self::$connectionAborted = 1; + $this->assertFalse($transport->isConnected()); + } + + private function capture(callable $fn): string + { + $buffer = ''; + ob_start(function (string $chunk) use (&$buffer) { + $buffer .= $chunk; + + return ''; + }); + + $fn(); + + ob_end_flush(); + + return $buffer; + } +} diff --git a/src/mcp-sdk/tests/ServerTest.php b/src/mcp-sdk/tests/ServerTest.php index 975399e4a..802d4af89 100644 --- a/src/mcp-sdk/tests/ServerTest.php +++ b/src/mcp-sdk/tests/ServerTest.php @@ -16,9 +16,16 @@ use PHPUnit\Framework\MockObject\Stub\Exception; use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; +use Symfony\AI\McpSdk\Exception\TransportNotConnectedException; +use Symfony\AI\McpSdk\Message\Error; +use Symfony\AI\McpSdk\Message\Factory; +use Symfony\AI\McpSdk\Message\Request; +use Symfony\AI\McpSdk\Message\Response; use Symfony\AI\McpSdk\Server; use Symfony\AI\McpSdk\Server\JsonRpcHandler; +use Symfony\AI\McpSdk\Server\KeepAliveSession\KeepAliveSession; use Symfony\AI\McpSdk\Tests\Fixtures\InMemoryTransport; +use Symfony\Component\Clock\MockClock; #[Small] #[CoversClass(Server::class)] @@ -44,7 +51,193 @@ public function testJsonExceptions() ->getMock(); $transport->expects($this->once())->method('send')->with('success'); - $server = new Server($handler, $logger); + $server = new Server($handler, new MockClock(), logger: $logger); $server->connect($transport); } + + public function testSendRequest() + { + $transport = $this->getMockBuilder(InMemoryTransport::class) + ->setConstructorArgs([]) + ->onlyMethods(['send']) + ->getMock(); + $transport->expects($this->once())->method('send')->with('{"jsonrpc":"2.0","id":"1","method":"ping","params":{}}'); + + $logger = new NullLogger(); + $handler = new JsonRpcHandler(new Factory(), [], [], $logger); + $server = new Server($handler, new MockClock(), logger: $logger); + $server->connect($transport); + + $server->sendRequest(new Request('1', 'ping', [])); + } + + public function testThrowExceptionWhenTransportIsNotConnected() + { + $logger = new NullLogger(); + $handler = new JsonRpcHandler(new Factory(), [], [], $logger); + $server = new Server($handler, new MockClock(), logger: $logger); + + $this->expectException(TransportNotConnectedException::class); + $server->sendRequest(new Request('1', 'ping', [])); + } + + public function testResponseCallbackIsCalled() + { + $callbackCalled = false; + $receivedResponse = null; + + $logger = new NullLogger(); + $handler = new JsonRpcHandler(new Factory(), [], [], $logger); + $server = new Server($handler, new MockClock(), logger: $logger); + + $callback = function (Response|Error $response) use (&$callbackCalled, &$receivedResponse) { + $callbackCalled = true; + $receivedResponse = $response; + }; + + $server->connect(new InMemoryTransport()); + $server->sendRequest(new Request('1', 'ping', []), $callback); + + $server->connect(new InMemoryTransport([ + '{"jsonrpc":"2.0","id":"1","result":{}}', + ])); + + $this->assertTrue($callbackCalled); + $this->assertInstanceOf(Response::class, $receivedResponse); + } + + public function testWarningIsLoggedWhenResponseHandlerIsNotFound() + { + $logger = $this->getMockBuilder(NullLogger::class) + ->disableOriginalConstructor() + ->onlyMethods(['warning']) + ->getMock(); + $logger->expects($this->once())->method('warning')->with('No handler found for response id "1".'); + + $handler = new JsonRpcHandler(new Factory(), [], [], $logger); + $server = new Server($handler, new MockClock(), logger: $logger); + + $server->connect(new InMemoryTransport([ + '{"jsonrpc":"2.0","id":"1","result":{}}', + ])); + } + + public function testPendingResponseIsResolvedWhenResponseIsReceived() + { + $logger = $this->getMockBuilder(NullLogger::class) + ->disableOriginalConstructor() + ->onlyMethods(['warning']) + ->getMock(); + $logger->expects($this->once())->method('warning')->with('No handler found for response id "1".'); + + $handler = new JsonRpcHandler(new Factory(), [], [], $logger); + $server = new Server($handler, new MockClock(), logger: $logger); + + $callbackCalled = false; + $receivedResponse = null; + $callback = function (Response|Error $response) use (&$callbackCalled, &$receivedResponse) { + $callbackCalled = true; + $receivedResponse = $response; + }; + + $server->connect(new InMemoryTransport()); + $server->sendRequest(new Request('1', 'ping', []), $callback); + + $server->connect(new InMemoryTransport([ + '{"jsonrpc":"2.0","id":"1","result":{}}', + '{"jsonrpc":"2.0","id":"1","result":{}}', + ])); + + $this->assertTrue($callbackCalled); + $this->assertInstanceOf(Response::class, $receivedResponse); + } + + public function testPendingResponseTimesOut() + { + $logger = $this->getMockBuilder(NullLogger::class) + ->disableOriginalConstructor() + ->onlyMethods(['warning']) + ->getMock(); + $logger->expects($this->once())->method('warning')->with('Pending response timed out'); + + $clock = new MockClock(); + $handler = new JsonRpcHandler(new Factory(), [], [], $logger); + $server = new Server($handler, $clock, logger: $logger); + + $callbackCalled = false; + $receivedResponse = null; + $callback = function (Response|Error $response) use (&$callbackCalled, &$receivedResponse) { + $callbackCalled = true; + $receivedResponse = $response; + }; + + $server->connect(new InMemoryTransport([])); + $server->sendRequest(new Request('1', 'ping', []), $callback); + + $clock->sleep(30.001); + + $server->connect(new InMemoryTransport([])); + + $this->assertTrue($callbackCalled); + $this->assertInstanceOf(Error::class, $receivedResponse); + } + + public function testKeepAliveSessionSendsPing() + { + $clock = new MockClock(); + $logger = new NullLogger(); + $handler = new JsonRpcHandler(new Factory(), [], [], $logger); + $keepAlive = new KeepAliveSession($clock, new \DateInterval('PT0S')); + + $transport = $this->getMockBuilder(InMemoryTransport::class) + ->setConstructorArgs([[]]) + ->onlyMethods(['send']) + ->getMock(); + + $transport->expects($this->once())->method('send')->with($this->callback(function (string $payload): bool { + $data = json_decode($payload, true); + if (!\is_array($data)) { + return false; + } + + return ($data['method'] ?? null) === 'ping'; + })); + + $server = new Server($handler, $clock, keepAliveSession: $keepAlive, logger: $logger); + + $server->connect($transport); + } + + public function testKeepAlivePingTimesOut() + { + $clock = new MockClock(); + + $logger = $this->getMockBuilder(NullLogger::class) + ->disableOriginalConstructor() + ->onlyMethods(['warning']) + ->getMock(); + $matcher = $this->exactly(2); + $logger->expects($matcher) + ->method('warning') + ->willReturnCallback(function (string $message) use ($matcher) { + match ($matcher->numberOfInvocations()) { + 1 => $this->assertEquals('KeepAlive ping returned error response', $message), + 2 => $this->assertEquals('Pending response timed out', $message), + default => $this->fail('Unexpected number of invocations'), + }; + }); + + $handler = new JsonRpcHandler(new Factory(), [], [], $logger); + $keepAlive = new KeepAliveSession($clock, new \DateInterval('PT0S')); + + // First connect: triggers a ping send immediately + $server = new Server($handler, $clock, $keepAlive, $logger); + $server->connect(new InMemoryTransport([])); + + // Let the pending ping timeout + $clock->sleep(30.001); + + // Second connect: triggers GC which should warn about timeout + $server->connect(new InMemoryTransport([])); + } } diff --git a/src/mcp-sdk/tests/bootstrap.php b/src/mcp-sdk/tests/bootstrap.php new file mode 100644 index 000000000..dc19372c1 --- /dev/null +++ b/src/mcp-sdk/tests/bootstrap.php @@ -0,0 +1,12 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +require __DIR__.'/Fixtures/Sse/connection_aborted_polyfill.php';