Skip to content

Commit 4ce87be

Browse files
authored
migration to clean up duplicated edges (#5955)
* migration to clean up duplicated edges * make migrations idempotent * consolidate migration code
1 parent eda425b commit 4ce87be

File tree

6 files changed

+285
-9
lines changed

6 files changed

+285
-9
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
GRAPH_VERSION = 19
1+
GRAPH_VERSION = 20

backend/infrahub/core/migrations/graph/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from .m017_add_core_profile import Migration017
2222
from .m018_uniqueness_nulls import Migration018
2323
from .m019_restore_rels_to_time import Migration019
24+
from .m020_duplicate_edges import Migration020
2425

2526
if TYPE_CHECKING:
2627
from infrahub.core.root import Root
@@ -47,6 +48,7 @@
4748
Migration017,
4849
Migration018,
4950
Migration019,
51+
Migration020,
5052
]
5153

5254

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, Any, Sequence
4+
5+
from infrahub.core.constants.database import DatabaseEdgeType
6+
from infrahub.core.migrations.shared import GraphMigration, MigrationResult
7+
from infrahub.log import get_logger
8+
9+
from ...query import Query, QueryType
10+
11+
if TYPE_CHECKING:
12+
from infrahub.database import InfrahubDatabase
13+
14+
log = get_logger()
15+
16+
17+
class DeleteDuplicateHasValueEdgesQuery(Query):
    """Write query that collapses duplicated HAS_VALUE edges of Attribute nodes.

    The Cypher below works in four stages:

    1. Group ``(Attribute)-[HAS_VALUE]->(AttributeValue)`` edges by the Attribute,
       the edge properties (branch, branch_level, status, from, to) and the
       target's ``value`` / ``is_default``; keep the groups holding more than one edge.
    2. For each ``value`` / ``is_default`` pair, pick the AttributeValue node that
       sorts first by the database id function.
    3. Create one fresh edge from the Attribute to that node carrying the group's
       properties, then delete every other edge of the Attribute that points at an
       AttributeValue with the same ``value`` / ``is_default`` and has the same
       branch, status, from and to.
    4. Delete AttributeValue nodes that are left without any relationship.
    """

    name = "delete_duplicate_has_value_edges"
    type = QueryType.WRITE
    insert_return = False

    async def query_init(self, db: InfrahubDatabase, **kwargs: dict[str, Any]) -> None:
        """Render the Cypher template for ``db`` and register it on this query.

        Args:
            db: Database handle; only ``get_id_function_name()`` is used, to obtain
                the name of the id function spliced into the template.
            **kwargs: Accepted for signature compatibility; not read here.
        """
        # The id function name is the only placeholder in the template.
        substitutions = {"id_func": db.get_id_function_name()}
        cypher_template = """
        // -------------------
        // find Attribute nodes with multiple identical edges to AttributeValue nodes with the same value
        // -------------------
        MATCH (a:Attribute)-[e:HAS_VALUE]->(av:AttributeValue)
        WITH a, e.branch AS branch, e.branch_level AS branch_level, e.status AS status, e.from AS from, e.to AS to,
            av.value AS attr_val, av.is_default AS attr_default, COUNT(*) AS num_duplicate_edges
        WHERE num_duplicate_edges > 1
        // -------------------
        // get the the one AttributeValue we want to use
        // -------------------
        WITH DISTINCT a, branch, branch_level, status, from, to, attr_val, attr_default
        WITH attr_val, attr_default, collect([a, branch, branch_level, status, from, to]) AS details_list
        CALL {
            WITH attr_val, attr_default
            MATCH (av:AttributeValue {value: attr_val, is_default: attr_default})
            RETURN av AS the_one_av
            ORDER by %(id_func)s(av) ASC
            LIMIT 1
        }
        UNWIND details_list AS details_item
        WITH attr_val, attr_default, the_one_av,
            details_item[0] AS a, details_item[1] AS branch, details_item[2] AS branch_level,
            details_item[3] AS status, details_item[4] AS from, details_item[5] AS to
        // -------------------
        // get/create the one edge to keep
        // -------------------
        CREATE (a)-[fresh_e:HAS_VALUE {branch: branch, branch_level: branch_level, status: status, from: from}]->(the_one_av)
        SET fresh_e.to = to
        WITH a, branch, status, from, to, attr_val, attr_default, %(id_func)s(fresh_e) AS e_id_to_keep
        // -------------------
        // get the identical edges for a given set of Attribute node, edge properties, AttributeValue.value
        // -------------------
        CALL {
            // -------------------
            // delete the duplicate edges a given set of Attribute node, edge properties, AttributeValue.value
            // -------------------
            WITH a, branch, status, from, to, attr_val, attr_default, e_id_to_keep
            MATCH (a)-[e:HAS_VALUE]->(av:AttributeValue {value: attr_val, is_default: attr_default})
            WHERE %(id_func)s(e) <> e_id_to_keep
            AND e.branch = branch AND e.status = status AND e.from = from
            AND (e.to = to OR (e.to IS NULL AND to IS NULL))
            DELETE e
        }
        // -------------------
        // delete any orphaned AttributeValue nodes
        // -------------------
        WITH NULL AS nothing
        LIMIT 1
        MATCH (orphaned_av:AttributeValue)
        WHERE NOT exists((orphaned_av)-[]-())
        DELETE orphaned_av
        """
        self.add_to_query(cypher_template % substitutions)
77+
78+
79+
class DeleteDuplicateBooleanEdgesQuery(Query):
    """Write query that collapses duplicated edges of one relationship type.

    The relationship type comes from ``edge_type``, which subclasses are expected
    to set. Edges leaving an Attribute are grouped by the Attribute, the edge
    properties (branch, branch_level, status, from, to) and the target node. For
    every group holding more than one edge, one fresh edge is created and every
    other edge between the same two nodes with the same branch, status, from and
    to is deleted.
    """

    name = "delete_duplicate_booleans_edges"
    type = QueryType.WRITE
    insert_return = False
    # Relationship type handled by the query; left unset on this base class.
    edge_type: DatabaseEdgeType | None = None

    async def query_init(self, db: InfrahubDatabase, **kwargs: dict[str, Any]) -> None:
        """Render the Cypher template for ``edge_type`` and register it on this query.

        Args:
            db: Database handle; only ``get_id_function_name()`` is used, to obtain
                the name of the id function spliced into the template.
            **kwargs: Accepted for signature compatibility; not read here.

        Raises:
            RuntimeError: If ``edge_type`` has not been set by a subclass.
        """
        selected_edge_type = self.edge_type
        if not selected_edge_type:
            raise RuntimeError("edge_type is required for this query")

        substitutions = {
            "edge_type": selected_edge_type.value,
            "id_func": db.get_id_function_name(),
        }
        cypher_template = """
        // -------------------
        // find Attribute nodes with multiple identical edges to Boolean nodes
        // -------------------
        MATCH (a:Attribute)-[e:%(edge_type)s]->(b)
        WITH a, e.branch AS branch, e.branch_level AS branch_level, e.status AS status, e.from AS from, e.to AS to, b, COUNT(*) AS num_duplicate_edges
        WHERE num_duplicate_edges > 1
        // -------------------
        // get the identical edges for a given set of Attribute node, edge properties, Boolean
        // -------------------
        WITH DISTINCT a, branch, branch_level, status, from, to, b
        CREATE (a)-[fresh_e:%(edge_type)s {branch: branch, branch_level: branch_level, status: status, from: from}]->(b)
        SET fresh_e.to = to
        WITH a, branch, status, from, to, b, %(id_func)s(fresh_e) AS e_id_to_keep
        CALL {
            WITH a, branch, status, from, to, b, e_id_to_keep
            MATCH (a)-[e:%(edge_type)s]->(b)
            WHERE %(id_func)s(e) <> e_id_to_keep
            AND e.branch = branch AND e.status = status AND e.from = from
            AND (e.to = to OR (e.to IS NULL AND to IS NULL))
            DELETE e
        }
        """
        self.add_to_query(cypher_template % substitutions)
112+
113+
114+
class DeleteDuplicateIsVisibleEdgesQuery(DeleteDuplicateBooleanEdgesQuery):
    """Duplicate-edge cleanup query bound to the IS_VISIBLE relationship type."""

    # Relationship type substituted into the parent's Cypher template.
    edge_type = DatabaseEdgeType.IS_VISIBLE
    insert_return = False
    type = QueryType.WRITE
    name = "delete_duplicate_is_visible_edges"
119+
120+
121+
class DeleteDuplicateIsProtectedEdgesQuery(DeleteDuplicateBooleanEdgesQuery):
    """Duplicate-edge cleanup query bound to the IS_PROTECTED relationship type."""

    # Relationship type substituted into the parent's Cypher template.
    edge_type = DatabaseEdgeType.IS_PROTECTED
    insert_return = False
    type = QueryType.WRITE
    name = "delete_duplicate_is_protected_edges"
126+
127+
128+
class Migration020(GraphMigration):
    """Remove duplicated attribute edges from the graph.

    1. Find duplicate edges. These can be duplicated if multiple AttributeValue nodes with the same value exist b/c of
        concurrent database updates.
        a. (a:Attribute)-[e:HAS_VALUE]->(av:AttributeValue)
            grouped by (a, e.branch, e.branch_level, e.status, e.from, e.to, av.value, av.is_default)
            to determine the number of duplicates.
        b. (a:Attribute)-[e:IS_VISIBLE]->(b) and (a:Attribute)-[e:IS_PROTECTED]->(b)
            grouped by (a, e.branch, e.branch_level, e.status, e.from, e.to, b)
            to determine the number of duplicates.
    2. For a given set of duplicate edges
        a. create one fresh edge with the properties shared by the set
        b. delete every other edge of the set
    3. If there are any orphaned AttributeValue nodes after these changes, then delete them

    This migration does not account for consolidating duplicated AttributeValue nodes because more might be created
    in the future due to concurrent database updates. A migration to consolidate duplicated AttributeValue nodes
    should be run when we find a way to stop duplicate AttributeValue nodes from being created
    """

    name: str = "020_delete_duplicate_edges"
    minimum_version: int = 19
    # Executed in this order by do_execute().
    queries: Sequence[type[Query]] = [
        DeleteDuplicateHasValueEdgesQuery,
        DeleteDuplicateIsVisibleEdgesQuery,
        DeleteDuplicateIsProtectedEdgesQuery,
    ]

    async def execute(self, db: InfrahubDatabase) -> MigrationResult:
        """Run the migration queries directly on ``db``, without a wrapping transaction.

        Args:
            db: Database handle the queries are executed against.

        Returns:
            The result produced by ``do_execute``.
        """
        # skip the transaction b/c it will run out of memory on a large database
        return await self.do_execute(db=db)

    async def validate_migration(self, db: InfrahubDatabase) -> MigrationResult:
        """Return an empty result; this migration performs no post-run validation.

        Args:
            db: Unused; kept for interface compatibility.
        """
        return MigrationResult()

backend/infrahub/core/migrations/shared.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,17 @@ async def validate_migration(self, db: InfrahubDatabase) -> MigrationResult:
120120

121121
async def execute(self, db: InfrahubDatabase) -> MigrationResult:
122122
async with db.start_transaction() as ts:
123-
result = MigrationResult()
123+
return await self.do_execute(db=ts)
124124

125-
for migration_query in self.queries:
126-
try:
127-
query = await migration_query.init(db=ts)
128-
await query.execute(db=ts)
129-
except Exception as exc: # pylint: disable=broad-exception-caught
130-
result.errors.append(str(exc))
131-
return result
125+
async def do_execute(self, db: InfrahubDatabase) -> MigrationResult:
    """Run each query class in ``self.queries`` against ``db`` and collect errors.

    Args:
        db: Database handle (or transaction) passed to every query's ``init`` and ``execute``.

    Returns:
        A MigrationResult whose ``errors`` list holds the string form of the first
        exception raised, or is left untouched when every query succeeds.
    """
    result = MigrationResult()
    for migration_query in self.queries:
        try:
            query = await migration_query.init(db=db)
            await query.execute(db=db)
        except Exception as exc:  # pylint: disable=broad-exception-caught
            # Exceptions are recorded rather than raised, so callers must inspect result.errors.
            result.errors.append(str(exc))
            # NOTE(review): indentation was lost in this view; this return is taken to sit inside
            # the except block (stop at the first failure) because a second one follows - confirm.
            return result

    return result
134136

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
from infrahub.core.manager import NodeManager
2+
from infrahub.core.migrations.graph.m020_duplicate_edges import Migration020
3+
from infrahub.core.node import Node
4+
from infrahub.core.schema.schema_branch import SchemaBranch
5+
from infrahub.core.timestamp import Timestamp
6+
from infrahub.database import InfrahubDatabase
7+
8+
9+
class TestDuplicateEdgesDeleted:
    """Check that Migration020 removes duplicated attribute edges without changing node data."""

    # Attribute value shared by the duplicated AttributeValue nodes created in the test.
    the_value = 122

    async def test_duplicate_edges_migration(self, db: InfrahubDatabase, car_person_schema: SchemaBranch) -> None:
        """Build duplicated edges by hand, run the migration and verify the outcome.

        Args:
            db: Database handle used for every query and node operation.
            car_person_schema: Not referenced in the body; presumably the fixture that
                registers the ``TestPerson`` schema used below - verify against conftest.
        """
        unchanged_node = await Node.init(db=db, schema="TestPerson")
        await unchanged_node.new(db=db, name="Unchanged", height=self.the_value)
        await unchanged_node.save(db=db)

        # create 4 duplicate AttributeValue nodes
        query = """
        UNWIND [1,2,3,4] AS i
        CREATE (:AttributeValue {value: $value, is_default: False})
        """
        await db.execute_query(query=query, params={"value": self.the_value})

        node_to_update = await Node.init(db=db, schema="TestPerson")
        await node_to_update.new(db=db, name="Update", height=self.the_value + 1)
        await node_to_update.save(db=db)

        node_to_delete = await Node.init(db=db, schema="TestPerson")
        await node_to_delete.new(db=db, name="Delete", height=self.the_value)
        await node_to_delete.save(db=db)

        # add duplicate edges
        query = """
        MATCH (n:Node)
        WHERE n.uuid IN $uuids
        MATCH (n)-[:HAS_ATTRIBUTE]->(a:Attribute {name: $attribute_name})
        MATCH (a)-[e:HAS_VALUE]->(:AttributeValue {value: $the_value})
        MATCH (av:AttributeValue {value: $the_value})
        WHERE NOT exists((a)-[:HAS_VALUE]->(av))
        CREATE (a)-[duplicate_e:HAS_VALUE]->(av)
        SET duplicate_e = properties(e)
        WITH a
        CALL {
            WITH a
            MATCH (a)-[ve:IS_VISIBLE]->(v)
            WITH a, ve, v
            LIMIT 1
            CREATE (a)-[new_ve:IS_VISIBLE]->(v)
            SET new_ve = properties(ve)
            WITH a
            MATCH (a)-[pe:IS_PROTECTED]->(p)
            WITH a, pe, p
            LIMIT 1
            CREATE (a)-[new_pe:IS_PROTECTED]->(p)
            SET new_pe = properties(pe)
        }
        """
        await db.execute_query(
            query=query,
            params={
                "uuids": [unchanged_node.get_id(), node_to_update.get_id(), node_to_delete.get_id()],
                "attribute_name": "height",
                "the_value": self.the_value,
            },
        )

        # make the node changes
        node_to_update.height.value = self.the_value
        await node_to_update.save(db=db)
        before_delete = Timestamp()
        await node_to_delete.delete(db=db)

        # run the migration
        migration = Migration020()
        execution_result = await migration.execute(db=db)
        # query failures are stored on the result instead of being raised, so check them explicitly
        assert not execution_result.errors
        validation_result = await migration.validate_migration(db=db)
        assert not validation_result.errors

        # validate no duplicate edges
        for node in (unchanged_node, node_to_update, node_to_delete):
            await self._validate_no_duplicate_edges(db=db, node=node, attribute_name="height")

        # validate nodes are in correct state
        retrieved_unchanged_node = await NodeManager.get_one(db=db, id=unchanged_node.id)
        assert retrieved_unchanged_node.name.value == unchanged_node.name.value
        assert retrieved_unchanged_node.height.value == unchanged_node.height.value
        retrieved_updated_node = await NodeManager.get_one(db=db, id=node_to_update.id)
        assert retrieved_updated_node.name.value == node_to_update.name.value
        assert retrieved_updated_node.height.value == node_to_update.height.value
        assert await NodeManager.get_one(db=db, id=node_to_delete.id) is None
        retrieved_deleted_node = await NodeManager.get_one(db=db, id=node_to_delete.id, at=before_delete)
        assert retrieved_deleted_node.name.value == node_to_delete.name.value
        assert retrieved_deleted_node.height.value == node_to_delete.height.value

    async def _validate_no_duplicate_edges(self, db: InfrahubDatabase, node: Node, attribute_name: str) -> None:
        """Assert that the named attribute of ``node`` has no duplicated edges.

        Every edge touching the Attribute is grouped by edge type, branch, status,
        from, to and the ``value`` of the node at the other end; each group must
        contain exactly one edge.

        Args:
            db: Database handle used to run the check query.
            node: Node whose attribute is inspected.
            attribute_name: Name of the attribute to inspect.
        """
        # validate that this node
        # - does not have duplicate HAS_VALUE, IS_VISIBLE, or IS_PROTECTED edges
        # - only connects to one AttributeValue node even though multiple exist
        params = {"node_id": node.get_id(), "attribute_name": attribute_name}
        query = """
        MATCH (:Node {uuid: $node_id})-[:HAS_ATTRIBUTE]->(a:Attribute {name: $attribute_name})
        WITH a
        LIMIT 1
        MATCH (a)-[e]-(p)
        RETURN a, type(e) AS edge_type, e.branch AS branch, e.status AS status, e.from AS from, e.to AS to, p.value AS value, COUNT(*) AS num_edges
        """
        results = await db.execute_query(query=query, params=params)
        for result in results:
            edge_type = result.get("edge_type")
            value = result.get("value")
            num_edges = result.get("num_edges")
            assert num_edges == 1, f"{node.get_id()} has {num_edges} duplicate {edge_type} edges with {value=}"
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Remove duplicated edges that could have been added to the database during concurrent updates

0 commit comments

Comments
 (0)