Skip to content

Commit a627edb

Browse files
committed
Split node.py into multiple files
1 parent 380e573 commit a627edb

File tree

9 files changed

+850
-752
lines changed

9 files changed

+850
-752
lines changed

infrahub_sdk/node/__init__.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from __future__ import annotations
2+
3+
from .constants import HFID_STR_SEPARATOR
4+
from .node import InfrahubNode, InfrahubNodeBase, InfrahubNodeSync
5+
from .related_node import RelatedNode, RelatedNodeBase, RelatedNodeSync
6+
from .relationship import RelationshipManager, RelationshipManagerBase, RelationshipManagerSync
7+
8+
__all__ = [
9+
"InfrahubNode",
10+
"InfrahubNodeBase",
11+
"InfrahubNodeSync",
12+
"RelatedNode",
13+
"RelatedNodeBase",
14+
"RelatedNodeSync",
15+
"RelationshipManager",
16+
"RelationshipManagerBase",
17+
"RelationshipManagerSync",
18+
]
19+
20+
21+
def parse_human_friendly_id(hfid: str | list[str]) -> tuple[str | None, list[str]]:
22+
"""Parse a human friendly ID into a kind and an identifier."""
23+
if isinstance(hfid, str):
24+
hfid_parts = hfid.split(HFID_STR_SEPARATOR)
25+
if len(hfid_parts) == 1:
26+
return None, hfid_parts
27+
return hfid_parts[0], hfid_parts[1:]
28+
if isinstance(hfid, list):
29+
return None, hfid
30+
raise ValueError(f"Invalid human friendly ID: {hfid}")

infrahub_sdk/node/attribute.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
from __future__ import annotations
2+
3+
import ipaddress
4+
from typing import TYPE_CHECKING, Any, Callable, get_args
5+
6+
from ..protocols_base import CoreNodeBase
7+
from ..uuidt import UUIDT
8+
from .constants import IP_TYPES, PROPERTIES_FLAG, PROPERTIES_OBJECT, SAFE_VALUE
9+
from .property import NodeProperty
10+
11+
if TYPE_CHECKING:
12+
from ..schema import AttributeSchemaAPI
13+
14+
15+
class Attribute:
16+
"""Represents an attribute of a Node, including its schema, value, and properties."""
17+
18+
def __init__(self, name: str, schema: AttributeSchemaAPI, data: Any | dict):
19+
"""
20+
Args:
21+
name (str): The name of the attribute.
22+
schema (AttributeSchema): The schema defining the attribute.
23+
data (Union[Any, dict]): The data for the attribute, either in raw form or as a dictionary.
24+
"""
25+
self.name = name
26+
self._schema = schema
27+
28+
if not isinstance(data, dict) or "value" not in data.keys():
29+
data = {"value": data}
30+
31+
self._properties_flag = PROPERTIES_FLAG
32+
self._properties_object = PROPERTIES_OBJECT
33+
self._properties = self._properties_flag + self._properties_object
34+
35+
self._read_only = ["updated_at", "is_inherited"]
36+
37+
self.id: str | None = data.get("id", None)
38+
39+
self._value: Any | None = data.get("value", None)
40+
self.value_has_been_mutated = False
41+
self.is_default: bool | None = data.get("is_default", None)
42+
self.is_from_profile: bool | None = data.get("is_from_profile", None)
43+
44+
if self._value:
45+
value_mapper: dict[str, Callable] = {
46+
"IPHost": ipaddress.ip_interface,
47+
"IPNetwork": ipaddress.ip_network,
48+
}
49+
mapper = value_mapper.get(schema.kind, lambda value: value)
50+
self._value = mapper(data.get("value"))
51+
52+
self.is_inherited: bool | None = data.get("is_inherited", None)
53+
self.updated_at: str | None = data.get("updated_at", None)
54+
55+
self.is_visible: bool | None = data.get("is_visible", None)
56+
self.is_protected: bool | None = data.get("is_protected", None)
57+
58+
self.source: NodeProperty | None = None
59+
self.owner: NodeProperty | None = None
60+
61+
for prop_name in self._properties_object:
62+
if data.get(prop_name):
63+
setattr(self, prop_name, NodeProperty(data=data.get(prop_name))) # type: ignore[arg-type]
64+
65+
@property
66+
def value(self) -> Any:
67+
return self._value
68+
69+
@value.setter
70+
def value(self, value: Any) -> None:
71+
self._value = value
72+
self.value_has_been_mutated = True
73+
74+
def _generate_input_data(self) -> dict | None:
75+
data: dict[str, Any] = {}
76+
variables: dict[str, Any] = {}
77+
78+
if self.value is None:
79+
return data
80+
81+
if isinstance(self.value, str):
82+
if SAFE_VALUE.match(self.value):
83+
data["value"] = self.value
84+
else:
85+
var_name = f"value_{UUIDT.new().hex}"
86+
variables[var_name] = self.value
87+
data["value"] = f"${var_name}"
88+
elif isinstance(self.value, get_args(IP_TYPES)):
89+
data["value"] = self.value.with_prefixlen
90+
elif isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool():
91+
data["from_pool"] = {"id": self.value.id}
92+
else:
93+
data["value"] = self.value
94+
95+
for prop_name in self._properties_flag:
96+
if getattr(self, prop_name) is not None:
97+
data[prop_name] = getattr(self, prop_name)
98+
99+
for prop_name in self._properties_object:
100+
if getattr(self, prop_name) is not None:
101+
data[prop_name] = getattr(self, prop_name)._generate_input_data()
102+
103+
return {"data": data, "variables": variables}
104+
105+
def _generate_query_data(self, property: bool = False) -> dict | None:
106+
data: dict[str, Any] = {"value": None}
107+
108+
if property:
109+
data.update({"is_default": None, "is_from_profile": None})
110+
111+
for prop_name in self._properties_flag:
112+
data[prop_name] = None
113+
for prop_name in self._properties_object:
114+
data[prop_name] = {"id": None, "display_label": None, "__typename": None}
115+
116+
return data
117+
118+
def _generate_mutation_query(self) -> dict[str, Any]:
119+
if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool():
120+
# If it points to a pool, ask for the value of the pool allocated resource
121+
return {self.name: {"value": None}}
122+
return {}

infrahub_sdk/node/constants.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import ipaddress
2+
import re
3+
from typing import Union
4+
5+
PROPERTIES_FLAG = ["is_visible", "is_protected"]
6+
PROPERTIES_OBJECT = ["source", "owner"]
7+
SAFE_VALUE = re.compile(r"(^[\. /:a-zA-Z0-9_-]+$)|(^$)")
8+
9+
IP_TYPES = Union[ipaddress.IPv4Interface, ipaddress.IPv6Interface, ipaddress.IPv4Network, ipaddress.IPv6Network]
10+
11+
ARTIFACT_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE = (
12+
"calling artifact_fetch is only supported for nodes that are Artifact Definition target"
13+
)
14+
ARTIFACT_GENERATE_FEATURE_NOT_SUPPORTED_MESSAGE = (
15+
"calling artifact_generate is only supported for nodes that are Artifact Definition targets"
16+
)
17+
ARTIFACT_DEFINITION_GENERATE_FEATURE_NOT_SUPPORTED_MESSAGE = (
18+
"calling generate is only supported for CoreArtifactDefinition nodes"
19+
)
20+
21+
HFID_STR_SEPARATOR = "__"

0 commit comments

Comments
 (0)