Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions infrahub_sdk/node/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
HIERARCHY_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE = "Hierarchical fields are not supported for this node."

HFID_STR_SEPARATOR = "__"
PROFILE_KIND_PREFIX = "Profile"
17 changes: 13 additions & 4 deletions infrahub_sdk/node/related_node.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from __future__ import annotations

import re
from typing import TYPE_CHECKING, Any

from ..exceptions import (
Error,
)
from ..exceptions import Error
from ..protocols_base import CoreNodeBase
from .constants import PROPERTIES_FLAG, PROPERTIES_OBJECT
from .constants import PROFILE_KIND_PREFIX, PROPERTIES_FLAG, PROPERTIES_OBJECT

if TYPE_CHECKING:
from ..client import InfrahubClient, InfrahubClientSync
Expand Down Expand Up @@ -40,6 +39,7 @@ def __init__(self, branch: str, schema: RelationshipSchemaAPI, data: Any | dict,
self._display_label: str | None = None
self._typename: str | None = None
self._kind: str | None = None
self._source_typename: str | None = None

if isinstance(data, (CoreNodeBase)):
self._peer = data
Expand Down Expand Up @@ -74,6 +74,8 @@ def __init__(self, branch: str, schema: RelationshipSchemaAPI, data: Any | dict,
prop_data = properties_data.get(prop, properties_data.get(f"_relation__{prop}", None))
if prop_data and isinstance(prop_data, dict) and "id" in prop_data:
setattr(self, prop, prop_data["id"])
if prop == "source" and "__typename" in prop_data:
self._source_typename = prop_data["__typename"]
elif prop_data and isinstance(prop_data, (str, bool)):
setattr(self, prop, prop_data)
else:
Expand Down Expand Up @@ -125,6 +127,13 @@ def kind(self) -> str | None:
return self._peer.get_kind()
return self._kind

@property
def is_from_profile(self) -> bool:
"""Return whether this relationship was set from a profile. Done by checking if the source is of a profile kind."""
if not self._source_typename:
return False
return bool(re.match(rf"^{PROFILE_KIND_PREFIX}[A-Z]", self._source_typename))

def _generate_input_data(self, allocate_from_pool: bool = False) -> dict[str, Any]:
data: dict[str, Any] = {}

Expand Down
8 changes: 8 additions & 0 deletions infrahub_sdk/node/relationship.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ def peer_hfids_str(self) -> list[str]:
def has_update(self) -> bool:
return self._has_update

@property
def is_from_profile(self) -> bool:
"""Return whether this relationship was set from a profile. All its peers must be from a profile."""
if not self.peers:
return False
all_profiles = [p.is_from_profile for p in self.peers]
return bool(all_profiles) and all(all_profiles)

def _generate_input_data(self, allocate_from_pool: bool = False) -> list[dict]:
return [peer._generate_input_data(allocate_from_pool=allocate_from_pool) for peer in self.peers]

Expand Down
123 changes: 123 additions & 0 deletions tests/unit/sdk/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
InfrahubNodeBase,
InfrahubNodeSync,
RelatedNodeBase,
RelationshipManager,
RelationshipManagerBase,
parse_human_friendly_id,
)
Expand Down Expand Up @@ -2623,3 +2624,125 @@ async def test_process_relationships_recursive_deep_nesting(
assert "ip-2" in recursive_ids # From interface-1
assert "ip-3" in recursive_ids # From interface-2
assert len(related_nodes_recursive) == 5 # 2 interfaces + 3 IP addresses


class TestRelatedNodeIsFromProfile:
def test_is_from_profile_when_source_is_profile(self, location_schema) -> None:
data = {
"node": {"id": "test-id", "display_label": "test-tag", "__typename": "BuiltinTag"},
"properties": {
"is_protected": False,
"owner": None,
"source": {"__typename": "ProfileInfraDevice", "display_label": "default-profile", "id": "profile-id"},
},
}
related_node = RelatedNodeBase(branch="main", schema=location_schema.relationships[0], data=data)
assert related_node.is_from_profile

def test_is_from_profile_when_source_is_not_profile(self, location_schema) -> None:
data = {
"node": {"id": "test-id", "display_label": "test-tag", "__typename": "BuiltinTag"},
"properties": {
"is_protected": False,
"owner": None,
"source": {"__typename": "CoreAccount", "display_label": "admin", "id": "account-id"},
},
}
related_node = RelatedNodeBase(branch="main", schema=location_schema.relationships[0], data=data)
assert not related_node.is_from_profile

def test_is_from_profile_when_source_not_queried(self, location_schema) -> None:
data = {
"node": {"id": "test-id", "display_label": "test-tag", "__typename": "BuiltinTag"},
"properties": {"is_protected": False, "owner": None, "source": None},
}
related_node = RelatedNodeBase(branch="main", schema=location_schema.relationships[0], data=data)
assert not related_node.is_from_profile

def test_is_from_profile_when_no_properties(self, location_schema) -> None:
data = {"node": {"id": "test-id", "display_label": "test-tag", "__typename": "BuiltinTag"}}
related_node = RelatedNodeBase(branch="main", schema=location_schema.relationships[0], data=data)
assert not related_node.is_from_profile


class TestRelationshipManagerIsFromProfile:
def test_is_from_profile_when_no_peers(self, location_schema) -> None:
manager = RelationshipManagerBase(name="tags", branch="main", schema=location_schema.relationships[0])
assert not manager.is_from_profile

def test_is_from_profile_when_all_peers_from_profile(self, client, location_schema) -> None:
data = {
"count": 2,
"edges": [
{
"node": {"id": "tag-1", "display_label": "tag1", "__typename": "BuiltinTag"},
"properties": {
"is_protected": False,
"owner": None,
"source": {"__typename": "ProfileInfraDevice", "display_label": "profile1", "id": "profile-1"},
},
},
{
"node": {"id": "tag-2", "display_label": "tag2", "__typename": "BuiltinTag"},
"properties": {
"is_protected": False,
"owner": None,
"source": {"__typename": "ProfileInfraDevice", "display_label": "profile1", "id": "profile-1"},
},
},
],
}
manager = RelationshipManager(
name="tags", client=client, node=None, branch="main", schema=location_schema.relationships[0], data=data
)
assert manager.is_from_profile

def test_is_from_profile_when_any_peer_not_from_profile(self, client, location_schema) -> None:
data = {
"count": 2,
"edges": [
{
"node": {"id": "tag-1", "display_label": "tag1", "__typename": "BuiltinTag"},
"properties": {
"is_protected": False,
"owner": None,
"source": {"__typename": "ProfileInfraDevice", "display_label": "profile1", "id": "profile-1"},
},
},
{
"node": {"id": "tag-2", "display_label": "tag2", "__typename": "BuiltinTag"},
"properties": {
"is_protected": False,
"owner": None,
"source": {"__typename": "CoreAccount", "display_label": "admin", "id": "account-1"},
},
},
],
}
manager = RelationshipManager(
name="tags", client=client, node=None, branch="main", schema=location_schema.relationships[0], data=data
)
assert not manager.is_from_profile

def test_is_from_profile_when_any_peer_has_unknown_source(self, client, location_schema) -> None:
data = {
"count": 2,
"edges": [
{
"node": {"id": "tag-1", "display_label": "tag1", "__typename": "BuiltinTag"},
"properties": {
"is_protected": False,
"owner": None,
"source": {"__typename": "ProfileInfraDevice", "display_label": "profile1", "id": "profile-1"},
},
},
{
"node": {"id": "tag-2", "display_label": "tag2", "__typename": "BuiltinTag"},
"properties": {"is_protected": False, "owner": None, "source": None},
},
],
}
manager = RelationshipManager(
name="tags", client=client, node=None, branch="main", schema=location_schema.relationships[0], data=data
)
assert not manager.is_from_profile