|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from typing import TYPE_CHECKING, Any, Sequence |
| 4 | + |
| 5 | +from infrahub.core.constants.database import DatabaseEdgeType |
| 6 | +from infrahub.core.migrations.shared import GraphMigration, MigrationResult |
| 7 | +from infrahub.log import get_logger |
| 8 | + |
| 9 | +from ...query import Query, QueryType |
| 10 | + |
| 11 | +if TYPE_CHECKING: |
| 12 | + from infrahub.database import InfrahubDatabase |
| 13 | + |
| 14 | +log = get_logger() |
| 15 | + |
| 16 | + |
class DeleteDuplicateHasValueEdgesQuery(Query):
    """Write query that collapses duplicated ``HAS_VALUE`` edges.

    Edges from one Attribute are treated as duplicates when they share
    branch, branch_level, status, from and to, and point at AttributeValue
    nodes with the same ``value`` and ``is_default``. For each such group the
    query creates one fresh edge to the matching AttributeValue with the
    lowest database id, deletes the other matching edges, and finally deletes
    AttributeValue nodes that have no relationships left.
    """

    name = "delete_duplicate_has_value_edges"
    type = QueryType.WRITE
    insert_return = False

    async def query_init(self, db: InfrahubDatabase, **kwargs: dict[str, Any]) -> None:  # noqa: ARG002
        """Render the Cypher template and register it on this query.

        Args:
            db: Database handle; only used to obtain the name of the id
                function that is interpolated into the Cypher text.
            **kwargs: Accepted for signature compatibility and ignored.
        """
        # The template contains literal Cypher braces, so printf-style
        # named substitution is used rather than str.format / f-strings.
        substitutions = {"id_func": db.get_id_function_name()}
        # NOTE(review): the trailing orphan cleanup is chained to the rows
        # produced above it, so it appears to run only when at least one
        # duplicate group was processed - confirm this is intended.
        cypher_template = """
// -------------------
// find Attribute nodes with multiple identical edges to AttributeValue nodes with the same value
// -------------------
MATCH (a:Attribute)-[e:HAS_VALUE]->(av:AttributeValue)
WITH a, e.branch AS branch, e.branch_level AS branch_level, e.status AS status, e.from AS from, e.to AS to,
    av.value AS attr_val, av.is_default AS attr_default, COUNT(*) AS num_duplicate_edges
WHERE num_duplicate_edges > 1
// -------------------
// get the the one AttributeValue we want to use
// -------------------
WITH DISTINCT a, branch, branch_level, status, from, to, attr_val, attr_default
WITH attr_val, attr_default, collect([a, branch, branch_level, status, from, to]) AS details_list
CALL {
    WITH attr_val, attr_default
    MATCH (av:AttributeValue {value: attr_val, is_default: attr_default})
    RETURN av AS the_one_av
    ORDER by %(id_func)s(av) ASC
    LIMIT 1
}
UNWIND details_list AS details_item
WITH attr_val, attr_default, the_one_av,
    details_item[0] AS a, details_item[1] AS branch, details_item[2] AS branch_level,
    details_item[3] AS status, details_item[4] AS from, details_item[5] AS to
// -------------------
// get/create the one edge to keep
// -------------------
CREATE (a)-[fresh_e:HAS_VALUE {branch: branch, branch_level: branch_level, status: status, from: from}]->(the_one_av)
SET fresh_e.to = to
WITH a, branch, status, from, to, attr_val, attr_default, %(id_func)s(fresh_e) AS e_id_to_keep
// -------------------
// get the identical edges for a given set of Attribute node, edge properties, AttributeValue.value
// -------------------
CALL {
    // -------------------
    // delete the duplicate edges a given set of Attribute node, edge properties, AttributeValue.value
    // -------------------
    WITH a, branch, status, from, to, attr_val, attr_default, e_id_to_keep
    MATCH (a)-[e:HAS_VALUE]->(av:AttributeValue {value: attr_val, is_default: attr_default})
    WHERE %(id_func)s(e) <> e_id_to_keep
    AND e.branch = branch AND e.status = status AND e.from = from
    AND (e.to = to OR (e.to IS NULL AND to IS NULL))
    DELETE e
}
// -------------------
// delete any orphaned AttributeValue nodes
// -------------------
WITH NULL AS nothing
LIMIT 1
MATCH (orphaned_av:AttributeValue)
WHERE NOT exists((orphaned_av)-[]-())
DELETE orphaned_av
        """
        self.add_to_query(cypher_template % substitutions)
| 77 | + |
| 78 | + |
class DeleteDuplicateBooleanEdgesQuery(Query):
    """Base write query that collapses duplicated edges of one edge type.

    Subclasses set ``edge_type``. Edges from one Attribute to the same target
    node are treated as duplicates when they share branch, branch_level,
    status, from and to. For each such group the query creates one fresh edge
    with those properties and deletes the other matching edges.
    """

    name = "delete_duplicate_booleans_edges"
    type = QueryType.WRITE
    insert_return = False
    # Relationship type interpolated into the Cypher text; must be set by a subclass.
    edge_type: DatabaseEdgeType | None = None

    async def query_init(self, db: InfrahubDatabase, **kwargs: dict[str, Any]) -> None:  # noqa: ARG002
        """Render the Cypher template for ``edge_type`` and register it.

        Args:
            db: Database handle; only used to obtain the name of the id
                function that is interpolated into the Cypher text.
            **kwargs: Accepted for signature compatibility and ignored.

        Raises:
            RuntimeError: If ``edge_type`` has not been set.
        """
        edge_type = self.edge_type
        if not edge_type:
            raise RuntimeError("edge_type is required for this query")
        substitutions = {"edge_type": edge_type.value, "id_func": db.get_id_function_name()}
        cypher_template = """
// -------------------
// find Attribute nodes with multiple identical edges to Boolean nodes
// -------------------
MATCH (a:Attribute)-[e:%(edge_type)s]->(b)
WITH a, e.branch AS branch, e.branch_level AS branch_level, e.status AS status, e.from AS from, e.to AS to, b, COUNT(*) AS num_duplicate_edges
WHERE num_duplicate_edges > 1
// -------------------
// get the identical edges for a given set of Attribute node, edge properties, Boolean
// -------------------
WITH DISTINCT a, branch, branch_level, status, from, to, b
CREATE (a)-[fresh_e:%(edge_type)s {branch: branch, branch_level: branch_level, status: status, from: from}]->(b)
SET fresh_e.to = to
WITH a, branch, status, from, to, b, %(id_func)s(fresh_e) AS e_id_to_keep
CALL {
    WITH a, branch, status, from, to, b, e_id_to_keep
    MATCH (a)-[e:%(edge_type)s]->(b)
    WHERE %(id_func)s(e) <> e_id_to_keep
    AND e.branch = branch AND e.status = status AND e.from = from
    AND (e.to = to OR (e.to IS NULL AND to IS NULL))
    DELETE e
}
        """
        self.add_to_query(cypher_template % substitutions)
| 112 | + |
| 113 | + |
class DeleteDuplicateIsVisibleEdgesQuery(DeleteDuplicateBooleanEdgesQuery):
    """Duplicate-edge cleanup query bound to the ``IS_VISIBLE`` edge type."""

    edge_type = DatabaseEdgeType.IS_VISIBLE
    name = "delete_duplicate_is_visible_edges"
    type = QueryType.WRITE
    insert_return = False
| 119 | + |
| 120 | + |
class DeleteDuplicateIsProtectedEdgesQuery(DeleteDuplicateBooleanEdgesQuery):
    """Duplicate-edge cleanup query bound to the ``IS_PROTECTED`` edge type."""

    edge_type = DatabaseEdgeType.IS_PROTECTED
    name = "delete_duplicate_is_protected_edges"
    type = QueryType.WRITE
    insert_return = False
| 126 | + |
| 127 | + |
class Migration020(GraphMigration):
    """Delete duplicated property edges attached to Attribute nodes.

    Per the original author's note, these edges can be duplicated when
    multiple AttributeValue nodes with the same value exist because of
    concurrent database updates.

    1. Find the duplicate edges.
        a. (a:Attribute)-[e:HAS_VALUE]->(av:AttributeValue)
           grouped by (a, e.branch, e.branch_level, e.status, e.from, e.to,
           av.value, av.is_default).
        b. (a:Attribute)-[e:IS_VISIBLE]->(b) and (a:Attribute)-[e:IS_PROTECTED]->(b)
           grouped by (a, e.branch, e.branch_level, e.status, e.from, e.to, b).
       A group with more than one edge is a set of duplicates.
    2. For each set of duplicate edges
        a. create one fresh edge carrying the shared properties
        b. delete every other edge matching those properties
    3. As the last step of the HAS_VALUE query, delete AttributeValue nodes
       that have no relationships left.

    This migration does not account for consolidating duplicated AttributeValue nodes because more might be created
    in the future due to concurrent database updates. A migration to consolidate duplicated AttributeValue nodes
    should be run when we find a way to stop duplicate AttributeValue nodes from being created
    """

    name: str = "020_delete_duplicate_edges"
    minimum_version: int = 19
    # Listed in this order: HAS_VALUE first, then the two boolean edge types.
    queries: Sequence[type[Query]] = [
        DeleteDuplicateHasValueEdgesQuery,
        DeleteDuplicateIsVisibleEdgesQuery,
        DeleteDuplicateIsProtectedEdgesQuery,
    ]

    async def execute(self, db: InfrahubDatabase) -> MigrationResult:
        """Run the migration by delegating straight to ``do_execute``.

        Args:
            db: Database handle forwarded to ``do_execute``.

        Returns:
            The ``MigrationResult`` produced by ``do_execute``.
        """
        # skip the transaction b/c it will run out of memory on a large database
        return await self.do_execute(db=db)

    async def validate_migration(self, db: InfrahubDatabase) -> MigrationResult:  # noqa: ARG002
        """Return a fresh ``MigrationResult``; no checks are performed here."""
        return MigrationResult()
0 commit comments