-
Notifications
You must be signed in to change notification settings - Fork 1
Refactor server by merging the initial and hackathon ones #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,54 @@ | ||
from typing import TYPE_CHECKING | ||
|
||
from fastmcp import Context, FastMCP | ||
from infrahub_sdk.branch import BranchData | ||
from infrahub_sdk.exceptions import GraphQLError | ||
|
||
from infrahub_mcp_server.utils import MCPResponse, MCPToolStatus, _log_and_return_error | ||
|
||
if TYPE_CHECKING: | ||
from infrahub_sdk import InfrahubClient | ||
|
||
mcp: FastMCP = FastMCP(name="Infrahub Branch") | ||
mcp: FastMCP = FastMCP(name="Infrahub Branches") | ||
|
||
|
||
@mcp.tool(tags=["branches", "create"]) | ||
async def branch_create(ctx: Context, name: str, sync_with_git: bool = False) -> MCPResponse[dict[str, str]]: | ||
"""Create a new branch in infrahub. | ||
Parameters: | ||
name: Name of the branch to create. | ||
sync_with_git: Whether to sync the branch with git. Defaults to False. | ||
@mcp.tool | ||
async def branch_create(ctx: Context, name: str, sync_with_git: bool = False) -> dict: | ||
"""Create a new branch in infrahub.""" | ||
Returns: | ||
Dictionary with success status and branch details. | ||
""" | ||
|
||
client: InfrahubClient = ctx.request_context.lifespan_context.client | ||
branch = await client.branch.create(branch_name=name, sync_with_git=sync_with_git, background_execution=False) | ||
ctx.info(f"Creating branch {name} in Infrahub...") | ||
|
||
try: | ||
branch = await client.branch.create(branch_name=name, sync_with_git=sync_with_git, background_execution=False) | ||
|
||
except GraphQLError as exc: | ||
return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") | ||
|
||
return MCPResponse( | ||
status=MCPToolStatus.SUCCESS, | ||
data={ | ||
"name": branch.name, | ||
"id": branch.id, | ||
}, | ||
) | ||
|
||
|
||
@mcp.tool(tags=["branches", "retrieve"]) | ||
async def get_branches(ctx: Context) -> MCPResponse[dict[str, BranchData]]: | ||
"""Retrieve all branches from infrahub.""" | ||
|
||
client: InfrahubClient = ctx.request_context.lifespan_context.client | ||
ctx.info("Fetching all branches from Infrahub...") | ||
|
||
branches: dict[str, BranchData] = await client.branch.all() | ||
|
||
return {"name": branch.name, "id": branch.id} | ||
return MCPResponse(status=MCPToolStatus.SUCCESS, data=branches) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,7 @@ | ||
NAMESPACES_INTERNAL = ["Internal", "Profile", "Template"] | ||
|
||
schema_attribute_type_mapping = { | ||
"Text": "String", | ||
"Number": "Integer", | ||
"Boolean": "Boolean", | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
from typing import TYPE_CHECKING, Any | ||
|
||
from fastmcp import Context, FastMCP | ||
from infrahub_sdk.exceptions import GraphQLError, SchemaNotFoundError | ||
from infrahub_sdk.types import Order | ||
|
||
from infrahub_mcp_server.constants import schema_attribute_type_mapping | ||
from infrahub_mcp_server.utils import MCPResponse, MCPToolStatus, _log_and_return_error, convert_node_to_dict | ||
|
||
if TYPE_CHECKING: | ||
from infrahub_sdk.client import InfrahubClient | ||
|
||
mcp: FastMCP = FastMCP(name="Infrahub Nodes") | ||
|
||
|
||
@mcp.tool(tags=["nodes", "retrieve"]) | ||
async def get_nodes( | ||
ctx: Context, | ||
kind: str, | ||
branch: str | None = None, | ||
filters: dict | None = None, | ||
partial_match: bool = False, | ||
) -> MCPResponse[list[str]]: | ||
"""Get all objects of a specific kind from Infrahub. | ||
|
||
To retrieve the list of available kinds, use the `get_schema_mapping` tool. | ||
To retrieve the list of available filters for a specific kind, use the `get_node_filters` tool. | ||
|
||
Parameters: | ||
kind: Kind of the objects to retrieve. | ||
branch: Branch to retrieve the objects from. Defaults to None (uses default branch). | ||
filters: Dictionary of filters to apply. | ||
partial_match: Whether to use partial matching for filters. | ||
|
||
Returns: | ||
Dictionary with success status and objects. | ||
|
||
""" | ||
client: InfrahubClient = ctx.request_context.lifespan_context.client | ||
ctx.info(f"Fetching nodes of kind: {kind} with filters: {filters} from Infrahub...") | ||
|
||
# Verify if the kind exists in the schema and guide Tool if not | ||
try: | ||
schema = await client.schema.get(kind=kind, branch=branch) | ||
except SchemaNotFoundError: | ||
error_msg = f"Schema not found for kind: {kind}." | ||
remediation_msg = "Use the `get_schema_mapping` tool to list available kinds." | ||
return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) | ||
|
||
# TODO: Verify if the filters are valid for the kind and guide Tool if not | ||
|
||
try: | ||
if filters: | ||
nodes = await client.filters( | ||
kind=schema.kind, | ||
branch=branch, | ||
partial_match=partial_match, | ||
parallel=True, | ||
order=Order(disable=True), | ||
populate_store=True, | ||
prefetch_relationships=True, | ||
**filters, | ||
) | ||
else: | ||
nodes = await client.all( | ||
kind=schema.kind, | ||
branch=branch, | ||
parallel=True, | ||
order=Order(disable=True), | ||
populate_store=True, | ||
prefetch_relationships=True, | ||
) | ||
except GraphQLError as exc: | ||
return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the provided filters or the kind name.") | ||
|
||
# Format the response with serializable data | ||
# serialized_nodes = [] | ||
# for node in nodes: | ||
# node_data = await convert_node_to_dict(obj=node, branch=branch) | ||
# serialized_nodes.append(node_data) | ||
serialized_nodes = [obj.display_label for obj in nodes] | ||
|
||
# Return the serialized response | ||
ctx.debug(f"Retrieved {len(serialized_nodes)} nodes of kind {kind}") | ||
|
||
return MCPResponse( | ||
status=MCPToolStatus.SUCCESS, | ||
data=serialized_nodes, | ||
) | ||
|
||
|
||
@mcp.tool(tags=["nodes", "filters", "retrieve"]) | ||
async def get_node_filters( | ||
ctx: Context, | ||
kind: str, | ||
branch: str | None = None, | ||
) -> MCPResponse[dict[str, str]]: | ||
"""Retrieve all the available filters for a specific schema node kind. | ||
|
||
There's multiple types of filters | ||
attribute filters are in the form attribute__value | ||
|
||
relationship filters are in the form relationship__attribute__value | ||
you can find more information on the peer node of the relationship using the `get_schema` tool | ||
|
||
Filters that start with parent refer to a related generic schema node. | ||
You can find the type of that related node by inspected the output of the `get_schema` tool. | ||
|
||
Parameters: | ||
kind: Kind of the objects to retrieve. | ||
branch: Branch to retrieve the objects from. Defaults to None (uses default branch). | ||
|
||
Returns: | ||
Dictionary with success status and filters. | ||
""" | ||
client: InfrahubClient = ctx.request_context.lifespan_context.client | ||
ctx.info(f"Fetching available filters for kind: {kind} from Infrahub...") | ||
|
||
# Verify if the kind exists in the schema and guide Tool if not | ||
try: | ||
schema = await client.schema.get(kind=kind, branch=branch) | ||
except SchemaNotFoundError: | ||
error_msg = f"Schema not found for kind: {kind}." | ||
remediation_msg = "Use the `get_schema_mapping` tool to list available kinds." | ||
return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) | ||
|
||
filters = { | ||
f"{attribute.name}__value": schema_attribute_type_mapping.get(attribute.kind, "String") | ||
for attribute in schema.attributes | ||
} | ||
|
||
for relationship in schema.relationships: | ||
relationship_schema = await client.schema.get(kind=relationship.peer) | ||
relationship_filters = { | ||
f"{relationship.name}__{attribute.name}__value": schema_attribute_type_mapping.get(attribute.kind, "String") | ||
for attribute in relationship_schema.attributes | ||
} | ||
filters.update(relationship_filters) | ||
|
||
return MCPResponse( | ||
status=MCPToolStatus.SUCCESS, | ||
data=filters, | ||
) | ||
|
||
|
||
@mcp.tool(tags=["nodes", "retrieve"]) | ||
async def get_related_nodes( | ||
ctx: Context, | ||
kind: str, | ||
relation: str, | ||
filters: dict | None = None, | ||
branch: str | None = None, | ||
) -> MCPResponse[list[dict[str, Any]]]: | ||
"""Retrieve related nodes by relation name and a kind. | ||
|
||
Args: | ||
kind: Kind of the node to fetch. | ||
filters: Filters to apply on the node to fetch. | ||
relation: Name of the relation to fetch. | ||
branch: Branch to fetch the node from. Defaults to None (uses default branch). | ||
|
||
Returns: | ||
Dictionary with success status and objects. | ||
|
||
""" | ||
client: InfrahubClient = ctx.request_context.lifespan_context.client | ||
filters = filters or {} | ||
if branch: | ||
ctx.info(f"Fetching nodes related to {kind} with filters {filters} in branch {branch} from Infrahub...") | ||
else: | ||
ctx.info(f"Fetching nodes related to {kind} with filters {filters} from Infrahub...") | ||
|
||
try: | ||
node_id = node_hfid = None | ||
if filters.get("ids"): | ||
node_id = filters["ids"][0] | ||
elif filters.get("hfid"): | ||
node_hfid = filters["hfid"] | ||
if node_id: | ||
node = await client.get( | ||
kind=kind, | ||
id=node_id, | ||
branch=branch, | ||
include=[relation], | ||
prefetch_relationships=True, | ||
populate_store=True, | ||
) | ||
elif node_hfid: | ||
node = await client.get( | ||
kind=kind, | ||
hfid=node_hfid, | ||
branch=branch, | ||
include=[relation], | ||
prefetch_relationships=True, | ||
populate_store=True, | ||
) | ||
except Exception as exc: # noqa: BLE001 | ||
return _log_and_return_error(exc) | ||
|
||
rel = getattr(node, relation, None) | ||
if not rel: | ||
_log_and_return_error( | ||
ctx=ctx, | ||
error=f"Relation '{relation}' not found in kind '{kind}'.", | ||
remediation="Check the schema for the kind to confirm if the relation exists.", | ||
) | ||
peers = [ | ||
await convert_node_to_dict( | ||
branch=branch, | ||
obj=peer.peer, | ||
include_id=True, | ||
) | ||
for peer in rel.peers | ||
] | ||
|
||
return MCPResponse( | ||
status=MCPToolStatus.SUCCESS, | ||
data=peers, | ||
) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious, what is the impact of having the tags, how are they influencing the LLM ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from fastmcp docs: