Skip to content

Commit f16e7c8

Browse files
authored
Merge pull request #6433 from opsmill/ajtm-05082025-rel-query-updates
update rel cypher queries for migrated kind nodes
2 parents 94beb5d + 849c1ce commit f16e7c8

File tree

9 files changed

+317
-87
lines changed

9 files changed

+317
-87
lines changed

backend/infrahub/core/query/node.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,9 +413,32 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
413413
self.params["branch"] = self.branch.name
414414
self.params["branch_level"] = self.branch.hierarchy_level
415415

416+
if self.branch.is_global or self.branch.is_default:
417+
node_query_match = """
418+
MATCH (n:Node { uuid: $uuid })
419+
OPTIONAL MATCH (n)-[delete_edge:IS_PART_OF {status: "deleted", branch: $branch}]->(:Root)
420+
WHERE delete_edge.from <= $at
421+
WITH n WHERE delete_edge IS NULL
422+
"""
423+
else:
424+
node_filter, node_filter_params = self.branch.get_query_filter_path(at=self.at, variable_name="r")
425+
node_query_match = """
426+
MATCH (n:Node { uuid: $uuid })
427+
CALL {
428+
WITH n
429+
MATCH (n)-[r:IS_PART_OF]->(:Root)
430+
WHERE %(node_filter)s
431+
RETURN r.status = "active" AS is_active
432+
ORDER BY r.from DESC
433+
LIMIT 1
434+
}
435+
WITH n WHERE is_active = TRUE
436+
""" % {"node_filter": node_filter}
437+
self.params.update(node_filter_params)
438+
self.add_to_query(node_query_match)
439+
416440
query = """
417441
MATCH (root:Root)
418-
MATCH (n:Node { uuid: $uuid })
419442
CREATE (n)-[r:IS_PART_OF { branch: $branch, branch_level: $branch_level, status: "deleted", from: $at }]->(root)
420443
"""
421444

backend/infrahub/core/query/relationship.py

Lines changed: 78 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -205,50 +205,20 @@ def get_relationship_properties_dict(self, status: RelationshipStatus) -> dict[s
205205
rel_prop_dict["hierarchy"] = self.schema.hierarchical
206206
return rel_prop_dict
207207

208-
209-
class RelationshipCreateQuery(RelationshipQuery):
210-
name = "relationship_create"
211-
212-
type: QueryType = QueryType.WRITE
213-
214-
def __init__(
215-
self,
216-
destination: Node = None,
217-
destination_id: UUID | None = None,
218-
**kwargs,
219-
):
220-
if not destination and not destination_id:
221-
raise ValueError("Either destination or destination_id must be provided.")
222-
223-
super().__init__(destination=destination, destination_id=destination_id, **kwargs)
224-
225-
async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG002
226-
self.params["source_id"] = self.source_id
227-
self.params["destination_id"] = self.destination_id
228-
self.params["name"] = self.schema.identifier
229-
self.params["branch_support"] = self.schema.branch.value
230-
231-
self.params["uuid"] = str(UUIDT())
232-
233-
self.params["branch"] = self.branch.name
234-
self.params["branch_level"] = self.branch.hierarchy_level
235-
self.params["at"] = self.at.to_string()
236-
237-
self.params["is_protected"] = self.rel.is_protected
238-
self.params["is_visible"] = self.rel.is_visible
239-
240-
source_branch = self.source.get_branch_based_on_support_type()
208+
def add_source_match_to_query(self, source_branch: Branch) -> None:
209+
self.params["source_id"] = self.source_id or self.source.get_id()
241210
if source_branch.is_global or source_branch.is_default:
242211
source_query_match = """
243212
MATCH (s:Node { uuid: $source_id })
244-
WHERE NOT exists((s)-[:IS_PART_OF {status: "deleted", branch: $source_branch}]->(:Root))
213+
OPTIONAL MATCH (s)-[delete_edge:IS_PART_OF {status: "deleted", branch: $source_branch}]->(:Root)
214+
WHERE delete_edge.from <= $at
215+
WITH *, s WHERE delete_edge IS NULL
245216
"""
246217
self.params["source_branch"] = source_branch.name
247-
else:
248-
source_filter, source_filter_params = source_branch.get_query_filter_path(
249-
at=self.at, variable_name="r", params_prefix="src_"
250-
)
251-
source_query_match = """
218+
source_filter, source_filter_params = source_branch.get_query_filter_path(
219+
at=self.at, variable_name="r", params_prefix="src_"
220+
)
221+
source_query_match = """
252222
MATCH (s:Node { uuid: $source_id })
253223
CALL {
254224
WITH s
@@ -258,16 +228,19 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
258228
ORDER BY r.from DESC
259229
LIMIT 1
260230
}
261-
WITH s WHERE s_is_active = TRUE
231+
WITH *, s WHERE s_is_active = TRUE
262232
""" % {"source_filter": source_filter}
263-
self.params.update(source_filter_params)
233+
self.params.update(source_filter_params)
264234
self.add_to_query(source_query_match)
265235

266-
destination_branch = self.destination.get_branch_based_on_support_type()
236+
def add_dest_match_to_query(self, destination_branch: Branch, destination_id: str) -> None:
237+
self.params["destination_id"] = destination_id
267238
if destination_branch.is_global or destination_branch.is_default:
268239
destination_query_match = """
269240
MATCH (d:Node { uuid: $destination_id })
270-
WHERE NOT exists((d)-[:IS_PART_OF {status: "deleted", branch: $destination_branch}]->(:Root))
241+
OPTIONAL MATCH (d)-[delete_edge:IS_PART_OF {status: "deleted", branch: $destination_branch}]->(:Root)
242+
WHERE delete_edge.from <= $at
243+
WITH *, d WHERE delete_edge IS NULL
271244
"""
272245
self.params["destination_branch"] = destination_branch.name
273246
else:
@@ -284,11 +257,46 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
284257
ORDER BY r.from DESC
285258
LIMIT 1
286259
}
287-
WITH s, d WHERE d_is_active = TRUE
260+
WITH *, d WHERE d_is_active = TRUE
288261
""" % {"destination_filter": destination_filter}
289262
self.params.update(destination_filter_params)
290263
self.add_to_query(destination_query_match)
291264

265+
266+
class RelationshipCreateQuery(RelationshipQuery):
267+
name = "relationship_create"
268+
269+
type: QueryType = QueryType.WRITE
270+
271+
def __init__(
272+
self,
273+
destination: Node = None,
274+
destination_id: UUID | None = None,
275+
**kwargs,
276+
):
277+
if not destination and not destination_id:
278+
raise ValueError("Either destination or destination_id must be provided.")
279+
280+
super().__init__(destination=destination, destination_id=destination_id, **kwargs)
281+
282+
async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG002
283+
self.params["name"] = self.schema.identifier
284+
self.params["branch_support"] = self.schema.branch.value
285+
286+
self.params["uuid"] = str(UUIDT())
287+
288+
self.params["branch"] = self.branch.name
289+
self.params["branch_level"] = self.branch.hierarchy_level
290+
self.params["at"] = self.at.to_string()
291+
292+
self.params["is_protected"] = self.rel.is_protected
293+
self.params["is_visible"] = self.rel.is_visible
294+
295+
self.add_source_match_to_query(source_branch=self.source.get_branch_based_on_support_type())
296+
self.add_dest_match_to_query(
297+
destination_branch=self.destination.get_branch_based_on_support_type(),
298+
destination_id=self.destination_id or self.destination.get_id(),
299+
)
292300
self.query_add_all_node_property_match()
293301

294302
self.params["rel_prop"] = self.get_relationship_properties_dict(status=RelationshipStatus.ACTIVE)
@@ -433,7 +441,6 @@ def __init__(
433441

434442
async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG002
435443
self.params["source_id"] = self.source_id
436-
self.params["destination_id"] = self.data.peer_id
437444
self.params["rel_node_id"] = self.data.rel_node_id
438445
self.params["name"] = self.schema.identifier
439446
self.params["branch"] = self.branch.name
@@ -443,9 +450,10 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
443450
# -----------------------------------------------------------------------
444451
# Match all nodes, including properties
445452
# -----------------------------------------------------------------------
453+
454+
self.add_source_match_to_query(source_branch=self.source.get_branch_based_on_support_type())
455+
self.add_dest_match_to_query(destination_branch=self.branch, destination_id=self.data.peer_id)
446456
query = """
447-
MATCH (s:Node { uuid: $source_id })
448-
MATCH (d:Node { uuid: $destination_id })
449457
MATCH (rl:Relationship { uuid: $rel_node_id })
450458
"""
451459
self.add_to_query(query)
@@ -497,8 +505,6 @@ def __init__(self, **kwargs):
497505

498506
async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG002
499507
rel_filter, rel_params = self.branch.get_query_filter_path(at=self.at, variable_name="edge")
500-
self.params["source_id"] = self.source_id
501-
self.params["destination_id"] = self.destination_id
502508
self.params["rel_id"] = self.rel.id
503509
self.params["branch"] = self.branch.name
504510
self.params["rel_prop"] = self.get_relationship_properties_dict(status=RelationshipStatus.DELETED)
@@ -509,9 +515,14 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
509515
r1 = f"{arrows.left.start}[r1:{self.rel_type} $rel_prop ]{arrows.left.end}"
510516
r2 = f"{arrows.right.start}[r2:{self.rel_type} $rel_prop ]{arrows.right.end}"
511517

518+
self.add_source_match_to_query(source_branch=self.source.get_branch_based_on_support_type())
519+
self.add_dest_match_to_query(
520+
destination_branch=self.destination.get_branch_based_on_support_type(),
521+
destination_id=self.destination_id or self.destination.get_id(),
522+
)
512523
query = """
513-
MATCH (s:Node { uuid: $source_id })-[:IS_RELATED]-(rl:Relationship {uuid: $rel_id})-[:IS_RELATED]-(d:Node { uuid: $destination_id })
514-
WITH s, rl, d
524+
MATCH (s)-[:IS_RELATED]-(rl:Relationship {uuid: $rel_id})-[:IS_RELATED]-(d)
525+
WITH DISTINCT s, rl, d
515526
LIMIT 1
516527
CREATE (s)%(r1)s(rl)
517528
CREATE (rl)%(r2)s(d)
@@ -853,8 +864,6 @@ class RelationshipGetQuery(RelationshipQuery):
853864
type: QueryType = QueryType.READ
854865

855866
async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG002
856-
self.params["source_id"] = self.source_id
857-
self.params["destination_id"] = self.destination_id
858867
self.params["name"] = self.schema.identifier
859868
self.params["branch"] = self.branch.name
860869

@@ -868,9 +877,12 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
868877
r1 = f"{arrows.left.start}[r1:{self.rel.rel_type}]{arrows.left.end}"
869878
r2 = f"{arrows.right.start}[r2:{self.rel.rel_type}]{arrows.right.end}"
870879

880+
self.add_source_match_to_query(source_branch=self.source.get_branch_based_on_support_type())
881+
self.add_dest_match_to_query(
882+
destination_branch=self.destination.get_branch_based_on_support_type(),
883+
destination_id=self.destination_id or self.destination.get_id(),
884+
)
871885
query = """
872-
MATCH (s:Node { uuid: $source_id })
873-
MATCH (d:Node { uuid: $destination_id })
874886
MATCH (s)%s(rl:Relationship { name: $name })%s(d)
875887
WHERE %s
876888
""" % (
@@ -1097,7 +1109,11 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
10971109
CALL {
10981110
WITH rl
10991111
MATCH (rl)-[active_edge:IS_RELATED]->(n)
1100-
WHERE %(active_rel_filter)s AND active_edge.status ="active"
1112+
WHERE %(active_rel_filter)s
1113+
WITH rl, active_edge, n
1114+
ORDER BY %(id_func)s(rl), %(id_func)s(n), active_edge.from DESC
1115+
WITH rl, n, head(collect(active_edge)) AS active_edge
1116+
WHERE active_edge.status = "active"
11011117
CREATE (rl)-[deleted_edge:IS_RELATED $rel_prop]->(n)
11021118
SET deleted_edge.hierarchy = active_edge.hierarchy
11031119
WITH rl, active_edge, n
@@ -1113,7 +1129,11 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
11131129
11141130
WITH rl
11151131
MATCH (rl)<-[active_edge:IS_RELATED]-(n)
1116-
WHERE %(active_rel_filter)s AND active_edge.status ="active"
1132+
WHERE %(active_rel_filter)s
1133+
WITH rl, active_edge, n
1134+
ORDER BY %(id_func)s(rl), %(id_func)s(n), active_edge.from DESC
1135+
WITH rl, n, head(collect(active_edge)) AS active_edge
1136+
WHERE active_edge.status = "active"
11171137
CREATE (rl)<-[deleted_edge:IS_RELATED $rel_prop]-(n)
11181138
SET deleted_edge.hierarchy = active_edge.hierarchy
11191139
WITH rl, active_edge, n
@@ -1126,9 +1146,7 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
11261146
"inbound" as rel_direction
11271147
}
11281148
RETURN DISTINCT uuid, kind, rel_identifier, rel_direction
1129-
""" % {
1130-
"active_rel_filter": active_rel_filter,
1131-
}
1149+
""" % {"active_rel_filter": active_rel_filter, "id_func": db.get_id_function_name()}
11321150

11331151
self.add_to_query(query)
11341152

backend/infrahub/core/relationship/model.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ async def delete(self, db: InfrahubDatabase, at: Timestamp | None = None) -> Non
416416
await update_relationships_to(rel_ids_to_update, to=delete_at, db=db)
417417

418418
delete_query = await RelationshipDeleteQuery.init(
419-
db=db, rel=self, source_id=node.id, destination_id=peer.id, branch=branch, at=delete_at
419+
db=db, rel=self, source=node, destination=peer, branch=branch, at=delete_at
420420
)
421421
await delete_query.execute(db=db)
422422

backend/tests/helpers/db_validation.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,31 @@ async def validate_node_relationships(node: Node, branch: Branch, db: InfrahubDa
102102
for result in query.results:
103103
print(result)
104104
assert len(result.data) == 1 and result.data[0] == "Edges state is correct"
105+
106+
107+
async def verify_no_duplicate_paths(db: InfrahubDatabase) -> None:
108+
"""Verify that no duplicate paths exist at the database level"""
109+
query = """
110+
MATCH path = (p)-[e]->(q)
111+
WITH
112+
%(id_func)s(p) AS node_id1,
113+
e.branch AS branch,
114+
e.from AS from_time,
115+
type(e) AS edge_type,
116+
%(id_func)s(q) AS node_id2,
117+
path
118+
WITH node_id1, branch, from_time, edge_type, node_id2, size(collect(path)) AS num_paths
119+
WHERE num_paths > 1
120+
RETURN node_id1, branch, from_time, edge_type, node_id2, num_paths
121+
""" % {"id_func": db.get_id_function_name()}
122+
records = await db.execute_query(query=query)
123+
for record in records:
124+
node_id1 = record.get("node_id1")
125+
branch = record.get("branch")
126+
from_time = record.get("from_time")
127+
edge_type = record.get("edge_type")
128+
node_id2 = record.get("node_id2")
129+
num_paths = record.get("num_paths")
130+
raise ValueError(
131+
f"{num_paths} paths ({branch=},{edge_type=},{from_time=}) between nodes '{node_id1}' and '{node_id2}'"
132+
)

backend/tests/unit/core/diff/test_diff_and_merge.py

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,35 +21,13 @@
2121
from infrahub.core.timestamp import Timestamp
2222
from infrahub.database import InfrahubDatabase
2323
from infrahub.dependencies.registry import get_component_registry
24+
from tests.helpers.db_validation import verify_no_duplicate_paths
2425
from tests.unit.conftest import _build_hierarchical_location_data
2526
from tests.unit.core.test_utils import verify_all_linked_edges_deleted
2627

2728
from .get_one_node import get_one_diff_node
2829

2930

30-
async def verify_no_duplicate_paths(db: InfrahubDatabase) -> None:
31-
"""Verify that no duplicate paths exist at the database level"""
32-
query = """
33-
MATCH path = (p)-[e]->(q)
34-
WITH COALESCE(p.uuid, p.value) AS node_id1, e.branch AS branch, e.from AS from_time, type(e) AS edge_type, COALESCE(q.uuid, q.value) AS node_id2, path
35-
WHERE node_id1 IS NOT NULL AND node_id2 IS NOT NULL
36-
WITH node_id1, branch, from_time, edge_type, node_id2, size(collect(path)) AS num_paths
37-
WHERE num_paths > 1
38-
RETURN node_id1, branch, from_time, edge_type, node_id2, num_paths
39-
"""
40-
records = await db.execute_query(query=query)
41-
for record in records:
42-
node_id1 = record.get("node_id1")
43-
branch = record.get("branch")
44-
from_time = record.get("from_time")
45-
edge_type = record.get("edge_type")
46-
node_id2 = record.get("node_id2")
47-
num_paths = record.get("num_paths")
48-
raise ValueError(
49-
f"{num_paths} paths ({branch=},{edge_type=},{from_time=}) between nodes '{node_id1}' and '{node_id2}'"
50-
)
51-
52-
5331
class TestDiffAndMerge:
5432
@pytest.fixture
5533
async def diff_repository(self, db: InfrahubDatabase, default_branch: Branch) -> DiffRepository:

0 commit comments

Comments
 (0)