Skip to content

Improve coverage doctest and test #698

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,6 @@ make interrogate doctest test smoketest lint-web flake8 bandit pylint

# Rules
- When using git commit always add a -s to sign commits

# TO test individual files, ensure you're activated the env first, ex:
. /home/cmihai/.venv/mcpgateway/bin/activate && pytest --cov-report=annotate tests/unit/mcpgateway/test_translate.py
25 changes: 16 additions & 9 deletions mcpgateway/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,22 @@ def _inside_alembic() -> bool:
code or during testing.

Examples:
When running migrations::

$ alembic upgrade head
# _inside_alembic() returns True

When importing in tests or application code::

from mcpgateway.alembic.env import target_metadata
# _inside_alembic() returns False
>>> # Normal import context (no _proxy attribute)
>>> import types
>>> fake_context = types.SimpleNamespace()
>>> import mcpgateway.alembic.env as env_module
>>> original_context = env_module.context
>>> env_module.context = fake_context
>>> env_module._inside_alembic()
False

>>> # Simulated Alembic context (with _proxy attribute)
>>> fake_context._proxy = True
>>> env_module._inside_alembic()
True

>>> # Restore original context
>>> env_module.context = original_context

Note:
This guard is crucial to prevent the migration execution code at the
Expand Down
4 changes: 4 additions & 0 deletions mcpgateway/cache/session_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ async def shutdown(self) -> None:
await self._redis.aclose()
except Exception as e:
logger.error(f"Error closing Redis connection: {e}")
# Error example:
# >>> import logging
# >>> logger = logging.getLogger(__name__)
# >>> logger.error(f"Error closing Redis connection: Connection lost") # doctest: +SKIP

async def add_session(self, session_id: str, transport: SSETransport) -> None:
"""Add a session to the registry.
Expand Down
260 changes: 260 additions & 0 deletions tests/unit/mcpgateway/cache/test_session_registry_extended.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
# -*- coding: utf-8 -*-
"""Extended tests for session_registry.py to improve coverage.

Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

This test suite focuses on uncovered code paths in session_registry.py
including import error handling, backend edge cases, and error scenarios.
"""

# Future
from __future__ import annotations

# Standard
import sys
from unittest.mock import patch, AsyncMock, Mock
import pytest
import asyncio

# First-Party
from mcpgateway.cache.session_registry import SessionRegistry


class TestImportErrors:
"""Test import error handling for optional dependencies."""

def test_redis_import_error_flag(self):
"""Test REDIS_AVAILABLE flag when redis import fails."""
with patch.dict(sys.modules, {'redis.asyncio': None}):
import importlib
import mcpgateway.cache.session_registry
importlib.reload(mcpgateway.cache.session_registry)

# Should set REDIS_AVAILABLE = False
assert not mcpgateway.cache.session_registry.REDIS_AVAILABLE

def test_sqlalchemy_import_error_flag(self):
"""Test SQLALCHEMY_AVAILABLE flag when sqlalchemy import fails."""
with patch.dict(sys.modules, {'sqlalchemy': None}):
import importlib
import mcpgateway.cache.session_registry
importlib.reload(mcpgateway.cache.session_registry)

# Should set SQLALCHEMY_AVAILABLE = False
assert not mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE


class TestNoneBackend:
"""Test 'none' backend functionality."""

@pytest.mark.asyncio
async def test_none_backend_initialization_logging(self, caplog):
"""Test that 'none' backend logs initialization message."""
registry = SessionRegistry(backend="none")

# Check that initialization message is logged
assert "Session registry initialized with 'none' backend - session tracking disabled" in caplog.text

@pytest.mark.asyncio
async def test_none_backend_initialize_method(self):
"""Test 'none' backend initialize method does nothing."""
registry = SessionRegistry(backend="none")

# Should not raise any errors
await registry.initialize()

# No cleanup task should be created
assert registry._cleanup_task is None


class TestRedisBackendErrors:
"""Test Redis backend error scenarios."""

@pytest.mark.asyncio
async def test_redis_add_session_error(self, monkeypatch, caplog):
"""Test Redis error during add_session."""
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock(side_effect=Exception("Redis connection error"))
mock_redis.publish = AsyncMock()

with patch('mcpgateway.cache.session_registry.REDIS_AVAILABLE', True):
with patch('mcpgateway.cache.session_registry.Redis') as MockRedis:
MockRedis.from_url.return_value = mock_redis

registry = SessionRegistry(backend="redis", redis_url="redis://localhost")

class DummyTransport:
async def disconnect(self):
pass
async def is_connected(self):
return True

transport = DummyTransport()
await registry.add_session("test_session", transport)

# Should log the Redis error
assert "Redis error adding session test_session: Redis connection error" in caplog.text

@pytest.mark.asyncio
async def test_redis_broadcast_error(self, monkeypatch, caplog):
"""Test Redis error during broadcast."""
mock_redis = AsyncMock()
mock_redis.publish = AsyncMock(side_effect=Exception("Redis publish error"))

with patch('mcpgateway.cache.session_registry.REDIS_AVAILABLE', True):
with patch('mcpgateway.cache.session_registry.Redis') as MockRedis:
MockRedis.from_url.return_value = mock_redis

registry = SessionRegistry(backend="redis", redis_url="redis://localhost")

await registry.broadcast("test_session", {"test": "message"})

# Should log the Redis error
assert "Redis error during broadcast: Redis publish error" in caplog.text


class TestDatabaseBackendErrors:
"""Test database backend error scenarios."""

@pytest.mark.asyncio
async def test_database_add_session_error(self, monkeypatch, caplog):
"""Test database error during add_session."""
def mock_get_db():
mock_session = Mock()
mock_session.add = Mock(side_effect=Exception("Database connection error"))
mock_session.rollback = Mock()
mock_session.close = Mock()
yield mock_session

with patch('mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE', True):
with patch('mcpgateway.cache.session_registry.get_db', mock_get_db):
with patch('asyncio.to_thread') as mock_to_thread:
# Simulate the database error being raised from the thread
mock_to_thread.side_effect = Exception("Database connection error")

registry = SessionRegistry(backend="database", database_url="sqlite:///test.db")

class DummyTransport:
async def disconnect(self):
pass
async def is_connected(self):
return True

transport = DummyTransport()
await registry.add_session("test_session", transport)

# Should log the database error
assert "Database error adding session test_session: Database connection error" in caplog.text

@pytest.mark.asyncio
async def test_database_broadcast_error(self, monkeypatch, caplog):
"""Test database error during broadcast."""
def mock_get_db():
mock_session = Mock()
mock_session.add = Mock(side_effect=Exception("Database broadcast error"))
mock_session.rollback = Mock()
mock_session.close = Mock()
yield mock_session

with patch('mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE', True):
with patch('mcpgateway.cache.session_registry.get_db', mock_get_db):
with patch('asyncio.to_thread') as mock_to_thread:
# Simulate the database error being raised from the thread
mock_to_thread.side_effect = Exception("Database broadcast error")

registry = SessionRegistry(backend="database", database_url="sqlite:///test.db")

await registry.broadcast("test_session", {"test": "message"})

# Should log the database error
assert "Database error during broadcast: Database broadcast error" in caplog.text


class TestInitializationAndShutdown:
"""Test initialization and shutdown methods."""

@pytest.mark.asyncio
async def test_memory_backend_initialization_logging(self, caplog):
"""Test memory backend initialization creates cleanup task."""
registry = SessionRegistry(backend="memory")
await registry.initialize()

try:
# Should log initialization
assert "Initializing session registry with backend: memory" in caplog.text
assert "Memory cleanup task started" in caplog.text

# Should have created cleanup task
assert registry._cleanup_task is not None
assert not registry._cleanup_task.done()

finally:
await registry.shutdown()

@pytest.mark.asyncio
async def test_database_backend_initialization_logging(self, caplog):
"""Test database backend initialization creates cleanup task."""
with patch('mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE', True):
registry = SessionRegistry(backend="database", database_url="sqlite:///test.db")
await registry.initialize()

try:
# Should log initialization
assert "Initializing session registry with backend: database" in caplog.text
assert "Database cleanup task started" in caplog.text

# Should have created cleanup task
assert registry._cleanup_task is not None
assert not registry._cleanup_task.done()

finally:
await registry.shutdown()

@pytest.mark.asyncio
async def test_redis_initialization_subscribe(self, monkeypatch):
"""Test Redis backend initialization subscribes to events."""
mock_redis = AsyncMock()
mock_pubsub = AsyncMock()
mock_redis.pubsub = Mock(return_value=mock_pubsub) # Use Mock for sync method

with patch('mcpgateway.cache.session_registry.REDIS_AVAILABLE', True):
with patch('mcpgateway.cache.session_registry.Redis') as MockRedis:
MockRedis.from_url.return_value = mock_redis

registry = SessionRegistry(backend="redis", redis_url="redis://localhost")
await registry.initialize()

try:
# Should have subscribed to events channel
mock_pubsub.subscribe.assert_called_once_with("mcp_session_events")

finally:
await registry.shutdown()

@pytest.mark.asyncio
async def test_shutdown_cancels_cleanup_task(self):
"""Test shutdown properly cancels cleanup tasks."""
registry = SessionRegistry(backend="memory")
await registry.initialize()

original_task = registry._cleanup_task
assert not original_task.cancelled()

await registry.shutdown()

# Task should be cancelled
assert original_task.cancelled()

@pytest.mark.asyncio
async def test_shutdown_handles_already_cancelled_task(self):
"""Test shutdown handles already cancelled cleanup task."""
registry = SessionRegistry(backend="memory")
await registry.initialize()

# Cancel task before shutdown
registry._cleanup_task.cancel()

# Shutdown should not raise error
await registry.shutdown()
Loading
Loading