|
| 1 | +from ..models import EdgeToDelete, EdgeToUpdate, PatchPlan |
| 2 | +from .base import PatchQuery |
| 3 | + |
| 4 | + |
| 5 | +class DeleteDuplicatedEdgesPatchQuery(PatchQuery): |
| 6 | + """ |
| 7 | + Find duplicated or overlapping edges of the same status, type, and branch to update and delete |
| 8 | + - one edge will be kept for each pair of nodes and a given status, type, and branch. it will be |
| 9 | + updated to have the earliest "from" and "to" times in this group |
| 10 | + - all the other duplicate/overlapping edges will be deleted |
| 11 | + """ |
| 12 | + |
| 13 | + @property |
| 14 | + def name(self) -> str: |
| 15 | + return "delete-duplicated-edges" |
| 16 | + |
| 17 | + async def plan(self) -> PatchPlan: |
| 18 | + query = """ |
| 19 | +// ------------ |
| 20 | +// Find node pairs that have duplicate edges |
| 21 | +// ------------ |
| 22 | +MATCH (node_with_dup_edges:Node)-[edge]->(peer) |
| 23 | +WITH node_with_dup_edges, type(edge) AS edge_type, edge.status AS edge_status, edge.branch AS edge_branch, peer, count(*) AS num_dup_edges |
| 24 | +WHERE num_dup_edges > 1 |
| 25 | +WITH DISTINCT node_with_dup_edges, edge_type, edge_branch, peer |
| 26 | +CALL { |
| 27 | + // ------------ |
| 28 | + // Get the earliest active and deleted edges for this branch |
| 29 | + // ------------ |
| 30 | + WITH node_with_dup_edges, edge_type, edge_branch, peer |
| 31 | + MATCH (node_with_dup_edges)-[active_edge {branch: edge_branch, status: "active"}]->(peer) |
| 32 | + WHERE type(active_edge) = edge_type |
| 33 | + WITH node_with_dup_edges, edge_type, edge_branch, peer, active_edge |
| 34 | + ORDER BY active_edge.from ASC |
| 35 | + WITH node_with_dup_edges, edge_type, edge_branch, peer, head(collect(active_edge.from)) AS active_from |
| 36 | + OPTIONAL MATCH (node_with_dup_edges)-[deleted_edge {branch: edge_branch, status: "deleted"}]->(peer) |
| 37 | + WITH node_with_dup_edges, edge_type, edge_branch, peer, active_from, deleted_edge |
| 38 | + ORDER BY deleted_edge.from ASC |
| 39 | + WITH node_with_dup_edges, edge_type, edge_branch, peer, active_from, head(collect(deleted_edge.from)) AS deleted_from |
| 40 | + // ------------ |
| 41 | + // Plan one active edge update with correct from and to times |
| 42 | + // ------------ |
| 43 | + CALL { |
| 44 | + WITH node_with_dup_edges, edge_type, edge_branch, peer, active_from, deleted_from |
| 45 | + MATCH (node_with_dup_edges)-[active_e {branch: edge_branch, status: "active"}]->(peer) |
| 46 | + WHERE type(active_e) = edge_type |
| 47 | + WITH node_with_dup_edges, edge_type, edge_branch, peer, active_from, deleted_from, active_e |
| 48 | + ORDER BY %(id_func_name)s(active_e) |
| 49 | + LIMIT 1 |
| 50 | + WITH active_e, properties(active_e) AS before_props, {from: active_from, to: deleted_from} AS prop_updates |
| 51 | + RETURN [ |
| 52 | + { |
| 53 | + db_id: %(id_func_name)s(active_e), before_props: before_props, prop_updates: prop_updates |
| 54 | + } |
| 55 | + ] AS active_edges_to_update |
| 56 | + } |
| 57 | + // ------------ |
| 58 | + // Plan deletes for all the other active edges of this type on this branch |
| 59 | + // ------------ |
| 60 | + CALL { |
| 61 | + WITH node_with_dup_edges, edge_type, edge_branch, peer |
| 62 | + MATCH (node_with_dup_edges)-[active_e {branch: edge_branch, status: "active"}]->(peer) |
| 63 | + WHERE type(active_e) = edge_type |
| 64 | + WITH node_with_dup_edges, peer, active_e |
| 65 | + ORDER BY %(id_func_name)s(active_e) |
| 66 | + SKIP 1 |
| 67 | + RETURN collect( |
| 68 | + { |
| 69 | + db_id: %(id_func_name)s(active_e), |
| 70 | + from_id: %(id_func_name)s(node_with_dup_edges), |
| 71 | + to_id: %(id_func_name)s(peer), |
| 72 | + edge_type: type(active_e), |
| 73 | + before_props: properties(active_e) |
| 74 | + } |
| 75 | + ) AS active_edges_to_delete |
| 76 | + } |
| 77 | + // ------------ |
| 78 | + // Plan one deleted edge update with correct from time |
| 79 | + // ------------ |
| 80 | + CALL { |
| 81 | + WITH node_with_dup_edges, edge_type, edge_branch, peer, deleted_from |
| 82 | + MATCH (node_with_dup_edges)-[deleted_e {branch: edge_branch, status: "deleted"}]->(peer) |
| 83 | + WHERE type(deleted_e) = edge_type |
| 84 | + WITH node_with_dup_edges, edge_type, edge_branch, peer, deleted_from, deleted_e |
| 85 | + ORDER BY %(id_func_name)s(deleted_e) |
| 86 | + LIMIT 1 |
| 87 | + WITH deleted_e, properties(deleted_e) AS before_props, {from: deleted_from} AS prop_updates |
| 88 | + RETURN [ |
| 89 | + { |
| 90 | + db_id: %(id_func_name)s(deleted_e), before_props: before_props, prop_updates: prop_updates |
| 91 | + } |
| 92 | + ] AS deleted_edges_to_update |
| 93 | + } |
| 94 | + // ------------ |
| 95 | + // Plan deletes for all the other deleted edges of this type on this branch |
| 96 | + // ------------ |
| 97 | + CALL { |
| 98 | + WITH node_with_dup_edges, edge_type, edge_branch, peer |
| 99 | + MATCH (node_with_dup_edges)-[deleted_e {branch: edge_branch, status: "deleted"}]->(peer) |
| 100 | + WHERE type(deleted_e) = edge_type |
| 101 | + WITH node_with_dup_edges, peer, deleted_e |
| 102 | + ORDER BY %(id_func_name)s(deleted_e) |
| 103 | + SKIP 1 |
| 104 | + RETURN collect( |
| 105 | + { |
| 106 | + db_id: %(id_func_name)s(deleted_e), |
| 107 | + from_id: %(id_func_name)s(node_with_dup_edges), |
| 108 | + to_id: %(id_func_name)s(peer), |
| 109 | + edge_type: type(deleted_e), |
| 110 | + before_props: properties(deleted_e) |
| 111 | + } |
| 112 | + ) AS deleted_edges_to_delete |
| 113 | + } |
| 114 | + RETURN |
| 115 | + active_edges_to_update + deleted_edges_to_update AS edges_to_update, |
| 116 | + active_edges_to_delete + deleted_edges_to_delete AS edges_to_delete |
| 117 | +} |
| 118 | +RETURN edges_to_update, edges_to_delete |
| 119 | + """ % {"id_func_name": self.db.get_id_function_name()} |
| 120 | + results = await self.db.execute_query(query=query) |
| 121 | + edges_to_delete: list[EdgeToDelete] = [] |
| 122 | + edges_to_update: list[EdgeToUpdate] = [] |
| 123 | + for result in results: |
| 124 | + for serial_edge_to_delete in result.get("edges_to_delete"): |
| 125 | + edge_to_delete = EdgeToDelete(**serial_edge_to_delete) |
| 126 | + edges_to_delete.append(edge_to_delete) |
| 127 | + for serial_edge_to_update in result.get("edges_to_update"): |
| 128 | + prop_updates = serial_edge_to_update["prop_updates"] |
| 129 | + if prop_updates: |
| 130 | + serial_edge_to_update["after_props"] = serial_edge_to_update["before_props"] | prop_updates |
| 131 | + del serial_edge_to_update["prop_updates"] |
| 132 | + edge_to_update = EdgeToUpdate(**serial_edge_to_update) |
| 133 | + edges_to_update.append(edge_to_update) |
| 134 | + return PatchPlan( |
| 135 | + name=self.name, |
| 136 | + edges_to_delete=edges_to_delete, |
| 137 | + edges_to_update=edges_to_update, |
| 138 | + ) |
0 commit comments