Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Add explain_query and explain_query_with_params tools

## 0.1.5 ##
* Fix error outputSchema defined but no structured output returned

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
ydb>=3.21.0
ydb>=3.21.9
mcp==1.12.4
123 changes: 113 additions & 10 deletions ydb_mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,78 @@ def _stringify_dict_keys(self, obj):
else:
return obj

async def explain_query(self, sql: str, params: Optional[Dict[str, Any]] = None) -> List[TextContent]:
"""Explain a SQL query against YDB

Args:
sql: SQL query to execute
params: Optional query parameters

Returns:
Execution plan of the query as TextContent object with JSON-formatted execution plan
"""
# Check if there's an authentication error
if self.auth_error:
return [TextContent(type="text", text=json.dumps({"error": self.auth_error}, indent=2))]

try:
pool = await self.get_pool()
ydb_params = None
if params:
ydb_params = {}
for key, value in params.items():
param_key = key if key.startswith("$") else f"${key}"
ydb_params[param_key] = value

structured_plan = await pool.explain_with_retries(
query=sql,
parameters=ydb_params,
result_format=ydb.QueryExplainResultFormat.DICT,
)

safe_plan = self._stringify_dict_keys(structured_plan)
formatted_plan = json.dumps(safe_plan, indent=2, cls=CustomJSONEncoder)
logger.info(f"Query plan: {formatted_plan}")
return [TextContent(type="text", text=formatted_plan)]
except Exception as e:
error_message = str(e)
safe_error = self._stringify_dict_keys({"error": error_message})
return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]

async def explain_query_with_params(self, sql: str, params: str) -> List[TextContent]:
"""Explain a SQL query against YDB

Args:
sql: SQL query to execute
params: Optional query parameters

Returns:
Execution plan of the query as TextContent object with JSON-formatted execution plan
"""
"""Run a parameterized SQL query with JSON parameters.

Args:
sql: SQL query to execute
params: Parameters as a JSON string

Returns:
Query results as a list of TextContent objects or a dictionary
"""
# Handle authentication errors
if self.auth_error:
logger.error(f"Authentication error: {self.auth_error}")
safe_error = self._stringify_dict_keys({"error": f"Authentication error: {self.auth_error}"})
return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]

try:
ydb_params = self._parse_str_to_ydb_params(params)
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON parameters: {str(e)}")
safe_error = self._stringify_dict_keys({"error": f"Error parsing JSON parameters: {str(e)}"})
return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]

return await self.explain_query(sql, ydb_params)

async def query(self, sql: str, params: Optional[Dict[str, Any]] = None) -> List[TextContent]:
"""Run a SQL query against YDB.

Expand Down Expand Up @@ -444,14 +516,25 @@ async def query_with_params(self, sql: str, params: str) -> List[TextContent]:
logger.error(f"Authentication error: {self.auth_error}")
safe_error = self._stringify_dict_keys({"error": f"Authentication error: {self.auth_error}"})
return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]
parsed_params = {}
try:
if params and params.strip():
parsed_params = json.loads(params)
ydb_params = self._parse_str_to_ydb_params(params)

return await self.query(sql, ydb_params)
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON parameters: {str(e)}")
safe_error = self._stringify_dict_keys({"error": f"Error parsing JSON parameters: {str(e)}"})
return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]
except Exception as e:
error_message = f"Error executing parameterized query: {str(e)}"
logger.error(error_message)
safe_error = self._stringify_dict_keys({"error": error_message})
return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]

def _parse_str_to_ydb_params(self, params: str) -> Dict:
parsed_params = {}
if params and params.strip():
parsed_params = json.loads(params)

# Convert [value, type] to YDB type if needed
ydb_params = {}
for key, value in parsed_params.items():
Expand All @@ -465,13 +548,9 @@ async def query_with_params(self, sql: str, params: str) -> List[TextContent]:
ydb_params[param_key] = param_value
else:
ydb_params[param_key] = value
try:
return await self.query(sql, ydb_params)
except Exception as e:
error_message = f"Error executing parameterized query: {str(e)}"
logger.error(error_message)
safe_error = self._stringify_dict_keys({"error": error_message})
return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]

return ydb_params


def register_tools(self):
"""Register YDB query tools.
Expand All @@ -483,6 +562,30 @@ def register_tools(self):
"""
# Define tool specifications
tool_specs = [
{
"name": "ydb_explain_query",
"description": "Explain a SQL query against YDB",
"handler": self.explain_query, # Use real handler
"parameters": {
"properties": {"sql": {"type": "string", "title": "Sql"}},
"required": ["sql"],
"type": "object",
},
},
{
"name": "ydb_explain_query_with_params",
"description": "Explain a parametrized SQL query with JSON parameters",
"handler": self.explain_query_with_params, # Use real handler
"parameters": {
"properties": {
"sql": {"type": "string", "title": "Sql"},
"params": {"type": "string", "title": "Params"},
},
"required": ["sql", "params"],
"type": "object",
},

},
{
"name": "ydb_query",
"description": "Run a SQL query against YDB database",
Expand Down