|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from dataclasses import dataclass |
| 4 | +from typing import TYPE_CHECKING, Sequence |
| 5 | + |
| 6 | +from infrahub_sdk.utils import str_to_bool |
| 7 | + |
| 8 | +from infrahub.core.constants import DiffAction, RelationshipCardinality |
| 9 | +from infrahub.core.constants.database import DatabaseEdgeType |
| 10 | + |
| 11 | +from .models import ( |
| 12 | + AttributeChangelog, |
| 13 | + NodeChangelog, |
| 14 | + RelationshipCardinalityManyChangelog, |
| 15 | + RelationshipCardinalityOneChangelog, |
| 16 | + RelationshipPeerChangelog, |
| 17 | +) |
| 18 | + |
| 19 | +if TYPE_CHECKING: |
| 20 | + from infrahub.core.branch import Branch |
| 21 | + from infrahub.core.diff.model.path import ( |
| 22 | + EnrichedDiffAttribute, |
| 23 | + EnrichedDiffNode, |
| 24 | + EnrichedDiffRelationship, |
| 25 | + EnrichedDiffRoot, |
| 26 | + ) |
| 27 | + from infrahub.core.schema import MainSchemaTypes |
| 28 | + from infrahub.database import InfrahubDatabase |
| 29 | + |
| 30 | + |
| 31 | +@dataclass |
| 32 | +class NodeInDiff: |
| 33 | + node_id: str |
| 34 | + kind: str |
| 35 | + label: str |
| 36 | + |
| 37 | + |
| 38 | +class DiffChangelogCollector: |
| 39 | + def __init__(self, diff: EnrichedDiffRoot, branch: Branch, db: InfrahubDatabase) -> None: |
| 40 | + self._diff = diff |
| 41 | + self._branch = branch |
| 42 | + self._db = db |
| 43 | + self._diff_nodes: dict[str, NodeInDiff] |
| 44 | + |
| 45 | + def _populate_diff_nodes(self) -> None: |
| 46 | + self._diff_nodes = { |
| 47 | + node.uuid: NodeInDiff(node_id=node.uuid, kind=node.kind, label=node.label) for node in self._diff.nodes |
| 48 | + } |
| 49 | + |
| 50 | + def get_node(self, node_id: str) -> NodeInDiff: |
| 51 | + return self._diff_nodes[node_id] |
| 52 | + |
| 53 | + def _process_node(self, node: EnrichedDiffNode) -> NodeChangelog: |
| 54 | + node_changelog = NodeChangelog(node_id=node.uuid, node_kind=node.kind, display_label=node.label) |
| 55 | + schema = self._db.schema.get(node_changelog.node_kind, branch=self._branch, duplicate=False) |
| 56 | + for attribute in node.attributes: |
| 57 | + self._process_node_attribute(node=node_changelog, attribute=attribute, schema=schema) |
| 58 | + |
| 59 | + for relationship in node.relationships: |
| 60 | + self._process_node_relationship(node=node_changelog, relationship=relationship) |
| 61 | + |
| 62 | + return node_changelog |
| 63 | + |
| 64 | + def _process_node_attribute( |
| 65 | + self, node: NodeChangelog, attribute: EnrichedDiffAttribute, schema: MainSchemaTypes |
| 66 | + ) -> None: |
| 67 | + schema_attribute = schema.get_attribute(name=attribute.name) |
| 68 | + changelog_attribute = AttributeChangelog(name=attribute.name, kind=schema_attribute.kind) |
| 69 | + for attr_property in attribute.properties: |
| 70 | + match attr_property.property_type: |
| 71 | + case DatabaseEdgeType.HAS_VALUE: |
| 72 | + # TODO deserialize correct value type from string |
| 73 | + changelog_attribute.value = attr_property.new_value |
| 74 | + changelog_attribute.value_previous = attr_property.previous_value |
| 75 | + case DatabaseEdgeType.IS_PROTECTED: |
| 76 | + changelog_attribute.add_property( |
| 77 | + name="is_protected", |
| 78 | + value_current=self._convert_string_boolean_value(value=attr_property.new_value), |
| 79 | + value_previous=self._convert_string_boolean_value(value=attr_property.previous_value), |
| 80 | + ) |
| 81 | + case DatabaseEdgeType.IS_VISIBLE: |
| 82 | + changelog_attribute.add_property( |
| 83 | + name="is_visible", |
| 84 | + value_current=self._convert_string_boolean_value(value=attr_property.new_value), |
| 85 | + value_previous=self._convert_string_boolean_value(value=attr_property.previous_value), |
| 86 | + ) |
| 87 | + case DatabaseEdgeType.HAS_SOURCE: |
| 88 | + changelog_attribute.add_property( |
| 89 | + name="source", |
| 90 | + value_current=attr_property.new_value, |
| 91 | + value_previous=attr_property.previous_value, |
| 92 | + ) |
| 93 | + case DatabaseEdgeType.HAS_OWNER: |
| 94 | + changelog_attribute.add_property( |
| 95 | + name="owner", |
| 96 | + value_current=attr_property.new_value, |
| 97 | + value_previous=attr_property.previous_value, |
| 98 | + ) |
| 99 | + |
| 100 | + node.attributes[attribute.name] = changelog_attribute |
| 101 | + |
| 102 | + def _process_node_relationship(self, node: NodeChangelog, relationship: EnrichedDiffRelationship) -> None: |
| 103 | + match relationship.cardinality: |
| 104 | + case RelationshipCardinality.ONE: |
| 105 | + self._process_node_cardinality_one_relationship(node=node, relationship=relationship) |
| 106 | + |
| 107 | + case RelationshipCardinality.MANY: |
| 108 | + self._process_node_cardinality_many_relationship(node=node, relationship=relationship) |
| 109 | + |
| 110 | + def _process_node_cardinality_one_relationship( |
| 111 | + self, node: NodeChangelog, relationship: EnrichedDiffRelationship |
| 112 | + ) -> None: |
| 113 | + changelog_rel = RelationshipCardinalityOneChangelog(name=relationship.name) |
| 114 | + for entry in relationship.relationships: |
| 115 | + for rel_prop in entry.properties: |
| 116 | + match rel_prop.property_type: |
| 117 | + case DatabaseEdgeType.IS_RELATED: |
| 118 | + if rel_prop.new_value: |
| 119 | + changelog_rel.peer_id = rel_prop.new_value |
| 120 | + changelog_rel.peer_kind = self.get_node(node_id=rel_prop.new_value).kind |
| 121 | + if rel_prop.previous_value: |
| 122 | + changelog_rel.peer_id_previous = rel_prop.previous_value |
| 123 | + changelog_rel.peer_kind_previous = self.get_node(node_id=rel_prop.previous_value).kind |
| 124 | + case DatabaseEdgeType.IS_PROTECTED: |
| 125 | + changelog_rel.add_property( |
| 126 | + name="is_protected", |
| 127 | + value_current=self._convert_string_boolean_value(value=rel_prop.new_value), |
| 128 | + value_previous=self._convert_string_boolean_value(value=rel_prop.previous_value), |
| 129 | + ) |
| 130 | + case DatabaseEdgeType.IS_VISIBLE: |
| 131 | + changelog_rel.add_property( |
| 132 | + name="is_visible", |
| 133 | + value_current=self._convert_string_boolean_value(value=rel_prop.new_value), |
| 134 | + value_previous=self._convert_string_boolean_value(value=rel_prop.previous_value), |
| 135 | + ) |
| 136 | + case DatabaseEdgeType.HAS_OWNER: |
| 137 | + changelog_rel.add_property( |
| 138 | + name="owner", |
| 139 | + value_current=rel_prop.new_value, |
| 140 | + value_previous=rel_prop.previous_value, |
| 141 | + ) |
| 142 | + case DatabaseEdgeType.HAS_SOURCE: |
| 143 | + changelog_rel.add_property( |
| 144 | + name="source", |
| 145 | + value_current=rel_prop.new_value, |
| 146 | + value_previous=rel_prop.previous_value, |
| 147 | + ) |
| 148 | + |
| 149 | + node.add_relationship(relationship=changelog_rel) |
| 150 | + |
| 151 | + def _convert_string_boolean_value(self, value: str | None) -> bool | None: |
| 152 | + """Convert string based boolean for is_protected and is_visible.""" |
| 153 | + if value is not None: |
| 154 | + return str_to_bool(value) |
| 155 | + |
| 156 | + return None |
| 157 | + |
| 158 | + def _process_node_cardinality_many_relationship( |
| 159 | + self, node: NodeChangelog, relationship: EnrichedDiffRelationship |
| 160 | + ) -> None: |
| 161 | + changelog_rel = RelationshipCardinalityManyChangelog(name=relationship.name) |
| 162 | + for peer in relationship.relationships: |
| 163 | + peer_log = RelationshipPeerChangelog( |
| 164 | + peer_id=peer.peer_id, peer_kind=self.get_node(node_id=peer.peer_id).kind, peer_status=peer.action |
| 165 | + ) |
| 166 | + for peer_prop in peer.properties: |
| 167 | + match peer_prop.property_type: |
| 168 | + case DatabaseEdgeType.IS_VISIBLE: |
| 169 | + peer_log.add_property( |
| 170 | + name="is_visible", |
| 171 | + value_current=self._convert_string_boolean_value(value=peer_prop.new_value), |
| 172 | + value_previous=self._convert_string_boolean_value(value=peer_prop.previous_value), |
| 173 | + ) |
| 174 | + case DatabaseEdgeType.IS_PROTECTED: |
| 175 | + peer_log.add_property( |
| 176 | + name="is_protected", |
| 177 | + value_current=self._convert_string_boolean_value(value=peer_prop.new_value), |
| 178 | + value_previous=self._convert_string_boolean_value(value=peer_prop.previous_value), |
| 179 | + ) |
| 180 | + case DatabaseEdgeType.HAS_OWNER: |
| 181 | + peer_log.add_property( |
| 182 | + name="owner", |
| 183 | + value_current=peer_prop.new_value, |
| 184 | + value_previous=peer_prop.previous_value, |
| 185 | + ) |
| 186 | + case DatabaseEdgeType.HAS_SOURCE: |
| 187 | + peer_log.add_property( |
| 188 | + name="source", |
| 189 | + value_current=peer_prop.new_value, |
| 190 | + value_previous=peer_prop.previous_value, |
| 191 | + ) |
| 192 | + |
| 193 | + changelog_rel.peers.append(peer_log) |
| 194 | + |
| 195 | + node.add_relationship(relationship=changelog_rel) |
| 196 | + |
| 197 | + def collect_changelogs(self) -> Sequence[tuple[DiffAction, NodeChangelog]]: |
| 198 | + self._populate_diff_nodes() |
| 199 | + changelogs = [ |
| 200 | + (node.action, self._process_node(node=node)) |
| 201 | + for node in self._diff.nodes |
| 202 | + if node.action != DiffAction.UNCHANGED |
| 203 | + ] |
| 204 | + return changelogs |
0 commit comments