forked from stuzero/pg-mcp-server
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery.py
More file actions
87 lines (70 loc) · 3.02 KB
/
query.py
File metadata and controls
87 lines (70 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# server/tools/query.py
from server.config import mcp
from mcp.server.fastmcp import Context
from mcp.server.fastmcp.utilities.logging import get_logger
# Module-level logger shared by the query helper and the tool registration
# function defined in this file.
logger = get_logger("pg-mcp.tools.query")
async def execute_query(query: str, conn_id: str, params=None, ctx=None):
    """
    Execute a read-only SQL query against the PostgreSQL database.

    The database handle is read from ``ctx.request_context.lifespan_context``
    when a request context is supplied, and from ``mcp.state`` otherwise.
    The query is run inside an explicit read-only transaction.

    Args:
        query: The SQL query to execute (must be read-only)
        conn_id: Connection ID (required)
        params: Positional parameters for the query (optional)
        ctx: Optional request context

    Returns:
        Query results as a list of dictionaries

    Raises:
        ValueError: If the database handle found in the context/state is None.
        Exception: Any error raised while running the query is logged and
            re-raised unchanged.
    """
    # Access the database from either the request context or MCP state
    if ctx is not None and hasattr(ctx, 'request_context'):
        db = ctx.request_context.lifespan_context["db"]
    else:
        db = mcp.state["db"]

    if db is None:
        raise ValueError("Database connection not available in context or MCP state.")

    logger.info("Executing query on connection ID %s: %s", conn_id, query)

    async with db.get_connection(conn_id) as conn:
        try:
            # A bare "SET TRANSACTION READ ONLY" only applies to the current
            # transaction block; sent on its own (outside BEGIN ... COMMIT)
            # PostgreSQL just emits a warning and the next statement still
            # runs read-write. Open an explicit read-only transaction so the
            # server itself rejects writes.
            # NOTE(review): relies on the connection exposing asyncpg's
            # ``transaction(readonly=...)`` API, as the existing ``fetch`` /
            # ``execute`` usage suggests - confirm against db.get_connection.
            async with conn.transaction(readonly=True):
                records = await conn.fetch(query, *(params or []))
            return [dict(record) for record in records]
        except Exception as e:
            # Log the error but don't couple to specific error types
            logger.error("Query execution error: %s", e)
            raise
def register_query_tools():
    """
    Register database query tools with the MCP server.

    Defines two tools via ``mcp.tool()``: ``pg_query`` runs a query through
    ``execute_query`` and ``pg_explain`` runs the same query prefixed with
    ``EXPLAIN (FORMAT JSON)``. Neither tool passes a request context, so
    ``execute_query`` resolves the database handle from its fallback path.
    """
    logger.debug("Registering query tools")

    @mcp.tool()
    async def pg_query(query: str, conn_id: str, params=None):
        """
        Execute a read-only SQL query against the PostgreSQL database.

        Args:
            query: The SQL query to execute (must be read-only)
            conn_id: Connection ID previously obtained from the connect tool
            params: Parameters for the query (optional)

        Returns:
            Query results as a list of dictionaries
        """
        # Execute the query using the connection ID
        return await execute_query(query, conn_id, params)

    @mcp.tool()
    async def pg_explain(query: str, conn_id: str, params=None):
        """
        Execute an EXPLAIN (FORMAT JSON) query to get PostgreSQL execution plan.

        Args:
            query: The SQL query to analyze
            conn_id: Connection ID previously obtained from the connect tool
            params: Parameters for the query (optional)

        Returns:
            Complete JSON-formatted execution plan
        """
        # Request the JSON plan format explicitly: a plain "EXPLAIN" returns
        # the text format, which contradicts the JSON result this tool
        # documents.
        explain_query = f"EXPLAIN (FORMAT JSON) {query}"
        # Execute the explain query and return the complete result
        return await execute_query(explain_query, conn_id, params)