1616 */
1717final class RedisAdapter implements SseAdapterInterface
1818{
19- const FAILED_TO_INITIALIZE = 'Failed to initialize Redis SSE Adapter: ' ;
19+ private const DEFAULT_PREFIX = 'mcp_sse_ ' ;
20+ private const DEFAULT_MESSAGE_TTL = 100 ;
21+ private const DEFAULT_REDIS_PORT = 6379 ;
22+ private const FAILED_TO_INITIALIZE = 'Failed to initialize Redis SSE Adapter: ' ;
2023
2124 /**
2225 * Redis connection instance
2326 */
24- protected Redis $ redis ;
27+ private Redis $ redis ;
2528
2629 /**
2730 * Redis key prefix for SSE messages
2831 */
29- protected string $ keyPrefix = ' mcp_sse_ ' ;
32+ private string $ keyPrefix ;
3033
3134 /**
3235 * Message expiration time in seconds
3336 */
34- protected int $ messageTtl = 100 ;
37+ private int $ messageTtl ;
3538
39+ /**
40+ * Constructor method to initialize the class with configuration, logger, and Redis instance
41+ *
42+ * @param array $config Configuration array containing details such as 'prefix', 'ttl', and 'connection'
43+ * @param LoggerInterface|null $logger Logger instance for error and debugging logs
44+ * @param Redis|null $redis Optional Redis instance used during tests
45+ * @return void
46+ *
47+ * @throws SseAdapterException If the Redis connection fails to initialize
48+ */
3649 public function __construct (
3750 private readonly array $ config ,
38- private readonly ?LoggerInterface $ logger
51+ private readonly ?LoggerInterface $ logger ,
52+ ?Redis $ redis = null // Allow Redis to be injected
3953 ) {
54+ $ this ->keyPrefix = $ this ->config ['prefix ' ] ?? self ::DEFAULT_PREFIX ;
55+ $ this ->messageTtl = (int )($ this ->config ['ttl ' ] ?? self ::DEFAULT_MESSAGE_TTL );
56+
4057 try {
41- $ this ->keyPrefix = $ this ->config ['prefix ' ] ?? 'mcp_sse_ ' ;
42- $ this ->messageTtl = (int ) $ this ->config ['ttl ' ] ?: 100 ;
43- $ this ->redis = new Redis ;
44- $ this ->redis ->connect ($ this ->config ['connection ' ], 6379 );
58+ $ this ->redis = $ redis ?? new Redis ;
59+ $ this ->redis ->connect ($ this ->config ['connection ' ], self ::DEFAULT_REDIS_PORT );
4560 $ this ->redis ->setOption (Redis::OPT_PREFIX , $ this ->keyPrefix );
4661 } catch (Exception $ e ) {
47- $ this ->logger ?->error(self ::FAILED_TO_INITIALIZE .$ e ->getMessage ());
48- throw new SseAdapterException (self ::FAILED_TO_INITIALIZE .$ e ->getMessage ());
62+ $ this ->logAndThrow (self ::FAILED_TO_INITIALIZE . $ e ->getMessage (), $ e );
4963 }
5064 }
5165
@@ -60,29 +74,14 @@ public function __construct(
6074 public function pushMessage (string $ clientId , string $ message ): void
6175 {
6276 try {
63- $ key = $ this ->getQueueKey ($ clientId );
64-
77+ $ key = $ this ->generateQueueKey ($ clientId );
6578 $ this ->redis ->rpush ($ key , $ message );
66-
67- $ this ->redis ->expire ($ key , $ this ->messageTtl );
68-
79+ $ this ->setKeyExpiration ($ key );
6980 } catch (Exception $ e ) {
70- $ this ->logger ?->error('Failed to add message to Redis queue: ' .$ e ->getMessage ());
71- throw new SseAdapterException ('Failed to add message to Redis queue: ' .$ e ->getMessage ());
81+ $ this ->logAndThrow ('Failed to add message to Redis queue: ' . $ e ->getMessage (), $ e );
7282 }
7383 }
7484
75- /**
76- * Get the Redis key for a client's message queue
77- *
78- * @param string $clientId The client ID
79- * @return string The Redis key
80- */
81- protected function getQueueKey (string $ clientId ): string
82- {
83- return "{$ this ->keyPrefix }:client: {$ clientId }" ;
84- }
85-
8685 /**
8786 * Remove all messages for a specific client
8887 *
@@ -92,38 +91,32 @@ protected function getQueueKey(string $clientId): string
9291 */
9392 public function removeAllMessages (string $ clientId ): void
9493 {
95- try {
96- $ key = $ this ->getQueueKey ($ clientId );
97-
98- $ this ->redis ->del ($ key );
99-
100- } catch (Exception $ e ) {
101- $ this ->logger ?->error('Failed to remove messages from Redis queue: ' .$ e ->getMessage ());
102- throw new SseAdapterException ('Failed to remove messages from Redis queue: ' .$ e ->getMessage ());
103- }
94+ $ this ->executeRedisCommand (
95+ fn () => $ this ->redis ->del ($ this ->generateQueueKey ($ clientId )),
96+ 'Failed to remove messages from Redis queue '
97+ );
10498 }
10599
106100 /**
107101 * Receive and remove all messages for a specific client
108102 *
109103 * @param string $clientId The unique identifier for the client
110- * @return array<string> Array of messages
104+ * @return array<string|array > Array of messages
111105 *
112106 * @throws SseAdapterException If the messages cannot be retrieved
113107 */
114108 public function receiveMessages (string $ clientId ): array
115109 {
116110 try {
117- $ key = $ this ->getQueueKey ($ clientId );
111+ $ key = $ this ->generateQueueKey ($ clientId );
118112 $ messages = [];
119-
120113 while (($ message = $ this ->redis ->lpop ($ key )) !== null && $ message !== false ) {
121114 $ messages [] = $ message ;
122115 }
123116
124117 return $ messages ;
125118 } catch (Exception $ e ) {
126- throw new SseAdapterException ('Failed to receive messages from Redis queue: ' .$ e ->getMessage ());
119+ $ this -> logAndThrow ('Failed to receive messages from Redis queue: ' .$ e ->getMessage (), $ e );
127120 }
128121 }
129122
@@ -138,18 +131,12 @@ public function receiveMessages(string $clientId): array
138131 public function popMessage (string $ clientId ): ?string
139132 {
140133 try {
141- $ key = $ this ->getQueueKey ($ clientId );
142-
134+ $ key = $ this ->generateQueueKey ($ clientId );
143135 $ message = $ this ->redis ->lpop ($ key );
144136
145- if ($ message === null || $ message === false ) {
146- return null ;
147- }
148-
149- return $ message ;
137+ return $ message === false ? null : $ message ;
150138 } catch (Exception $ e ) {
151- $ this ->logger ?->error('Failed to pop message from Redis queue: ' .$ e ->getMessage ());
152- throw new SseAdapterException ('Failed to pop message from Redis queue: ' .$ e ->getMessage ());
139+ $ this ->logAndThrow ('Failed to pop message from Redis queue: ' . $ e ->getMessage (), $ e );
153140 }
154141 }
155142
@@ -161,17 +148,8 @@ public function popMessage(string $clientId): ?string
161148 */
162149 public function hasMessages (string $ clientId ): bool
163150 {
164- try {
165- $ key = $ this ->getQueueKey ($ clientId );
166-
167- $ count = $ this ->redis ->llen ($ key );
168-
169- return $ count > 0 ;
170- } catch (Exception $ e ) {
171- $ this ->logger ?->error('Failed to check for messages in Redis queue: ' .$ e ->getMessage ());
172-
173- return false ;
174- }
151+ $ key = $ this ->generateQueueKey ($ clientId );
152+ return $ this ->getRedisKeyLength ($ key ) > 0 ;
175153 }
176154
177155 /**
@@ -182,17 +160,8 @@ public function hasMessages(string $clientId): bool
182160 */
183161 public function getMessageCount (string $ clientId ): int
184162 {
185- try {
186- $ key = $ this ->getQueueKey ($ clientId );
187-
188- $ count = $ this ->redis ->llen ($ key );
189-
190- return (int ) $ count ;
191- } catch (Exception $ e ) {
192- $ this ->logger ?->error('Failed to get message count from Redis queue: ' .$ e ->getMessage ());
193-
194- return 0 ;
195- }
163+ $ key = $ this ->generateQueueKey ($ clientId );
164+ return $ this ->getRedisKeyLength ($ key );
196165 }
197166
198167 /**
@@ -205,17 +174,14 @@ public function getMessageCount(string $clientId): int
205174 */
206175 public function storeLastPongResponseTimestamp (string $ clientId , ?int $ timestamp = null ): void
207176 {
208- try {
209- $ key = $ this ->getQueueKey ($ clientId ).':last_pong ' ;
210- $ timestamp = $ timestamp ?? time ();
211-
212- $ this ->redis ->set ($ key , $ timestamp );
213- $ this ->redis ->expire ($ key , $ this ->messageTtl );
214-
215- } catch (Exception $ e ) {
216- $ this ->logger ?->error('Failed to store last pong timestamp: ' .$ e ->getMessage ());
217- throw new SseAdapterException ('Failed to store last pong timestamp: ' .$ e ->getMessage ());
218- }
177+ $ this ->executeRedisCommand (
178+ function () use ($ clientId , $ timestamp ) {
179+ $ key = $ this ->generateQueueKey ($ clientId ) . ':last_pong ' ;
180+ $ this ->redis ->set ($ key , $ timestamp ?? time ());
181+ $ this ->setKeyExpiration ($ key );
182+ },
183+ 'Failed to store last pong timestamp '
184+ );
219185 }
220186
221187 /**
@@ -229,19 +195,82 @@ public function storeLastPongResponseTimestamp(string $clientId, ?int $timestamp
229195 public function getLastPongResponseTimestamp (string $ clientId ): ?int
230196 {
231197 try {
232- $ key = $ this ->getQueueKey ($ clientId ).':last_pong ' ;
233-
198+ $ key = $ this ->generateQueueKey ($ clientId ) . ':last_pong ' ;
234199 $ timestamp = $ this ->redis ->get ($ key );
200+ return $ timestamp === false ? null : (int )$ timestamp ;
201+ } catch (Exception $ e ) {
202+ $ this ->logAndThrow ('Failed to get last pong timestamp: ' . $ e ->getMessage (), $ e );
203+ }
204+ }
235205
236- if ($ timestamp === false ) {
237- return null ;
238- }
206+ /**
207+ * Get the Redis key for a client's message queue
208+ *
209+ * @param string $clientId The client ID
210+ * @return string The Redis key
211+ */
212+ private function generateQueueKey (string $ clientId ): string
213+ {
214+ return "$ this ->keyPrefix :client: $ clientId " ;
215+ }
216+
217+ /**
218+ * Set the expiration time for the specified key in Redis
219+ *
220+ * @param string $key The key for which the expiration time will be set
221+ * @return void
222+ */
223+ private function setKeyExpiration (string $ key ): void
224+ {
225+ $ this ->redis ->expire ($ key , $ this ->messageTtl );
226+ }
239227
240- return (int ) $ timestamp ;
228+ /**
229+ * Retrieve the length of a Redis list by its key
230+ *
231+ * @param string $key The key of the Redis list
232+ * @return int The length of the Redis list, or 0 if an error occurs
233+ */
234+ private function getRedisKeyLength (string $ key ): int
235+ {
236+ try {
237+ return (int )$ this ->redis ->llen ($ key );
238+ } catch (Exception $ e ) {
239+ $ this ->logger ?->error('Failed to get message count: ' . $ e ->getMessage ());
240+ return 0 ;
241+ }
242+ }
241243
244+ /**
245+ * Executes a Redis command and handles potential exceptions.
246+ *
247+ * @param callable $command The Redis command to be executed
248+ * @param string $errorMessage The error message to log and throw in case of failure
249+ * @return void
250+ *
251+ * @throws SseAdapterException
252+ */
253+ private function executeRedisCommand (callable $ command , string $ errorMessage ): void
254+ {
255+ try {
256+ $ command ();
242257 } catch (Exception $ e ) {
243- $ this ->logger ?->error('Failed to get last pong timestamp: ' .$ e ->getMessage ());
244- throw new SseAdapterException ('Failed to get last pong timestamp: ' .$ e ->getMessage ());
258+ $ this ->logAndThrow ($ errorMessage . ': ' . $ e ->getMessage (), $ e );
245259 }
246260 }
261+
262+ /**
263+ * Logs an error message and throws an exception
264+ *
265+ * @param string $message The error message to log
266+ * @param Exception $e The original exception to be wrapped and thrown
267+ * @return never This method does not return as it always throws an exception
268+ *
269+ * @throws SseAdapterException
270+ */
271+ private function logAndThrow (string $ message , Exception $ e ): never
272+ {
273+ $ this ->logger ?->error($ message );
274+ throw new SseAdapterException ($ message , 0 , $ e );
275+ }
247276}
0 commit comments