Skip to content

Commit 3cf1838

Browse files
feat(server): Enhance message handling with session support
- Updated `TransportInterface` to use `onMessage` for handling incoming messages with session IDs. - Refactored `Server`, `Handler`, and transport classes to accommodate session management using `Uuid`. - Introduced methods for creating sessions with auto-generated and specific UUIDs in `SessionFactory` and `SessionFactoryInterface`.
1 parent 74e56a2 commit 3cf1838

File tree

7 files changed

+44
-69
lines changed

7 files changed

+44
-69
lines changed

src/JsonRpc/Handler.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
use Mcp\Server\MethodHandlerInterface;
2828
use Mcp\Server\NotificationHandler;
2929
use Mcp\Server\RequestHandler;
30+
use Symfony\Component\Uid\Uuid;
3031
use Psr\Log\LoggerInterface;
3132
use Psr\Log\NullLogger;
3233

@@ -87,7 +88,7 @@ public static function make(
8788
* @throws ExceptionInterface When a handler throws an exception during message processing
8889
* @throws \JsonException When JSON encoding of the response fails
8990
*/
90-
public function process(string $input): iterable
91+
public function process(string $input, ?Uuid $sessionId): iterable
9192
{
9293
$this->logger->info('Received message to process.', ['message' => $input]);
9394

@@ -117,7 +118,9 @@ public function process(string $input): iterable
117118
} catch (\DomainException) {
118119
yield null;
119120
} catch (NotFoundExceptionInterface $e) {
120-
$this->logger->warning(\sprintf('Failed to create response: %s', $e->getMessage()), ['exception' => $e],
121+
$this->logger->warning(
122+
\sprintf('Failed to create response: %s', $e->getMessage()),
123+
['exception' => $e],
121124
);
122125

123126
yield $this->encodeResponse(Error::forMethodNotFound($e->getMessage()));

src/Server.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Mcp\Server\TransportInterface;
1919
use Psr\Log\LoggerInterface;
2020
use Psr\Log\NullLogger;
21+
use Symfony\Component\Uid\Uuid;
2122

2223
/**
2324
* @author Christopher Hertel <[email protected]>
@@ -45,15 +46,15 @@ public function connect(TransportInterface $transport): void
4546
'transport' => $transport::class,
4647
]);
4748

48-
$transport->on('message', function (string $message) use ($transport) {
49-
$this->handleMessage($message, $transport);
49+
$transport->onMessage(function (string $message, ?Uuid $sessionId) use ($transport) {
50+
$this->handleMessage($message, $sessionId, $transport);
5051
});
5152
}
5253

53-
private function handleMessage(string $message, TransportInterface $transport): void
54+
private function handleMessage(string $message, ?Uuid $sessionId, TransportInterface $transport): void
5455
{
5556
try {
56-
foreach ($this->jsonRpcHandler->process($message) as $response) {
57+
foreach ($this->jsonRpcHandler->process($message, $sessionId) as $response) {
5758
if (null === $response) {
5859
continue;
5960
}

src/Server/Session/SessionFactory.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
*/
1414
class SessionFactory implements SessionFactoryInterface
1515
{
16-
public function create(Uuid $id, SessionStoreInterface $store): SessionInterface
16+
public function create(SessionStoreInterface $store): SessionInterface
1717
{
18-
return new Session($store, $id);
18+
return new Session($store, Uuid::v4());
1919
}
2020

21-
public function createNew(SessionStoreInterface $store): SessionInterface
21+
public function createWithId(Uuid $id, SessionStoreInterface $store): SessionInterface
2222
{
23-
return $this->create(Uuid::v4(), $store);
23+
return new Session($store, $id);
2424
}
2525
}

src/Server/Session/SessionFactoryInterface.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
interface SessionFactoryInterface
1616
{
1717
/**
18-
* Create a session with a specific UUID.
18+
* Creates a new session with an auto-generated UUID.
19+
* This is the standard factory method for creating sessions.
1920
*/
20-
public function create(Uuid $id, SessionStoreInterface $store): SessionInterface;
21+
public function create(SessionStoreInterface $store): SessionInterface;
2122

2223
/**
23-
* Create a new session with a generated UUID.
24+
* Creates a session with a specific UUID.
25+
* Use this when you need to reconstruct a session with a known ID.
2426
*/
25-
public function createNew(SessionStoreInterface $store): SessionInterface;
27+
public function createWithId(Uuid $id, SessionStoreInterface $store): SessionInterface;
2628
}

src/Server/Transport/StdioTransport.php

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@
1212
namespace Mcp\Server\Transport;
1313

1414
use Mcp\Server\TransportInterface;
15-
use Psr\Log\LoggerInterface;
15+
use Symfony\Component\Uid\Uuid;
16+
use Symfony\Component\Uid\UuidV4;
1617
use Psr\Log\NullLogger;
18+
use Psr\Log\LoggerInterface;
1719

18-
/**
19-
* Heavily inspired by https://jolicode.com/blog/mcp-the-open-protocol-that-turns-llm-chatbots-into-intelligent-agents.
20-
*/
2120
class StdioTransport implements TransportInterface
2221
{
23-
/** @var array<string, array<callable>> */
24-
private array $listeners = [];
22+
private $messageListener;
2523

2624
/**
2725
* @param resource $input
@@ -30,27 +28,15 @@ class StdioTransport implements TransportInterface
3028
public function __construct(
3129
private $input = \STDIN,
3230
private $output = \STDOUT,
33-
private readonly LoggerInterface $logger = new NullLogger(),
31+
private readonly Uuid $sessionId = new UuidV4(),
32+
private readonly LoggerInterface $logger = new NullLogger()
3433
) {}
3534

3635
public function initialize(): void {}
3736

38-
public function on(string $event, callable $listener): void
37+
public function onMessage(callable $listener): void
3938
{
40-
if (!isset($this->listeners[$event])) {
41-
$this->listeners[$event] = [];
42-
}
43-
$this->listeners[$event][] = $listener;
44-
}
45-
46-
public function emit(string $event, mixed ...$args): void
47-
{
48-
if (!isset($this->listeners[$event])) {
49-
return;
50-
}
51-
foreach ($this->listeners[$event] as $listener) {
52-
$listener(...$args);
53-
}
39+
$this->messageListener = $listener;
5440
}
5541

5642
public function send(string $data): void
@@ -73,7 +59,7 @@ public function listen(): mixed
7359
$trimmedLine = trim($line);
7460
if (!empty($trimmedLine)) {
7561
$this->logger->debug('Received message on StdioTransport.', ['line' => $trimmedLine]);
76-
$this->emit('message', $trimmedLine);
62+
call_user_func($this->messageListener, $trimmedLine, $this->sessionId);
7763
}
7864
}
7965

src/Server/Transport/StreamableHttpTransport.php

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
use Psr\Http\Message\StreamFactoryInterface;
2020
use Psr\Log\LoggerInterface;
2121
use Psr\Log\NullLogger;
22+
use Symfony\Component\Uid\Uuid;
2223

2324
/**
2425
* @author Kyrian Obikwelu <[email protected]>
2526
*/
2627
class StreamableHttpTransport implements TransportInterface
2728
{
28-
/** @var array<string, array<callable>> */
29-
private array $listeners = [];
29+
private $messageListener;
30+
private ?Uuid $sessionId = null;
3031

3132
/** @var string[] */
3233
private array $outgoingMessages = [];
@@ -42,27 +43,16 @@ public function __construct(
4243
private readonly ResponseFactoryInterface $responseFactory,
4344
private readonly StreamFactoryInterface $streamFactory,
4445
private readonly LoggerInterface $logger = new NullLogger()
45-
) {}
46+
) {
47+
$sessionIdString = $this->request->getHeaderLine('Mcp-Session-Id');
48+
$this->sessionId = $sessionIdString ? Uuid::fromString($sessionIdString) : null;
49+
}
4650

4751
public function initialize(): void {}
4852

49-
public function on(string $event, callable $listener): void
50-
{
51-
if (!isset($this->listeners[$event])) {
52-
$this->listeners[$event] = [];
53-
}
54-
$this->listeners[$event][] = $listener;
55-
}
56-
57-
public function emit(string $event, mixed ...$args): void
53+
public function onMessage(callable $listener): void
5854
{
59-
if (!isset($this->listeners[$event])) {
60-
return;
61-
}
62-
63-
foreach ($this->listeners[$event] as $listener) {
64-
$listener(...$args);
65-
}
55+
$this->messageListener = $listener;
6656
}
6757

6858
public function send(string $data): void
@@ -106,7 +96,7 @@ protected function handlePostRequest(): ResponseInterface
10696
return $this->createErrorResponse($error, 400);
10797
}
10898

109-
$this->emit('message', $body);
99+
call_user_func($this->messageListener, $body, $this->sessionId);
110100

111101
$hasRequestsInInput = str_contains($body, '"id"');
112102
$hasResponsesInOutput = !empty($this->outgoingMessages);

src/Server/TransportInterface.php

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
namespace Mcp\Server;
1313

14+
use Symfony\Component\Uid\Uuid;
15+
1416
/**
1517
* @author Christopher Hertel <[email protected]>
1618
* @author Kyrian Obikwelu <[email protected]>
@@ -23,20 +25,11 @@ interface TransportInterface
2325
public function initialize(): void;
2426

2527
/**
26-
* Registers an event listener for the specified event.
27-
*
28-
* @param string $event The event name to listen for
29-
* @param callable $listener The callback function to execute when the event occurs
30-
*/
31-
public function on(string $event, callable $listener): void;
32-
33-
/**
34-
* Triggers an event and executes all registered listeners.
28+
* Registers a callback that will be invoked whenever the transport receives an incoming message.
3529
*
36-
* @param string $event The event name to emit
37-
* @param mixed ...$args Variable number of arguments to pass to the listeners
30+
* @param callable(string $message, ?Uuid $sessionId): void $listener The callback function to execute when the message occurs
3831
*/
39-
public function emit(string $event, mixed ...$args): void;
32+
public function onMessage(callable $listener): void;
4033

4134
/**
4235
* Starts the transport's execution process.

0 commit comments

Comments
 (0)