1717final class RedisAdapter implements SseAdapterInterface
1818{
1919 private const DEFAULT_PREFIX = 'mcp_sse_ ' ;
20+
2021 private const DEFAULT_MESSAGE_TTL = 100 ;
22+
2123 private const DEFAULT_REDIS_PORT = 6379 ;
24+
2225 private const FAILED_TO_INITIALIZE = 'Failed to initialize Redis SSE Adapter: ' ;
2326
2427 /**
@@ -39,9 +42,9 @@ final class RedisAdapter implements SseAdapterInterface
3942 /**
4043 * Constructor method to initialize the class with configuration, logger, and Redis instance
4144 *
42- * @param array $config Configuration array containing details such as 'prefix', 'ttl', and 'connection'
43- * @param LoggerInterface|null $logger Logger instance for error and debugging logs
44- * @param Redis|null $redis Optional Redis instance used during tests
45+ * @param array $config Configuration array containing details such as 'prefix', 'ttl', and 'connection'
46+ * @param LoggerInterface|null $logger Logger instance for error and debugging logs
47+ * @param Redis|null $redis Optional Redis instance used during tests
4548 * @return void
4649 *
4750 * @throws SseAdapterException If the Redis connection fails to initialize
@@ -52,14 +55,14 @@ public function __construct(
5255 ?Redis $ redis = null // Allow Redis to be injected
5356 ) {
5457 $ this ->keyPrefix = $ this ->config ['prefix ' ] ?? self ::DEFAULT_PREFIX ;
55- $ this ->messageTtl = (int )($ this ->config ['ttl ' ] ?? self ::DEFAULT_MESSAGE_TTL );
58+ $ this ->messageTtl = (int ) ($ this ->config ['ttl ' ] ?? self ::DEFAULT_MESSAGE_TTL );
5659
5760 try {
5861 $ this ->redis = $ redis ?? new Redis ;
5962 $ this ->redis ->connect ($ this ->config ['connection ' ], self ::DEFAULT_REDIS_PORT );
6063 $ this ->redis ->setOption (Redis::OPT_PREFIX , $ this ->keyPrefix );
6164 } catch (Exception $ e ) {
62- $ this ->logAndThrow (self ::FAILED_TO_INITIALIZE . $ e ->getMessage (), $ e );
65+ $ this ->logAndThrow (self ::FAILED_TO_INITIALIZE . $ e ->getMessage (), $ e );
6366 }
6467 }
6568
@@ -78,7 +81,7 @@ public function pushMessage(string $clientId, string $message): void
7881 $ this ->redis ->rpush ($ key , $ message );
7982 $ this ->setKeyExpiration ($ key );
8083 } catch (Exception $ e ) {
81- $ this ->logAndThrow ('Failed to add message to Redis queue: ' . $ e ->getMessage (), $ e );
84+ $ this ->logAndThrow ('Failed to add message to Redis queue: ' . $ e ->getMessage (), $ e );
8285 }
8386 }
8487
@@ -92,7 +95,7 @@ public function pushMessage(string $clientId, string $message): void
9295 public function removeAllMessages (string $ clientId ): void
9396 {
9497 $ this ->executeRedisCommand (
95- fn () => $ this ->redis ->del ($ this ->generateQueueKey ($ clientId )),
98+ fn () => $ this ->redis ->del ($ this ->generateQueueKey ($ clientId )),
9699 'Failed to remove messages from Redis queue '
97100 );
98101 }
@@ -136,7 +139,7 @@ public function popMessage(string $clientId): ?string
136139
137140 return $ message === false ? null : $ message ;
138141 } catch (Exception $ e ) {
139- $ this ->logAndThrow ('Failed to pop message from Redis queue: ' . $ e ->getMessage (), $ e );
142+ $ this ->logAndThrow ('Failed to pop message from Redis queue: ' . $ e ->getMessage (), $ e );
140143 }
141144 }
142145
@@ -149,6 +152,7 @@ public function popMessage(string $clientId): ?string
149152 public function hasMessages (string $ clientId ): bool
150153 {
151154 $ key = $ this ->generateQueueKey ($ clientId );
155+
152156 return $ this ->getRedisKeyLength ($ key ) > 0 ;
153157 }
154158
@@ -161,6 +165,7 @@ public function hasMessages(string $clientId): bool
161165 public function getMessageCount (string $ clientId ): int
162166 {
163167 $ key = $ this ->generateQueueKey ($ clientId );
168+
164169 return $ this ->getRedisKeyLength ($ key );
165170 }
166171
@@ -176,7 +181,7 @@ public function storeLastPongResponseTimestamp(string $clientId, ?int $timestamp
176181 {
177182 $ this ->executeRedisCommand (
178183 function () use ($ clientId , $ timestamp ) {
179- $ key = $ this ->generateQueueKey ($ clientId ) . ':last_pong ' ;
184+ $ key = $ this ->generateQueueKey ($ clientId ). ':last_pong ' ;
180185 $ this ->redis ->set ($ key , $ timestamp ?? time ());
181186 $ this ->setKeyExpiration ($ key );
182187 },
@@ -195,11 +200,12 @@ function () use ($clientId, $timestamp) {
195200 public function getLastPongResponseTimestamp (string $ clientId ): ?int
196201 {
197202 try {
198- $ key = $ this ->generateQueueKey ($ clientId ) . ':last_pong ' ;
203+ $ key = $ this ->generateQueueKey ($ clientId ). ':last_pong ' ;
199204 $ timestamp = $ this ->redis ->get ($ key );
200- return $ timestamp === false ? null : (int )$ timestamp ;
205+
206+ return $ timestamp === false ? null : (int ) $ timestamp ;
201207 } catch (Exception $ e ) {
202- $ this ->logAndThrow ('Failed to get last pong timestamp: ' . $ e ->getMessage (), $ e );
208+ $ this ->logAndThrow ('Failed to get last pong timestamp: ' . $ e ->getMessage (), $ e );
203209 }
204210 }
205211
@@ -217,8 +223,7 @@ private function generateQueueKey(string $clientId): string
217223 /**
218224 * Set the expiration time for the specified key in Redis
219225 *
220- * @param string $key The key for which the expiration time will be set
221- * @return void
226+ * @param string $key The key for which the expiration time will be set
222227 */
223228 private function setKeyExpiration (string $ key ): void
224229 {
@@ -228,25 +233,25 @@ private function setKeyExpiration(string $key): void
228233 /**
229234 * Retrieve the length of a Redis list by its key
230235 *
231- * @param string $key The key of the Redis list
236+ * @param string $key The key of the Redis list
232237 * @return int The length of the Redis list, or 0 if an error occurs
233238 */
234239 private function getRedisKeyLength (string $ key ): int
235240 {
236241 try {
237- return (int )$ this ->redis ->llen ($ key );
242+ return (int ) $ this ->redis ->llen ($ key );
238243 } catch (Exception $ e ) {
239- $ this ->logger ?->error('Failed to get message count: ' . $ e ->getMessage ());
244+ $ this ->logger ?->error('Failed to get message count: ' .$ e ->getMessage ());
245+
240246 return 0 ;
241247 }
242248 }
243249
244250 /**
245251 * Executes a Redis command and handles potential exceptions.
246252 *
247- * @param callable $command The Redis command to be executed
248- * @param string $errorMessage The error message to log and throw in case of failure
249- * @return void
253+ * @param callable $command The Redis command to be executed
254+ * @param string $errorMessage The error message to log and throw in case of failure
250255 *
251256 * @throws SseAdapterException
252257 */
@@ -255,15 +260,15 @@ private function executeRedisCommand(callable $command, string $errorMessage): v
255260 try {
256261 $ command ();
257262 } catch (Exception $ e ) {
258- $ this ->logAndThrow ($ errorMessage . ': ' . $ e ->getMessage (), $ e );
263+ $ this ->logAndThrow ($ errorMessage. ': ' . $ e ->getMessage (), $ e );
259264 }
260265 }
261266
262267 /**
263268 * Logs an error message and throws an exception
264269 *
265- * @param string $message The error message to log
266- * @param Exception $e The original exception to be wrapped and thrown
270+ * @param string $message The error message to log
271+ * @param Exception $e The original exception to be wrapped and thrown
267272 * @return never This method does not return as it always throws an exception
268273 *
269274 * @throws SseAdapterException
0 commit comments