Skip to content

Commit eb28fda

Browse files
refactor(server): use event-driven message handling
1 parent 7f44d22 commit eb28fda

File tree

4 files changed

+131
-48
lines changed

4 files changed

+131
-48
lines changed

src/Server.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ public static function make(): ServerBuilder
3535
public function connect(TransportInterface $transport): void
3636
{
3737
$transport->initialize();
38+
3839
$this->logger->info('Transport initialized.', [
3940
'transport' => $transport::class,
4041
]);
4142

42-
$transport->setMessageHandler(function (string $rawMessage) use ($transport) {
43-
$this->handleMessage($rawMessage, $transport);
43+
$transport->on('message', function (string $message) use ($transport) {
44+
$this->handleMessage($message, $transport);
4445
});
4546
}
4647

src/Server/Transport/StdioTransport.php

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
*/
2121
class StdioTransport implements TransportInterface
2222
{
23-
private string $buffer = '';
24-
private $messageHandler = null;
23+
/** @var array<string, array<callable>> */
24+
private array $listeners = [];
2525

2626
/**
2727
* @param resource $input
@@ -35,9 +35,22 @@ public function __construct(
3535

3636
public function initialize(): void {}
3737

38-
public function setMessageHandler(callable $handler): void
38+
public function on(string $event, callable $listener): void
3939
{
40-
$this->messageHandler = $handler;
40+
if (!isset($this->listeners[$event])) {
41+
$this->listeners[$event] = [];
42+
}
43+
$this->listeners[$event][] = $listener;
44+
}
45+
46+
public function emit(string $event, mixed ...$args): void
47+
{
48+
if (!isset($this->listeners[$event])) {
49+
return;
50+
}
51+
foreach ($this->listeners[$event] as $listener) {
52+
$listener(...$args);
53+
}
4154
}
4255

4356
public function send(string $data): void
@@ -49,10 +62,6 @@ public function send(string $data): void
4962

5063
public function listen(): mixed
5164
{
52-
if ($this->messageHandler === null) {
53-
throw new \LogicException('Cannot listen without a message handler. Did you forget to call Server::connect()?');
54-
}
55-
5665
$this->logger->info('StdioTransport is listening for messages on STDIN...');
5766

5867
while (!feof($this->input)) {
@@ -64,7 +73,7 @@ public function listen(): mixed
6473
$trimmedLine = trim($line);
6574
if (!empty($trimmedLine)) {
6675
$this->logger->debug('Received message on StdioTransport.', ['line' => $trimmedLine]);
67-
call_user_func($this->messageHandler, $trimmedLine);
76+
$this->emit('message', $trimmedLine);
6877
}
6978
}
7079

src/Server/Transport/StreamableHttpTransport.php

Lines changed: 93 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,17 @@
2525
*/
2626
class StreamableHttpTransport implements TransportInterface
2727
{
28-
private $messageHandler = null;
29-
private $outgoingMessages = [];
28+
/** @var array<string, array<callable>> */
29+
private array $listeners = [];
30+
31+
/** @var string[] */
32+
private array $outgoingMessages = [];
33+
34+
private array $corsHeaders = [
35+
'Access-Control-Allow-Origin' => '*',
36+
'Access-Control-Allow-Methods' => 'GET, POST, DELETE, OPTIONS',
37+
'Access-Control-Allow-Headers' => 'Content-Type, Mcp-Session-Id, Last-Event-ID, Authorization, Accept',
38+
];
3039

3140
public function __construct(
3241
private readonly ServerRequestInterface $request,
@@ -37,9 +46,23 @@ public function __construct(
3746

3847
public function initialize(): void {}
3948

40-
public function setMessageHandler(callable $handler): void
49+
public function on(string $event, callable $listener): void
50+
{
51+
if (!isset($this->listeners[$event])) {
52+
$this->listeners[$event] = [];
53+
}
54+
$this->listeners[$event][] = $listener;
55+
}
56+
57+
public function emit(string $event, mixed ...$args): void
4158
{
42-
$this->messageHandler = $handler;
59+
if (!isset($this->listeners[$event])) {
60+
return;
61+
}
62+
63+
foreach ($this->listeners[$event] as $listener) {
64+
$listener(...$args);
65+
}
4366
}
4467

4568
public function send(string $data): void
@@ -49,58 +72,95 @@ public function send(string $data): void
4972

5073
public function listen(): mixed
5174
{
52-
if ($this->messageHandler === null) {
53-
$this->logger->error('Cannot listen without a message handler. Did you forget to call Server::connect()?');
54-
return $this->createErrorResponse(Error::forInternalError('Internal Server Error: Transport not configured.'), 500);
55-
}
5675

57-
switch ($this->request->getMethod()) {
58-
case 'POST':
59-
$body = $this->request->getBody()->getContents();
60-
if (empty($body)) {
61-
return $this->createErrorResponse(Error::forInvalidRequest('Bad Request: Empty request body.'), 400);
62-
}
76+
return match ($this->request->getMethod()) {
77+
'OPTIONS' => $this->handleOptionsRequest(),
78+
'GET' => $this->handleGetRequest(),
79+
'POST' => $this->handlePostRequest(),
80+
'DELETE' => $this->handleDeleteRequest(),
81+
default => $this->handleUnsupportedRequest(),
82+
};
83+
}
6384

64-
call_user_func($this->messageHandler, $body);
65-
break;
85+
protected function handleOptionsRequest(): ResponseInterface
86+
{
87+
return $this->withCorsHeaders($this->responseFactory->createResponse(204));
88+
}
6689

67-
case 'GET':
68-
case 'DELETE':
69-
return $this->createErrorResponse(Error::forInvalidRequest('Method Not Allowed'), 405);
90+
protected function handlePostRequest(): ResponseInterface
91+
{
92+
$acceptHeader = $this->request->getHeaderLine('Accept');
93+
if (!str_contains($acceptHeader, 'application/json') || !str_contains($acceptHeader, 'text/event-stream')) {
94+
$error = Error::forInvalidRequest('Not Acceptable: Client must accept both application/json and text/event-stream.');
95+
return $this->createErrorResponse($error, 406);
96+
}
7097

71-
default:
72-
return $this->createErrorResponse(Error::forInvalidRequest('Method Not Allowed'), 405)
73-
->withHeader('Allow', 'POST');
98+
if (!str_contains($this->request->getHeaderLine('Content-Type'), 'application/json')) {
99+
$error = Error::forInvalidRequest('Unsupported Media Type: Content-Type must be application/json.');
100+
return $this->createErrorResponse($error, 415);
74101
}
75102

76-
return $this->buildResponse();
77-
}
103+
$body = $this->request->getBody()->getContents();
104+
if (empty($body)) {
105+
$error = Error::forInvalidRequest('Bad Request: Empty request body.');
106+
return $this->createErrorResponse($error, 400);
107+
}
78108

79-
public function close(): void {}
109+
$this->emit('message', $body);
80110

81-
private function buildResponse(): ResponseInterface
82-
{
83-
$hasRequestsInInput = !empty($this->request->getBody()->getContents());
111+
$hasRequestsInInput = str_contains($body, '"id"');
84112
$hasResponsesInOutput = !empty($this->outgoingMessages);
85113

86114
if ($hasRequestsInInput && !$hasResponsesInOutput) {
87-
return $this->responseFactory->createResponse(202);
115+
return $this->withCorsHeaders($this->responseFactory->createResponse(202));
88116
}
89117

90118
$responseBody = count($this->outgoingMessages) === 1
91119
? $this->outgoingMessages[0]
92120
: '[' . implode(',', $this->outgoingMessages) . ']';
93121

94-
return $this->responseFactory->createResponse(200)
122+
$response = $this->responseFactory->createResponse(200)
95123
->withHeader('Content-Type', 'application/json')
96124
->withBody($this->streamFactory->createStream($responseBody));
125+
126+
return $this->withCorsHeaders($response);
127+
}
128+
129+
protected function handleGetRequest(): ResponseInterface
130+
{
131+
$response = $this->createErrorResponse(Error::forInvalidRequest('Not Yet Implemented'), 501);
132+
return $this->withCorsHeaders($response);
133+
}
134+
135+
protected function handleDeleteRequest(): ResponseInterface
136+
{
137+
$response = $this->createErrorResponse(Error::forInvalidRequest('Not Yet Implemented'), 501);
138+
return $this->withCorsHeaders($response);
139+
}
140+
141+
protected function handleUnsupportedRequest(): ResponseInterface
142+
{
143+
$response = $this->createErrorResponse(Error::forInvalidRequest('Method Not Allowed'), 405);
144+
return $this->withCorsHeaders($response);
97145
}
98146

99-
private function createErrorResponse(Error $jsonRpcErrpr, int $statusCode): ResponseInterface
147+
protected function withCorsHeaders(ResponseInterface $response): ResponseInterface
100148
{
101-
$errorPayload = json_encode($jsonRpcErrpr, \JSON_THROW_ON_ERROR);
149+
foreach ($this->corsHeaders as $name => $value) {
150+
$response = $response->withHeader($name, $value);
151+
}
152+
153+
return $response;
154+
}
155+
156+
protected function createErrorResponse(Error $jsonRpcError, int $statusCode): ResponseInterface
157+
{
158+
$errorPayload = json_encode($jsonRpcError, \JSON_THROW_ON_ERROR);
159+
102160
return $this->responseFactory->createResponse($statusCode)
103161
->withHeader('Content-Type', 'application/json')
104-
->withBody($this->streamFactory->createStream(json_encode($errorPayload)));
162+
->withBody($this->streamFactory->createStream($errorPayload));
105163
}
164+
165+
public function close(): void {}
106166
}

src/Server/TransportInterface.php

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
/**
1515
* @author Christopher Hertel <[email protected]>
16+
* @author Kyrian Obikwelu <[email protected]>
1617
*/
1718
interface TransportInterface
1819
{
@@ -22,13 +23,20 @@ interface TransportInterface
2223
public function initialize(): void;
2324

2425
/**
25-
* Registers the callback that the Server will use to process incoming messages.
26-
* The transport must call this handler whenever a raw JSON-RPC message string is received.
26+
* Registers an event listener for the specified event.
2727
*
28-
* @param callable(string): void $handler The message processing callback.
28+
* @param string $event The event name to listen for
29+
* @param callable $listener The callback function to execute when the event occurs
2930
*/
30-
public function setMessageHandler(callable $handler): void;
31+
public function on(string $event, callable $listener): void;
3132

33+
/**
34+
* Triggers an event and executes all registered listeners.
35+
*
36+
* @param string $event The event name to emit
37+
* @param mixed ...$args Variable number of arguments to pass to the listeners
38+
*/
39+
public function emit(string $event, mixed ...$args): void;
3240

3341
/**
3442
* Starts the transport's execution process.
@@ -43,11 +51,16 @@ public function listen(): mixed;
4351

4452
/**
4553
* Sends a raw JSON-RPC message string back to the client.
54+
*
55+
* @param string $data The JSON-RPC message string to send
4656
*/
4757
public function send(string $data): void;
4858

4959
/**
5060
* Closes the transport and cleans up any resources.
61+
*
62+
* This method should be called when the transport is no longer needed.
63+
* It should clean up any resources and close any connections.
5164
*/
5265
public function close(): void;
5366
}

0 commit comments

Comments
 (0)