Skip to content

Handle closed connection in StreamTransport to avoid stuck worker #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/mcp-sdk/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"require": {
"php": "^8.2",
"psr/log": "^1.0 || ^2.0 || ^3.0",
"symfony/uid": "^6.4 || ^7.0"
"symfony/uid": "^6.4 || ^7.0",
"symfony/clock": "^6.4 || ^7.0"
},
"require-dev": {
"phpstan/phpstan": "^2.1",
Expand Down
3 changes: 2 additions & 1 deletion src/mcp-sdk/examples/cli/index.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

require __DIR__.'/vendor/autoload.php';

use Symfony\Component\Clock\Clock;
use Symfony\Component\Console as SymfonyConsole;
use Symfony\Component\Console\Output\OutputInterface;

Expand All @@ -30,7 +31,7 @@
);

// Set up the server
$sever = new Symfony\AI\McpSdk\Server($jsonRpcHandler, $logger);
$sever = new Symfony\AI\McpSdk\Server($jsonRpcHandler, new Clock(), logger: $logger);

// Create the transport layer using Symfony Console
$transport = new Symfony\AI\McpSdk\Server\Transport\Stdio\SymfonyConsoleTransport($input, $output);
Expand Down
3 changes: 2 additions & 1 deletion src/mcp-sdk/phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
executionOrder="depends,defects"
beStrictAboutOutputDuringTests="true"
failOnRisky="true"
failOnWarning="true">
failOnWarning="true"
bootstrap="tests/bootstrap.php">

<testsuites>
<testsuite name="default">
Expand Down
16 changes: 16 additions & 0 deletions src/mcp-sdk/src/Exception/TransportNotConnectedException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\AI\McpSdk\Exception;

final class TransportNotConnectedException extends \LogicException implements ExceptionInterface
{
}
14 changes: 14 additions & 0 deletions src/mcp-sdk/src/Message/Error.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public const INTERNAL_ERROR = -32603;
public const PARSE_ERROR = -32700;
public const RESOURCE_NOT_FOUND = -32002;
public const REQUEST_TIMEOUT = -32001;

public function __construct(
public string|int $id,
Expand All @@ -27,6 +28,14 @@ public function __construct(
) {
}

/**
* @param array{jsonrpc: string, id: string|int, error: array{code: int, message: string}} $data
*/
public static function from(array $data): self
{
return new self($data['id'], $data['error']['code'], $data['error']['message']);
}

public static function invalidRequest(string|int $id, string $message = 'Invalid Request'): self
{
return new self($id, self::INVALID_REQUEST, $message);
Expand All @@ -52,6 +61,11 @@ public static function parseError(string|int $id, string $message = 'Parse error
return new self($id, self::PARSE_ERROR, $message);
}

public static function requestTimeout(string|int $id, string $message = 'Request timeout'): self
{
return new self($id, self::REQUEST_TIMEOUT, $message);
}

/**
* @return array{
* jsonrpc: string,
Expand Down
11 changes: 10 additions & 1 deletion src/mcp-sdk/src/Message/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
final class Factory
{
/**
* @return iterable<Notification|Request|InvalidInputMessageException>
* @return iterable<Notification|Request|Response|Error|InvalidInputMessageException>
*
* @throws \JsonException When the input string is not valid JSON
*/
Expand All @@ -29,6 +29,15 @@ public function create(string $input): iterable
}

foreach ($data as $message) {
if (isset($message['id']) && (\array_key_exists('result', $message) || \array_key_exists('error', $message))) {
if (\array_key_exists('error', $message)) {
yield Error::from($message);
} else {
yield Response::from($message);
}
continue;
}

if (!isset($message['method'])) {
yield new InvalidInputMessageException('Invalid JSON-RPC request, missing "method".');
} elseif (str_starts_with((string) $message['method'], 'notifications/')) {
Expand Down
8 changes: 8 additions & 0 deletions src/mcp-sdk/src/Message/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ public function __construct(
) {
}

/**
* @param array{jsonrpc: string, id: string|int, result: array<string, mixed>} $data
*/
public static function from(array $data): self
{
return new self($data['id'], $data['result']);
}

/**
* @return array{jsonrpc: string, id: string|int, result: array<string, mixed>}
*/
Expand Down
73 changes: 71 additions & 2 deletions src/mcp-sdk/src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,42 @@

use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\AI\McpSdk\Exception\TransportNotConnectedException;
use Symfony\AI\McpSdk\Message\Error;
use Symfony\AI\McpSdk\Message\Request;
use Symfony\AI\McpSdk\Message\Response;
use Symfony\AI\McpSdk\Server\JsonRpcHandler;
use Symfony\AI\McpSdk\Server\KeepAliveSessionInterface;
use Symfony\AI\McpSdk\Server\PendingResponse;
use Symfony\AI\McpSdk\Server\PendingResponseBag;
use Symfony\AI\McpSdk\Server\TransportInterface;
use Symfony\Component\Clock\ClockInterface;
use Symfony\Component\Uid\Uuid;

final readonly class Server
final class Server
{
private ?TransportInterface $transport = null;

private PendingResponseBag $pendingResponses;

public function __construct(
private JsonRpcHandler $jsonRpcHandler,
private ClockInterface $clock,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about adding a default Clock like in Symfony Messenger, wdyt?

private ?KeepAliveSessionInterface $keepAliveSession = null,
private LoggerInterface $logger = new NullLogger(),
) {
$this->pendingResponses = new PendingResponseBag($clock, new \DateInterval('PT30S'));
}

public function connect(TransportInterface $transport): void
{
$this->transport = $transport;

$transport->initialize();
$this->logger->info('Transport initialized');

$this->keepAliveSession?->start();

while ($transport->isConnected()) {
foreach ($transport->receive() as $message) {
if (null === $message) {
Expand All @@ -41,6 +61,15 @@ public function connect(TransportInterface $transport): void
continue;
}

if ($response instanceof Response || $response instanceof Error) {
if ($this->pendingResponses->resolve($response)) {
continue;
}

$this->logger->warning(\sprintf('No handler found for response id "%s".', $response->id), ['response' => $response]);
continue;
}

$transport->send($response);
}
} catch (\JsonException $e) {
Expand All @@ -52,10 +81,50 @@ public function connect(TransportInterface $transport): void
}
}

usleep(1000);
$this->pendingResponses->gc(function (PendingResponse $pendingResponse, Error $error): void {
$this->logger->warning('Pending response timed out', ['pendingResponse' => $pendingResponse, 'error' => $error]);
});

$this->keepAliveSession?->tick(function (): void {
$id = (string) Uuid::v4();

$this->sendRequest(new Request($id, 'ping', []), function (Response|Error $response): void {
// Per MCP spec, ping errors should terminate the connection, but some clients
// don't handle this correctly. We may want to consider adding a strict mode with
// strict error handling.
if ($response instanceof Error) {
$this->logger->warning('KeepAlive ping returned error response', ['error' => $response]);
}
});
});

$this->clock->sleep(0.001);
}

$this->keepAliveSession?->stop();
$transport->close();
$this->logger->info('Transport closed');
}

/**
* @throws \JsonException When JSON encoding fails
*/
public function sendRequest(Request $request, ?\Closure $callback = null): void
{
if (null === $this->transport) {
throw new TransportNotConnectedException();
}

$this->logger->info('Sending request', ['request' => $request]);

if ([] === $request->params) {
$encodedRequest = json_encode($request, \JSON_THROW_ON_ERROR | \JSON_FORCE_OBJECT);
} else {
$encodedRequest = json_encode($request, \JSON_THROW_ON_ERROR);
}

$this->transport->send($encodedRequest);

$this->pendingResponses->add(new PendingResponse($request->id, $this->clock->now(), $callback));
}
}
12 changes: 8 additions & 4 deletions src/mcp-sdk/src/Server/JsonRpcHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public function __construct(
}

/**
* @return iterable<string|null>
* @return iterable<string|Response|Error|null>
*
* @throws ExceptionInterface When a handler throws an exception during message processing
* @throws \JsonException When JSON encoding of the response fails
Expand Down Expand Up @@ -82,9 +82,13 @@ public function process(string $input): iterable
$this->logger->info('Decoded incoming message', ['message' => $message]);

try {
yield $message instanceof Notification
? $this->handleNotification($message)
: $this->encodeResponse($this->handleRequest($message));
if ($message instanceof Notification) {
yield $this->handleNotification($message);
} elseif ($message instanceof Response || $message instanceof Error) {
yield $message;
} else {
yield $this->encodeResponse($this->handleRequest($message));
}
} catch (\DomainException) {
yield null;
} catch (NotFoundExceptionInterface $e) {
Expand Down
54 changes: 54 additions & 0 deletions src/mcp-sdk/src/Server/KeepAliveSession/KeepAliveSession.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\AI\McpSdk\Server\KeepAliveSession;

use Symfony\AI\McpSdk\Server\KeepAliveSessionInterface;
use Symfony\Component\Clock\ClockInterface;

final class KeepAliveSession implements KeepAliveSessionInterface
{
private bool $running = false;
private \DateTimeImmutable $nextPingAt;

public function __construct(
private ClockInterface $clock,
private \DateInterval $interval,
) {
$this->nextPingAt = $this->clock->now()->add($this->interval);
}

public function start(): void
{
$this->running = true;
$this->nextPingAt = $this->clock->now()->add($this->interval);
}

public function stop(): void
{
$this->running = false;
}

public function tick(\Closure $callback): void
{
if (!$this->running) {
return;
}

if ($this->clock->now() < $this->nextPingAt) {
return;
}

$callback();

$this->nextPingAt = $this->clock->now()->add($this->interval);
}
}
21 changes: 21 additions & 0 deletions src/mcp-sdk/src/Server/KeepAliveSessionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\AI\McpSdk\Server;

interface KeepAliveSessionInterface
{
public function start(): void;

public function stop(): void;

public function tick(\Closure $callback): void;
}
45 changes: 45 additions & 0 deletions src/mcp-sdk/src/Server/PendingResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\AI\McpSdk\Server;

use Symfony\AI\McpSdk\Message\Error;
use Symfony\AI\McpSdk\Message\Response;

final readonly class PendingResponse
{
/**
* @param \Closure(Response|Error): void $callback
*/
public function __construct(
private string|int $id,
private \DateTimeImmutable $sentAt,
private ?\Closure $callback = null,
) {
}

public function getId(): string|int
{
return $this->id;
}

public function getSentAt(): \DateTimeImmutable
{
return $this->sentAt;
}

public function resolve(Response|Error $response): void
{
if (null !== $this->callback) {
($this->callback)($response);
}
}
}
Loading