1
+ # -*- coding: utf-8 -*-
2
+ """Extended tests for session_registry.py to improve coverage.
3
+
4
+ Copyright 2025
5
+ SPDX-License-Identifier: Apache-2.0
6
+ Authors: Mihai Criveti
7
+
8
+ This test suite focuses on uncovered code paths in session_registry.py
9
+ including import error handling, backend edge cases, and error scenarios.
10
+ """
11
+
12
+ # Future
13
+ from __future__ import annotations
14
+
15
+ # Standard
16
+ import sys
17
+ from unittest .mock import patch , AsyncMock , Mock
18
+ import pytest
19
+ import asyncio
20
+
21
+ # First-Party
22
+ from mcpgateway .cache .session_registry import SessionRegistry
23
+
24
+
25
+ class TestImportErrors :
26
+ """Test import error handling for optional dependencies."""
27
+
28
+ def test_redis_import_error_flag (self ):
29
+ """Test REDIS_AVAILABLE flag when redis import fails."""
30
+ with patch .dict (sys .modules , {'redis.asyncio' : None }):
31
+ import importlib
32
+ import mcpgateway .cache .session_registry
33
+ importlib .reload (mcpgateway .cache .session_registry )
34
+
35
+ # Should set REDIS_AVAILABLE = False
36
+ assert not mcpgateway .cache .session_registry .REDIS_AVAILABLE
37
+
38
+ def test_sqlalchemy_import_error_flag (self ):
39
+ """Test SQLALCHEMY_AVAILABLE flag when sqlalchemy import fails."""
40
+ with patch .dict (sys .modules , {'sqlalchemy' : None }):
41
+ import importlib
42
+ import mcpgateway .cache .session_registry
43
+ importlib .reload (mcpgateway .cache .session_registry )
44
+
45
+ # Should set SQLALCHEMY_AVAILABLE = False
46
+ assert not mcpgateway .cache .session_registry .SQLALCHEMY_AVAILABLE
47
+
48
+
49
+ class TestNoneBackend :
50
+ """Test 'none' backend functionality."""
51
+
52
+ @pytest .mark .asyncio
53
+ async def test_none_backend_initialization_logging (self , caplog ):
54
+ """Test that 'none' backend logs initialization message."""
55
+ registry = SessionRegistry (backend = "none" )
56
+
57
+ # Check that initialization message is logged
58
+ assert "Session registry initialized with 'none' backend - session tracking disabled" in caplog .text
59
+
60
+ @pytest .mark .asyncio
61
+ async def test_none_backend_initialize_method (self ):
62
+ """Test 'none' backend initialize method does nothing."""
63
+ registry = SessionRegistry (backend = "none" )
64
+
65
+ # Should not raise any errors
66
+ await registry .initialize ()
67
+
68
+ # No cleanup task should be created
69
+ assert registry ._cleanup_task is None
70
+
71
+
72
+ class TestRedisBackendErrors :
73
+ """Test Redis backend error scenarios."""
74
+
75
+ @pytest .mark .asyncio
76
+ async def test_redis_add_session_error (self , monkeypatch , caplog ):
77
+ """Test Redis error during add_session."""
78
+ mock_redis = AsyncMock ()
79
+ mock_redis .setex = AsyncMock (side_effect = Exception ("Redis connection error" ))
80
+ mock_redis .publish = AsyncMock ()
81
+
82
+ with patch ('mcpgateway.cache.session_registry.REDIS_AVAILABLE' , True ):
83
+ with patch ('mcpgateway.cache.session_registry.Redis' ) as MockRedis :
84
+ MockRedis .from_url .return_value = mock_redis
85
+
86
+ registry = SessionRegistry (backend = "redis" , redis_url = "redis://localhost" )
87
+
88
+ class DummyTransport :
89
+ async def disconnect (self ):
90
+ pass
91
+ async def is_connected (self ):
92
+ return True
93
+
94
+ transport = DummyTransport ()
95
+ await registry .add_session ("test_session" , transport )
96
+
97
+ # Should log the Redis error
98
+ assert "Redis error adding session test_session: Redis connection error" in caplog .text
99
+
100
+ @pytest .mark .asyncio
101
+ async def test_redis_broadcast_error (self , monkeypatch , caplog ):
102
+ """Test Redis error during broadcast."""
103
+ mock_redis = AsyncMock ()
104
+ mock_redis .publish = AsyncMock (side_effect = Exception ("Redis publish error" ))
105
+
106
+ with patch ('mcpgateway.cache.session_registry.REDIS_AVAILABLE' , True ):
107
+ with patch ('mcpgateway.cache.session_registry.Redis' ) as MockRedis :
108
+ MockRedis .from_url .return_value = mock_redis
109
+
110
+ registry = SessionRegistry (backend = "redis" , redis_url = "redis://localhost" )
111
+
112
+ await registry .broadcast ("test_session" , {"test" : "message" })
113
+
114
+ # Should log the Redis error
115
+ assert "Redis error during broadcast: Redis publish error" in caplog .text
116
+
117
+
118
+ class TestDatabaseBackendErrors :
119
+ """Test database backend error scenarios."""
120
+
121
+ @pytest .mark .asyncio
122
+ async def test_database_add_session_error (self , monkeypatch , caplog ):
123
+ """Test database error during add_session."""
124
+ def mock_get_db ():
125
+ mock_session = Mock ()
126
+ mock_session .add = Mock (side_effect = Exception ("Database connection error" ))
127
+ mock_session .rollback = Mock ()
128
+ mock_session .close = Mock ()
129
+ yield mock_session
130
+
131
+ with patch ('mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE' , True ):
132
+ with patch ('mcpgateway.cache.session_registry.get_db' , mock_get_db ):
133
+ with patch ('asyncio.to_thread' ) as mock_to_thread :
134
+ # Simulate the database error being raised from the thread
135
+ mock_to_thread .side_effect = Exception ("Database connection error" )
136
+
137
+ registry = SessionRegistry (backend = "database" , database_url = "sqlite:///test.db" )
138
+
139
+ class DummyTransport :
140
+ async def disconnect (self ):
141
+ pass
142
+ async def is_connected (self ):
143
+ return True
144
+
145
+ transport = DummyTransport ()
146
+ await registry .add_session ("test_session" , transport )
147
+
148
+ # Should log the database error
149
+ assert "Database error adding session test_session: Database connection error" in caplog .text
150
+
151
+ @pytest .mark .asyncio
152
+ async def test_database_broadcast_error (self , monkeypatch , caplog ):
153
+ """Test database error during broadcast."""
154
+ def mock_get_db ():
155
+ mock_session = Mock ()
156
+ mock_session .add = Mock (side_effect = Exception ("Database broadcast error" ))
157
+ mock_session .rollback = Mock ()
158
+ mock_session .close = Mock ()
159
+ yield mock_session
160
+
161
+ with patch ('mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE' , True ):
162
+ with patch ('mcpgateway.cache.session_registry.get_db' , mock_get_db ):
163
+ with patch ('asyncio.to_thread' ) as mock_to_thread :
164
+ # Simulate the database error being raised from the thread
165
+ mock_to_thread .side_effect = Exception ("Database broadcast error" )
166
+
167
+ registry = SessionRegistry (backend = "database" , database_url = "sqlite:///test.db" )
168
+
169
+ await registry .broadcast ("test_session" , {"test" : "message" })
170
+
171
+ # Should log the database error
172
+ assert "Database error during broadcast: Database broadcast error" in caplog .text
173
+
174
+
175
+ class TestInitializationAndShutdown :
176
+ """Test initialization and shutdown methods."""
177
+
178
+ @pytest .mark .asyncio
179
+ async def test_memory_backend_initialization_logging (self , caplog ):
180
+ """Test memory backend initialization creates cleanup task."""
181
+ registry = SessionRegistry (backend = "memory" )
182
+ await registry .initialize ()
183
+
184
+ try :
185
+ # Should log initialization
186
+ assert "Initializing session registry with backend: memory" in caplog .text
187
+ assert "Memory cleanup task started" in caplog .text
188
+
189
+ # Should have created cleanup task
190
+ assert registry ._cleanup_task is not None
191
+ assert not registry ._cleanup_task .done ()
192
+
193
+ finally :
194
+ await registry .shutdown ()
195
+
196
+ @pytest .mark .asyncio
197
+ async def test_database_backend_initialization_logging (self , caplog ):
198
+ """Test database backend initialization creates cleanup task."""
199
+ with patch ('mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE' , True ):
200
+ registry = SessionRegistry (backend = "database" , database_url = "sqlite:///test.db" )
201
+ await registry .initialize ()
202
+
203
+ try :
204
+ # Should log initialization
205
+ assert "Initializing session registry with backend: database" in caplog .text
206
+ assert "Database cleanup task started" in caplog .text
207
+
208
+ # Should have created cleanup task
209
+ assert registry ._cleanup_task is not None
210
+ assert not registry ._cleanup_task .done ()
211
+
212
+ finally :
213
+ await registry .shutdown ()
214
+
215
+ @pytest .mark .asyncio
216
+ async def test_redis_initialization_subscribe (self , monkeypatch ):
217
+ """Test Redis backend initialization subscribes to events."""
218
+ mock_redis = AsyncMock ()
219
+ mock_pubsub = AsyncMock ()
220
+ mock_redis .pubsub = Mock (return_value = mock_pubsub ) # Use Mock for sync method
221
+
222
+ with patch ('mcpgateway.cache.session_registry.REDIS_AVAILABLE' , True ):
223
+ with patch ('mcpgateway.cache.session_registry.Redis' ) as MockRedis :
224
+ MockRedis .from_url .return_value = mock_redis
225
+
226
+ registry = SessionRegistry (backend = "redis" , redis_url = "redis://localhost" )
227
+ await registry .initialize ()
228
+
229
+ try :
230
+ # Should have subscribed to events channel
231
+ mock_pubsub .subscribe .assert_called_once_with ("mcp_session_events" )
232
+
233
+ finally :
234
+ await registry .shutdown ()
235
+
236
+ @pytest .mark .asyncio
237
+ async def test_shutdown_cancels_cleanup_task (self ):
238
+ """Test shutdown properly cancels cleanup tasks."""
239
+ registry = SessionRegistry (backend = "memory" )
240
+ await registry .initialize ()
241
+
242
+ original_task = registry ._cleanup_task
243
+ assert not original_task .cancelled ()
244
+
245
+ await registry .shutdown ()
246
+
247
+ # Task should be cancelled
248
+ assert original_task .cancelled ()
249
+
250
+ @pytest .mark .asyncio
251
+ async def test_shutdown_handles_already_cancelled_task (self ):
252
+ """Test shutdown handles already cancelled cleanup task."""
253
+ registry = SessionRegistry (backend = "memory" )
254
+ await registry .initialize ()
255
+
256
+ # Cancel task before shutdown
257
+ registry ._cleanup_task .cancel ()
258
+
259
+ # Shutdown should not raise error
260
+ await registry .shutdown ()
0 commit comments