diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs index 757de3f35..bad7fc09f 100644 --- a/src/ops/storages/neo4j.rs +++ b/src/ops/storages/neo4j.rs @@ -853,12 +853,9 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin format!("({})", strs) } } -#[derive(Derivative)] -#[derivative(Debug)] +#[derive(Debug)] pub struct GraphElementDataSetupStatus { key: GraphElement, - #[derivative(Debug = "ignore")] - graph_pool: Arc, conn_spec: ConnectionSpec, data_clear: Option, change_type: SetupChangeType, @@ -867,7 +864,6 @@ pub struct GraphElementDataSetupStatus { impl GraphElementDataSetupStatus { fn new( key: GraphElement, - graph_pool: Arc, conn_spec: ConnectionSpec, desired_state: Option<&SetupState>, existing: &CombinedState, @@ -899,7 +895,6 @@ impl GraphElementDataSetupStatus { Self { key, - graph_pool, conn_spec, data_clear, change_type, @@ -938,43 +933,27 @@ impl ResourceSetupStatus for GraphElementDataSetupStatus { } } -impl GraphElementDataSetupStatus { - async fn apply_change(&self) -> Result<()> { - let graph = self.graph_pool.get_graph(&self.conn_spec).await?; - if let Some(data_clear) = &self.data_clear { - let delete_query = neo4rs::query(&formatdoc! {" +async fn clear_graph_element_data( + graph: &Graph, + key: &GraphElement, + is_dependent: bool, +) -> Result<()> { + let var_name = CORE_ELEMENT_MATCHER_VAR; + let optional_orphan_condition = if is_dependent { + format!("WHERE NOT ({var_name})--()") + } else { + "".to_string() + }; + let matcher = key.typ.matcher(CORE_ELEMENT_MATCHER_VAR); + let delete_query = neo4rs::query(&formatdoc! {" CALL {{ - MATCH {matcher} - WITH {var_name} - {optional_orphan_condition} - DELETE {var_name} + MATCH {matcher} WITH {var_name} {optional_orphan_condition} DELETE {var_name} }} IN TRANSACTIONS - ", - matcher = self.key.typ.matcher(CORE_ELEMENT_MATCHER_VAR), - var_name = CORE_ELEMENT_MATCHER_VAR, - optional_orphan_condition = match self.key.typ { - ElementType::Node(_) => format!("WHERE NOT ({CORE_ELEMENT_MATCHER_VAR})--()"), - _ => "".to_string(), - }, - }); - graph.run(delete_query).await?; - - for node_label in &data_clear.dependent_node_labels { - let delete_node_query = neo4rs::query(&formatdoc! {" - CALL {{ - MATCH (n:{node_label}) - WHERE NOT (n)--() - DELETE n - }} IN TRANSACTIONS - ", - node_label = node_label - }); - graph.run(delete_node_query).await?; - } - } - Ok(()) - } + "}); + graph.run(delete_query).await?; + Ok(()) } + /// Factory for Neo4j relationships pub struct Factory { graph_pool: Arc, @@ -1266,17 +1245,12 @@ impl StorageFactoryBase for Factory { auth_registry: &Arc, ) -> Result { let conn_spec = auth_registry.get::(&key.connection)?; - let data_status = GraphElementDataSetupStatus::new( - key, - self.graph_pool.clone(), - conn_spec.clone(), - desired.as_ref(), - &existing, - ); + let data_status = + GraphElementDataSetupStatus::new(key, conn_spec.clone(), desired.as_ref(), &existing); let components = components::SetupStatus::create( SetupComponentOperator { graph_pool: self.graph_pool.clone(), - conn_spec: conn_spec.clone(), + conn_spec, }, desired, existing, @@ -1342,10 +1316,54 @@ impl StorageFactoryBase for Factory { &self, changes: Vec<&'async_trait Self::SetupStatus>, ) -> Result<()> { - for change in changes.iter() { - change.0.apply_change().await?; + let (data_statuses, components): (Vec<_>, Vec<_>) = + changes.into_iter().map(|c| (&c.0, &c.1)).unzip(); + + // Relationships first, then nodes, as relationships need to be deleted before nodes they referenced. + let mut relationship_types = IndexMap::<&GraphElement, &ConnectionSpec>::new(); + let mut node_labels = IndexMap::<&GraphElement, &ConnectionSpec>::new(); + let mut dependent_node_labels = IndexMap::::new(); + for data_status in data_statuses.iter() { + if let Some(data_clear) = &data_status.data_clear { + match &data_status.key.typ { + ElementType::Relationship(_) => { + relationship_types.insert(&data_status.key, &data_status.conn_spec); + for label in &data_clear.dependent_node_labels { + dependent_node_labels.insert( + GraphElement { + connection: data_status.key.connection.clone(), + typ: ElementType::Node(label.clone()), + }, + &data_status.conn_spec, + ); + } + } + ElementType::Node(_) => { + node_labels.insert(&data_status.key, &data_status.conn_spec); + } + } + } } - apply_component_changes(changes.iter().map(|c| &c.1).collect()).await?; + + // Relationships have no dependency, so can be cleared first. + for (rel_type, conn_spec) in relationship_types.iter() { + let graph = self.graph_pool.get_graph(conn_spec).await?; + clear_graph_element_data(&graph, rel_type, false).await?; + } + // Clear standalone nodes, which is simpler than dependent nodes. + for (node_label, conn_spec) in node_labels.iter() { + let graph = self.graph_pool.get_graph(conn_spec).await?; + clear_graph_element_data(&graph, node_label, false).await?; + } + // Clear dependent nodes if they're not covered by standalone nodes. + for (node_label, conn_spec) in dependent_node_labels.iter() { + if !node_labels.contains_key(node_label) { + let graph = self.graph_pool.get_graph(conn_spec).await?; + clear_graph_element_data(&graph, node_label, true).await?; + } + } + + apply_component_changes(components).await?; Ok(()) } }