Skip to content

Commit 1d62725

Browse files
authored
Merge pull request #9 from opsmill/bkr-refactor-server
Refactor server by merging the initial and hackathon ones
2 parents f80a50d + 46808df commit 1d62725

File tree

12 files changed

+1096
-1211
lines changed

12 files changed

+1096
-1211
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ MCP server to interact with Infrahub
44

55
## Requirements
66

7-
- Python 3.8+
7+
- Python 3.13+
88
- fastmcp
99
- infrahub_sdk
1010

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ readme = "README.md"
66
requires-python = ">=3.13"
77
dependencies = [
88
"fastmcp>=2.10.5",
9-
"infrahub-sdk[all]>=1.13.5",
9+
"infrahub-sdk>=1.13.5",
1010
]
1111

1212
[dependency-groups]

src/infrahub_mcp_server/branch.py

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,63 @@
1-
from typing import TYPE_CHECKING
1+
from typing import TYPE_CHECKING, Annotated
22

33
from fastmcp import Context, FastMCP
4+
from infrahub_sdk.branch import BranchData
5+
from infrahub_sdk.exceptions import GraphQLError
6+
from mcp.types import ToolAnnotations
7+
from pydantic import Field
8+
9+
from infrahub_mcp_server.utils import MCPResponse, MCPToolStatus, _log_and_return_error
410

511
if TYPE_CHECKING:
612
from infrahub_sdk import InfrahubClient
713

8-
mcp: FastMCP = FastMCP(name="Infrahub Branch")
14+
mcp: FastMCP = FastMCP(name="Infrahub Branches")
15+
16+
17+
@mcp.tool(
18+
tags=["branches", "create"],
19+
annotations=ToolAnnotations(readOnlyHint=False, idempotentHint=True, destructiveHint=False),
20+
)
21+
async def branch_create(
22+
ctx: Context,
23+
name: Annotated[str, Field(description="Name of the branch to create.")],
24+
sync_with_git: Annotated[bool, Field(default=False, description="Whether to sync the branch with git.")],
25+
) -> MCPResponse[dict[str, str]]:
26+
"""Create a new branch in infrahub.
927
28+
Parameters:
29+
name: Name of the branch to create.
30+
sync_with_git: Whether to sync the branch with git. Defaults to False.
1031
11-
@mcp.tool
12-
async def branch_create(ctx: Context, name: str, sync_with_git: bool = False) -> dict:
13-
"""Create a new branch in infrahub."""
32+
Returns:
33+
Dictionary with success status and branch details.
34+
"""
1435

1536
client: InfrahubClient = ctx.request_context.lifespan_context.client
16-
branch = await client.branch.create(branch_name=name, sync_with_git=sync_with_git, background_execution=False)
37+
ctx.info(f"Creating branch {name} in Infrahub...")
38+
39+
try:
40+
branch = await client.branch.create(branch_name=name, sync_with_git=sync_with_git, background_execution=False)
41+
42+
except GraphQLError as exc:
43+
return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.")
44+
45+
return MCPResponse(
46+
status=MCPToolStatus.SUCCESS,
47+
data={
48+
"name": branch.name,
49+
"id": branch.id,
50+
},
51+
)
52+
53+
54+
@mcp.tool(tags=["branches", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True))
55+
async def get_branches(ctx: Context) -> MCPResponse[dict[str, BranchData]]:
56+
"""Retrieve all branches from infrahub."""
57+
58+
client: InfrahubClient = ctx.request_context.lifespan_context.client
59+
ctx.info("Fetching all branches from Infrahub...")
60+
61+
branches: dict[str, BranchData] = await client.branch.all()
1762

18-
return {"name": branch.name, "id": branch.id}
63+
return MCPResponse(status=MCPToolStatus.SUCCESS, data=branches)
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,9 @@
11
NAMESPACES_INTERNAL = ["Internal", "Profile", "Template"]
2+
3+
schema_attribute_type_mapping = {
4+
"Text": "String",
5+
"Number": "Integer",
6+
"Boolean": "Boolean",
7+
"DateTime": "DateTime",
8+
"Enum": "String",
9+
}

src/infrahub_mcp_server/gql.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,44 @@
1-
from typing import TYPE_CHECKING
1+
from typing import TYPE_CHECKING, Annotated, Any
22

33
from fastmcp import Context, FastMCP
4+
from mcp.types import ToolAnnotations
5+
from pydantic import Field
6+
7+
from infrahub_mcp_server.utils import MCPResponse, MCPToolStatus
48

59
if TYPE_CHECKING:
610
from infrahub_sdk import InfrahubClient
711

812
mcp: FastMCP = FastMCP(name="Infrahub GraphQL")
913

1014

11-
@mcp.tool
12-
async def get_graphql_schema(ctx: Context) -> str:
13-
"""Retrieve the GraphQL schema from Infrahub"""
15+
@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True))
16+
async def get_graphql_schema(ctx: Context) -> MCPResponse[str]:
17+
"""Retrieve the GraphQL schema from Infrahub
18+
19+
Parameters:
20+
None
21+
22+
Returns:
23+
MCPResponse with the GraphQL schema as a string.
24+
"""
1425
client: InfrahubClient = ctx.request_context.lifespan_context.client
1526
resp = await client._get(url=f"{client.address}/schema.graphql") # noqa: SLF001
16-
return resp.text
27+
return MCPResponse(status=MCPToolStatus.SUCCESS, data=resp.text)
28+
29+
30+
@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=False))
31+
async def query_graphql(
32+
ctx: Context, query: Annotated[str, Field(description="GraphQL query to execute.")]
33+
) -> MCPResponse[dict[str, Any]]:
34+
"""Execute a GraphQL query against Infrahub.
35+
36+
Parameters:
37+
query: GraphQL query to execute.
1738
39+
Returns:
40+
MCPResponse with the result of the query.
1841
19-
@mcp.tool
20-
async def query_graphql(ctx: Context, query: str) -> dict:
21-
"""Execute a GraphQL query against Infrahub."""
42+
"""
2243
client: InfrahubClient = ctx.request_context.lifespan_context.client
2344
return await client.execute_graphql(query=query)

src/infrahub_mcp_server/nodes.py

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
from typing import TYPE_CHECKING, Annotated, Any
2+
3+
from fastmcp import Context, FastMCP
4+
from infrahub_sdk.exceptions import GraphQLError, SchemaNotFoundError
5+
from infrahub_sdk.types import Order
6+
from mcp.types import ToolAnnotations
7+
from pydantic import Field
8+
9+
from infrahub_mcp_server.constants import schema_attribute_type_mapping
10+
from infrahub_mcp_server.utils import MCPResponse, MCPToolStatus, _log_and_return_error, convert_node_to_dict
11+
12+
if TYPE_CHECKING:
13+
from infrahub_sdk.client import InfrahubClient
14+
15+
mcp: FastMCP = FastMCP(name="Infrahub Nodes")
16+
17+
18+
@mcp.tool(tags=["nodes", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True))
19+
async def get_nodes(
20+
ctx: Context,
21+
kind: Annotated[str, Field(description="Kind of the objects to retrieve.")],
22+
branch: Annotated[
23+
str | None,
24+
Field(default=None, description="Branch to retrieve the objects from. Defaults to None (uses default branch)."),
25+
],
26+
filters: Annotated[dict[str, Any] | None, Field(default=None, description="Dictionary of filters to apply.")],
27+
partial_match: Annotated[bool, Field(default=False, description="Whether to use partial matching for filters.")],
28+
) -> MCPResponse[list[str]]:
29+
"""Get all objects of a specific kind from Infrahub.
30+
31+
To retrieve the list of available kinds, use the `get_schema_mapping` tool.
32+
To retrieve the list of available filters for a specific kind, use the `get_node_filters` tool.
33+
34+
Parameters:
35+
kind: Kind of the objects to retrieve.
36+
branch: Branch to retrieve the objects from. Defaults to None (uses default branch).
37+
filters: Dictionary of filters to apply.
38+
partial_match: Whether to use partial matching for filters.
39+
40+
Returns:
41+
MCPResponse with success status and objects.
42+
43+
"""
44+
client: InfrahubClient = ctx.request_context.lifespan_context.client
45+
ctx.info(f"Fetching nodes of kind: {kind} with filters: {filters} from Infrahub...")
46+
47+
# Verify if the kind exists in the schema and guide Tool if not
48+
try:
49+
schema = await client.schema.get(kind=kind, branch=branch)
50+
except SchemaNotFoundError:
51+
error_msg = f"Schema not found for kind: {kind}."
52+
remediation_msg = "Use the `get_schema_mapping` tool to list available kinds."
53+
return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg)
54+
55+
# TODO: Verify if the filters are valid for the kind and guide Tool if not
56+
57+
try:
58+
if filters:
59+
nodes = await client.filters(
60+
kind=schema.kind,
61+
branch=branch,
62+
partial_match=partial_match,
63+
parallel=True,
64+
order=Order(disable=True),
65+
populate_store=True,
66+
prefetch_relationships=True,
67+
**filters,
68+
)
69+
else:
70+
nodes = await client.all(
71+
kind=schema.kind,
72+
branch=branch,
73+
parallel=True,
74+
order=Order(disable=True),
75+
populate_store=True,
76+
prefetch_relationships=True,
77+
)
78+
except GraphQLError as exc:
79+
return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the provided filters or the kind name.")
80+
81+
# Format the response with serializable data
82+
# serialized_nodes = []
83+
# for node in nodes:
84+
# node_data = await convert_node_to_dict(obj=node, branch=branch)
85+
# serialized_nodes.append(node_data)
86+
serialized_nodes = [obj.display_label for obj in nodes]
87+
88+
# Return the serialized response
89+
ctx.debug(f"Retrieved {len(serialized_nodes)} nodes of kind {kind}")
90+
91+
return MCPResponse(
92+
status=MCPToolStatus.SUCCESS,
93+
data=serialized_nodes,
94+
)
95+
96+
97+
@mcp.tool(tags=["nodes", "filters", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True))
98+
async def get_node_filters(
99+
ctx: Context,
100+
kind: Annotated[str, Field(description="Kind of the objects to retrieve.")],
101+
branch: Annotated[
102+
str | None,
103+
Field(default=None, description="Branch to retrieve the objects from. Defaults to None (uses default branch)."),
104+
],
105+
) -> MCPResponse[dict[str, str]]:
106+
"""Retrieve all the available filters for a specific schema node kind.
107+
108+
There's multiple types of filters
109+
attribute filters are in the form attribute__value
110+
111+
relationship filters are in the form relationship__attribute__value
112+
you can find more information on the peer node of the relationship using the `get_schema` tool
113+
114+
Filters that start with parent refer to a related generic schema node.
115+
You can find the type of that related node by inspected the output of the `get_schema` tool.
116+
117+
Parameters:
118+
kind: Kind of the objects to retrieve.
119+
branch: Branch to retrieve the objects from. Defaults to None (uses default branch).
120+
121+
Returns:
122+
MCPResponse with success status and filters.
123+
"""
124+
client: InfrahubClient = ctx.request_context.lifespan_context.client
125+
ctx.info(f"Fetching available filters for kind: {kind} from Infrahub...")
126+
127+
# Verify if the kind exists in the schema and guide Tool if not
128+
try:
129+
schema = await client.schema.get(kind=kind, branch=branch)
130+
except SchemaNotFoundError:
131+
error_msg = f"Schema not found for kind: {kind}."
132+
remediation_msg = "Use the `get_schema_mapping` tool to list available kinds."
133+
return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg)
134+
135+
filters = {
136+
f"{attribute.name}__value": schema_attribute_type_mapping.get(attribute.kind, "String")
137+
for attribute in schema.attributes
138+
}
139+
140+
for relationship in schema.relationships:
141+
relationship_schema = await client.schema.get(kind=relationship.peer)
142+
relationship_filters = {
143+
f"{relationship.name}__{attribute.name}__value": schema_attribute_type_mapping.get(attribute.kind, "String")
144+
for attribute in relationship_schema.attributes
145+
}
146+
filters.update(relationship_filters)
147+
148+
return MCPResponse(
149+
status=MCPToolStatus.SUCCESS,
150+
data=filters,
151+
)
152+
153+
154+
@mcp.tool(tags=["nodes", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True))
155+
async def get_related_nodes(
156+
ctx: Context,
157+
kind: Annotated[str, Field(description="Kind of the objects to retrieve.")],
158+
relation: Annotated[str, Field(description="Name of the relation to fetch.")],
159+
filters: Annotated[dict[str, Any] | None, Field(default=None, description="Dictionary of filters to apply.")],
160+
branch: Annotated[
161+
str | None,
162+
Field(default=None, description="Branch to retrieve the objects from. Defaults to None (uses default branch)."),
163+
],
164+
) -> MCPResponse[list[dict[str, Any]]]:
165+
"""Retrieve related nodes by relation name and a kind.
166+
167+
Args:
168+
kind: Kind of the node to fetch.
169+
filters: Filters to apply on the node to fetch.
170+
relation: Name of the relation to fetch.
171+
branch: Branch to fetch the node from. Defaults to None (uses default branch).
172+
173+
Returns:
174+
MCPResponse with success status and objects.
175+
176+
"""
177+
client: InfrahubClient = ctx.request_context.lifespan_context.client
178+
filters = filters or {}
179+
if branch:
180+
ctx.info(f"Fetching nodes related to {kind} with filters {filters} in branch {branch} from Infrahub...")
181+
else:
182+
ctx.info(f"Fetching nodes related to {kind} with filters {filters} from Infrahub...")
183+
184+
try:
185+
node_id = node_hfid = None
186+
if filters.get("ids"):
187+
node_id = filters["ids"][0]
188+
elif filters.get("hfid"):
189+
node_hfid = filters["hfid"]
190+
if node_id:
191+
node = await client.get(
192+
kind=kind,
193+
id=node_id,
194+
branch=branch,
195+
include=[relation],
196+
prefetch_relationships=True,
197+
populate_store=True,
198+
)
199+
elif node_hfid:
200+
node = await client.get(
201+
kind=kind,
202+
hfid=node_hfid,
203+
branch=branch,
204+
include=[relation],
205+
prefetch_relationships=True,
206+
populate_store=True,
207+
)
208+
except Exception as exc: # noqa: BLE001
209+
return _log_and_return_error(exc)
210+
211+
rel = getattr(node, relation, None)
212+
if not rel:
213+
_log_and_return_error(
214+
ctx=ctx,
215+
error=f"Relation '{relation}' not found in kind '{kind}'.",
216+
remediation="Check the schema for the kind to confirm if the relation exists.",
217+
)
218+
peers = [
219+
await convert_node_to_dict(
220+
branch=branch,
221+
obj=peer.peer,
222+
include_id=True,
223+
)
224+
for peer in rel.peers
225+
]
226+
227+
return MCPResponse(
228+
status=MCPToolStatus.SUCCESS,
229+
data=peers,
230+
)

0 commit comments

Comments
 (0)