|
1 | 1 | import json |
2 | 2 | from typing import Any |
| 3 | +from unittest.mock import patch |
3 | 4 |
|
4 | 5 | import pytest |
| 6 | +from fastmcp.exceptions import ToolError |
5 | 7 | from fastmcp.server import FastMCP |
| 8 | +from neo4j.exceptions import Neo4jError |
6 | 9 |
|
7 | 10 |
|
8 | 11 | @pytest.mark.asyncio(loop_scope="function") |
@@ -53,3 +56,100 @@ async def test_read_neo4j_cypher(mcp_server: FastMCP, init_data: Any): |
53 | 56 | assert result[0]["friend_name"] == "Bob" |
54 | 57 | assert result[1]["person"] == "Bob" |
55 | 58 | assert result[1]["friend_name"] == "Charlie" |
| 59 | + |
| 60 | + |
| 61 | +@pytest.mark.asyncio(loop_scope="function") |
| 62 | +async def test_read_query_timeout_with_slow_query(mcp_server_short_timeout: FastMCP, clear_data: Any): |
| 63 | + """Test that read queries timeout appropriately with a slow query.""" |
| 64 | + # Create a query that should take longer than 0.01 seconds |
| 65 | + slow_query = """ |
| 66 | + WITH range(1, 10000) AS r |
| 67 | + UNWIND r AS x |
| 68 | + WITH x |
| 69 | + WHERE x % 2 = 0 |
| 70 | + RETURN count(x) AS result |
| 71 | + """ |
| 72 | + |
| 73 | + tool = await mcp_server_short_timeout.get_tool("read_neo4j_cypher") |
| 74 | + |
| 75 | + # The query might timeout and raise a ToolError, or it might complete very fast |
| 76 | + # Let's just verify the server handles it without crashing |
| 77 | + try: |
| 78 | + response = await tool.run(dict(query=slow_query)) |
| 79 | + # If it completes, verify it returns valid results |
| 80 | + if response.content[0].text: |
| 81 | + result = json.loads(response.content[0].text) |
| 82 | + assert isinstance(result, list) |
| 83 | + except ToolError as e: |
| 84 | + # If it times out, that's also acceptable behavior |
| 85 | + error_message = str(e) |
| 86 | + assert "Neo4j Error" in error_message |
| 87 | + |
| 88 | + |
| 89 | +@pytest.mark.asyncio(loop_scope="function") |
| 90 | +async def test_read_query_with_normal_timeout_succeeds(mcp_server: FastMCP, init_data: Any): |
| 91 | + """Test that normal queries succeed with reasonable timeout.""" |
| 92 | + query = "MATCH (p:Person) RETURN p.name AS name ORDER BY name" |
| 93 | + |
| 94 | + tool = await mcp_server.get_tool("read_neo4j_cypher") |
| 95 | + response = await tool.run(dict(query=query)) |
| 96 | + |
| 97 | + result = json.loads(response.content[0].text) |
| 98 | + |
| 99 | + # Should succeed and return expected results |
| 100 | + assert len(result) == 3 |
| 101 | + assert result[0]["name"] == "Alice" |
| 102 | + assert result[1]["name"] == "Bob" |
| 103 | + assert result[2]["name"] == "Charlie" |
| 104 | + |
| 105 | + |
| 106 | +@pytest.mark.asyncio(loop_scope="function") |
| 107 | +async def test_schema_query_timeout(mcp_server_short_timeout: FastMCP): |
| 108 | + """Test that schema queries also respect timeout settings.""" |
| 109 | + tool = await mcp_server_short_timeout.get_tool("get_neo4j_schema") |
| 110 | + |
| 111 | + # Schema query should typically be fast, but with very short timeout it might timeout |
| 112 | + # depending on the database state. Let's just verify it doesn't crash |
| 113 | + try: |
| 114 | + response = await tool.run(dict()) |
| 115 | + # If it succeeds, verify the response format |
| 116 | + if response.content[0].text: |
| 117 | + schema = json.loads(response.content[0].text) |
| 118 | + assert isinstance(schema, dict) |
| 119 | + except ToolError as e: |
| 120 | + # If it times out, that's also acceptable behavior for this test |
| 121 | + error_message = str(e) |
| 122 | + assert "Neo4j Error" in error_message or "timeout" in error_message.lower() |
| 123 | + |
| 124 | + |
| 125 | +@pytest.mark.asyncio(loop_scope="function") |
| 126 | +async def test_write_query_no_timeout(mcp_server_short_timeout: FastMCP, clear_data: Any): |
| 127 | + """Test that write queries are not subject to timeout restrictions.""" |
| 128 | + # Write queries should not be affected by read_timeout |
| 129 | + query = "CREATE (n:TimeoutTest {name: 'test', created: timestamp()}) RETURN n.name" |
| 130 | + |
| 131 | + tool = await mcp_server_short_timeout.get_tool("write_neo4j_cypher") |
| 132 | + response = await tool.run(dict(query=query)) |
| 133 | + |
| 134 | + result = json.loads(response.content[0].text) |
| 135 | + |
| 136 | + # Write operation should succeed regardless of short timeout |
| 137 | + assert "nodes_created" in result |
| 138 | + assert result["nodes_created"] == 1 |
| 139 | + |
| 140 | + |
| 141 | +@pytest.mark.asyncio(loop_scope="function") |
| 142 | +async def test_timeout_configuration_passed_correctly(async_neo4j_driver): |
| 143 | + """Test that timeout configuration is properly passed to the server.""" |
| 144 | + from mcp_neo4j_cypher.server import create_mcp_server |
| 145 | + |
| 146 | + # Create servers with different timeout values |
| 147 | + mcp_30s = create_mcp_server(async_neo4j_driver, "neo4j", read_timeout=30) |
| 148 | + mcp_60s = create_mcp_server(async_neo4j_driver, "neo4j", read_timeout=60) |
| 149 | + |
| 150 | + # Both should be created successfully (configuration test) |
| 151 | + assert mcp_30s is not None |
| 152 | + assert mcp_60s is not None |
| 153 | + |
| 154 | + # The actual timeout values are used internally in Query objects, |
| 155 | + # so this test mainly verifies the parameter is accepted without error |
0 commit comments