diff --git a/infrahub_sdk/node/constants.py b/infrahub_sdk/node/constants.py index d474bdb0..f96fce6a 100644 --- a/infrahub_sdk/node/constants.py +++ b/infrahub_sdk/node/constants.py @@ -20,3 +20,4 @@ HIERARCHY_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE = "Hierarchical fields are not supported for this node." HFID_STR_SEPARATOR = "__" +PROFILE_KIND_PREFIX = "Profile" diff --git a/infrahub_sdk/node/related_node.py b/infrahub_sdk/node/related_node.py index daa9726b..1bf1307e 100644 --- a/infrahub_sdk/node/related_node.py +++ b/infrahub_sdk/node/related_node.py @@ -1,12 +1,11 @@ from __future__ import annotations +import re from typing import TYPE_CHECKING, Any -from ..exceptions import ( - Error, -) +from ..exceptions import Error from ..protocols_base import CoreNodeBase -from .constants import PROPERTIES_FLAG, PROPERTIES_OBJECT +from .constants import PROFILE_KIND_PREFIX, PROPERTIES_FLAG, PROPERTIES_OBJECT if TYPE_CHECKING: from ..client import InfrahubClient, InfrahubClientSync @@ -40,6 +39,7 @@ def __init__(self, branch: str, schema: RelationshipSchemaAPI, data: Any | dict, self._display_label: str | None = None self._typename: str | None = None self._kind: str | None = None + self._source_typename: str | None = None if isinstance(data, (CoreNodeBase)): self._peer = data @@ -74,6 +74,8 @@ def __init__(self, branch: str, schema: RelationshipSchemaAPI, data: Any | dict, prop_data = properties_data.get(prop, properties_data.get(f"_relation__{prop}", None)) if prop_data and isinstance(prop_data, dict) and "id" in prop_data: setattr(self, prop, prop_data["id"]) + if prop == "source" and "__typename" in prop_data: + self._source_typename = prop_data["__typename"] elif prop_data and isinstance(prop_data, (str, bool)): setattr(self, prop, prop_data) else: @@ -125,6 +127,13 @@ def kind(self) -> str | None: return self._peer.get_kind() return self._kind + @property + def is_from_profile(self) -> bool: + """Return whether this relationship was set from a profile. Done by checking if the source is of a profile kind.""" + if not self._source_typename: + return False + return bool(re.match(rf"^{PROFILE_KIND_PREFIX}[A-Z]", self._source_typename)) + def _generate_input_data(self, allocate_from_pool: bool = False) -> dict[str, Any]: data: dict[str, Any] = {} diff --git a/infrahub_sdk/node/relationship.py b/infrahub_sdk/node/relationship.py index 6dcc66ce..757bb875 100644 --- a/infrahub_sdk/node/relationship.py +++ b/infrahub_sdk/node/relationship.py @@ -56,6 +56,14 @@ def peer_hfids_str(self) -> list[str]: def has_update(self) -> bool: return self._has_update + @property + def is_from_profile(self) -> bool: + """Return whether this relationship was set from a profile. All its peers must be from a profile.""" + if not self.peers: + return False + all_profiles = [p.is_from_profile for p in self.peers] + return bool(all_profiles) and all(all_profiles) + def _generate_input_data(self, allocate_from_pool: bool = False) -> list[dict]: return [peer._generate_input_data(allocate_from_pool=allocate_from_pool) for peer in self.peers] diff --git a/tests/unit/sdk/test_node.py b/tests/unit/sdk/test_node.py index c0c9e9bf..f9e6b8bc 100644 --- a/tests/unit/sdk/test_node.py +++ b/tests/unit/sdk/test_node.py @@ -11,6 +11,7 @@ InfrahubNodeBase, InfrahubNodeSync, RelatedNodeBase, + RelationshipManager, RelationshipManagerBase, parse_human_friendly_id, ) @@ -2623,3 +2624,125 @@ async def test_process_relationships_recursive_deep_nesting( assert "ip-2" in recursive_ids # From interface-1 assert "ip-3" in recursive_ids # From interface-2 assert len(related_nodes_recursive) == 5 # 2 interfaces + 3 IP addresses + + +class TestRelatedNodeIsFromProfile: + def test_is_from_profile_when_source_is_profile(self, location_schema) -> None: + data = { + "node": {"id": "test-id", "display_label": "test-tag", "__typename": "BuiltinTag"}, + "properties": { + "is_protected": False, + "owner": None, + "source": {"__typename": "ProfileInfraDevice", "display_label": "default-profile", "id": "profile-id"}, + }, + } + related_node = RelatedNodeBase(branch="main", schema=location_schema.relationships[0], data=data) + assert related_node.is_from_profile + + def test_is_from_profile_when_source_is_not_profile(self, location_schema) -> None: + data = { + "node": {"id": "test-id", "display_label": "test-tag", "__typename": "BuiltinTag"}, + "properties": { + "is_protected": False, + "owner": None, + "source": {"__typename": "CoreAccount", "display_label": "admin", "id": "account-id"}, + }, + } + related_node = RelatedNodeBase(branch="main", schema=location_schema.relationships[0], data=data) + assert not related_node.is_from_profile + + def test_is_from_profile_when_source_not_queried(self, location_schema) -> None: + data = { + "node": {"id": "test-id", "display_label": "test-tag", "__typename": "BuiltinTag"}, + "properties": {"is_protected": False, "owner": None, "source": None}, + } + related_node = RelatedNodeBase(branch="main", schema=location_schema.relationships[0], data=data) + assert not related_node.is_from_profile + + def test_is_from_profile_when_no_properties(self, location_schema) -> None: + data = {"node": {"id": "test-id", "display_label": "test-tag", "__typename": "BuiltinTag"}} + related_node = RelatedNodeBase(branch="main", schema=location_schema.relationships[0], data=data) + assert not related_node.is_from_profile + + +class TestRelationshipManagerIsFromProfile: + def test_is_from_profile_when_no_peers(self, location_schema) -> None: + manager = RelationshipManagerBase(name="tags", branch="main", schema=location_schema.relationships[0]) + assert not manager.is_from_profile + + def test_is_from_profile_when_all_peers_from_profile(self, client, location_schema) -> None: + data = { + "count": 2, + "edges": [ + { + "node": {"id": "tag-1", "display_label": "tag1", "__typename": "BuiltinTag"}, + "properties": { + "is_protected": False, + "owner": None, + "source": {"__typename": "ProfileInfraDevice", "display_label": "profile1", "id": "profile-1"}, + }, + }, + { + "node": {"id": "tag-2", "display_label": "tag2", "__typename": "BuiltinTag"}, + "properties": { + "is_protected": False, + "owner": None, + "source": {"__typename": "ProfileInfraDevice", "display_label": "profile1", "id": "profile-1"}, + }, + }, + ], + } + manager = RelationshipManager( + name="tags", client=client, node=None, branch="main", schema=location_schema.relationships[0], data=data + ) + assert manager.is_from_profile + + def test_is_from_profile_when_any_peer_not_from_profile(self, client, location_schema) -> None: + data = { + "count": 2, + "edges": [ + { + "node": {"id": "tag-1", "display_label": "tag1", "__typename": "BuiltinTag"}, + "properties": { + "is_protected": False, + "owner": None, + "source": {"__typename": "ProfileInfraDevice", "display_label": "profile1", "id": "profile-1"}, + }, + }, + { + "node": {"id": "tag-2", "display_label": "tag2", "__typename": "BuiltinTag"}, + "properties": { + "is_protected": False, + "owner": None, + "source": {"__typename": "CoreAccount", "display_label": "admin", "id": "account-1"}, + }, + }, + ], + } + manager = RelationshipManager( + name="tags", client=client, node=None, branch="main", schema=location_schema.relationships[0], data=data + ) + assert not manager.is_from_profile + + def test_is_from_profile_when_any_peer_has_unknown_source(self, client, location_schema) -> None: + data = { + "count": 2, + "edges": [ + { + "node": {"id": "tag-1", "display_label": "tag1", "__typename": "BuiltinTag"}, + "properties": { + "is_protected": False, + "owner": None, + "source": {"__typename": "ProfileInfraDevice", "display_label": "profile1", "id": "profile-1"}, + }, + }, + { + "node": {"id": "tag-2", "display_label": "tag2", "__typename": "BuiltinTag"}, + "properties": {"is_protected": False, "owner": None, "source": None}, + }, + ], + } + manager = RelationshipManager( + name="tags", client=client, node=None, branch="main", schema=location_schema.relationships[0], data=data + ) + assert not manager.is_from_profile