1818 */
1919final class SseTransport implements TransportInterface
2020{
21- public function __construct (private readonly string $ defaultPath , private readonly ?LoggerInterface $ logger = null ) {}
22-
23- private function getEndpoint (string $ sessionId ): string
24- {
25- return sprintf ('/%s/message?sessionId=%s ' ,
26- trim ($ this ->defaultPath , '/ ' ),
27- $ sessionId ,
28- );
29- }
30-
3121 /**
3222 * Tracks if the server-side connection is considered active.
3323 */
@@ -65,6 +55,14 @@ private function getEndpoint(string $sessionId): string
6555 */
6656 protected ?string $ clientId = null ;
6757
58+ public function __construct (
59+ private readonly string $ defaultPath ,
60+ ?SseAdapterInterface $ adapter = null ,
61+ private readonly ?LoggerInterface $ logger = null
62+ ) {
63+ $ this ->adapter = $ adapter ;
64+ }
65+
6866 /**
6967 * Starts the SSE transport connection.
7068 * Sets the connected flag and initializes the transport. Idempotent.
@@ -77,6 +75,11 @@ public function start(): void
7775 return ;
7876 }
7977
78+ set_time_limit (0 );
79+ ini_set ('output_buffering ' , 'off ' );
80+ ini_set ('zlib.output_compression ' , false );
81+ ini_set ('zlib.default_socket_timeout ' , 5 );
82+
8083 $ this ->connected = true ;
8184 $ this ->initialize ();
8285 }
@@ -104,18 +107,7 @@ public function initialize(): void
104107 */
105108 private function sendEvent (string $ event , string $ data ): void
106109 {
107- // Check if the headers have already been sent
108- if (! headers_sent ()) {
109- // Disable buffering
110- ini_set ('output_buffering ' , 'off ' );
111- ini_set ('zlib.output_compression ' , false );
112-
113- // Add required SSE headers
114- header ('Content-Type: text/event-stream ' );
115- header ('Cache-Control: no-cache ' );
116- header ('X-Accel-Buffering: no ' );
117- header ('Connection: keep-alive ' );
118- }
110+ $ this ->logger ?->debug('SSE Transport::sendEvent: event: ' .$ event .PHP_EOL .'data: ' .$ data .PHP_EOL );
119111
120112 // Just ensure output gets flushed
121113 ob_flush (); // Flushes the active buffer
@@ -264,9 +256,9 @@ protected function triggerError(string $message): void
264256 /**
265257 * Sets the adapter instance used for message persistence/retrieval.
266258 *
267- * @param SseAdapterInterface $adapter The adapter implementation.
259+ * @param SseAdapterInterface|null $adapter The adapter implementation.
268260 */
269- public function setAdapter (SseAdapterInterface $ adapter ): void
261+ public function setAdapter (? SseAdapterInterface $ adapter ): void
270262 {
271263 $ this ->adapter = $ adapter ;
272264 }
@@ -315,4 +307,12 @@ public function pushMessage(string $clientId, array $message): void
315307
316308 $ this ->adapter ->pushMessage (clientId: $ clientId , message: $ messageString );
317309 }
310+
311+ private function getEndpoint (string $ sessionId ): string
312+ {
313+ return sprintf ('/%s/message?sessionId=%s ' ,
314+ trim ($ this ->defaultPath , '/ ' ),
315+ $ sessionId ,
316+ );
317+ }
318318}
0 commit comments