Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 69 additions & 51 deletions src/ops/storages/neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,12 +853,9 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin
format!("({})", strs)
}
}
#[derive(Derivative)]
#[derivative(Debug)]
#[derive(Debug)]
pub struct GraphElementDataSetupStatus {
key: GraphElement,
#[derivative(Debug = "ignore")]
graph_pool: Arc<GraphPool>,
conn_spec: ConnectionSpec,
data_clear: Option<DataClearAction>,
change_type: SetupChangeType,
Expand All @@ -867,7 +864,6 @@ pub struct GraphElementDataSetupStatus {
impl GraphElementDataSetupStatus {
fn new(
key: GraphElement,
graph_pool: Arc<GraphPool>,
conn_spec: ConnectionSpec,
desired_state: Option<&SetupState>,
existing: &CombinedState<SetupState>,
Expand Down Expand Up @@ -899,7 +895,6 @@ impl GraphElementDataSetupStatus {

Self {
key,
graph_pool,
conn_spec,
data_clear,
change_type,
Expand Down Expand Up @@ -938,43 +933,27 @@ impl ResourceSetupStatus for GraphElementDataSetupStatus {
}
}

impl GraphElementDataSetupStatus {
async fn apply_change(&self) -> Result<()> {
let graph = self.graph_pool.get_graph(&self.conn_spec).await?;
if let Some(data_clear) = &self.data_clear {
let delete_query = neo4rs::query(&formatdoc! {"
async fn clear_graph_element_data(
graph: &Graph,
key: &GraphElement,
is_dependent: bool,
) -> Result<()> {
let var_name = CORE_ELEMENT_MATCHER_VAR;
let optional_orphan_condition = if is_dependent {
format!("WHERE NOT ({var_name})--()")
} else {
"".to_string()
};
let matcher = key.typ.matcher(CORE_ELEMENT_MATCHER_VAR);
let delete_query = neo4rs::query(&formatdoc! {"
CALL {{
MATCH {matcher}
WITH {var_name}
{optional_orphan_condition}
DELETE {var_name}
MATCH {matcher} WITH {var_name} {optional_orphan_condition} DELETE {var_name}
}} IN TRANSACTIONS
",
matcher = self.key.typ.matcher(CORE_ELEMENT_MATCHER_VAR),
var_name = CORE_ELEMENT_MATCHER_VAR,
optional_orphan_condition = match self.key.typ {
ElementType::Node(_) => format!("WHERE NOT ({CORE_ELEMENT_MATCHER_VAR})--()"),
_ => "".to_string(),
},
});
graph.run(delete_query).await?;

for node_label in &data_clear.dependent_node_labels {
let delete_node_query = neo4rs::query(&formatdoc! {"
CALL {{
MATCH (n:{node_label})
WHERE NOT (n)--()
DELETE n
}} IN TRANSACTIONS
",
node_label = node_label
});
graph.run(delete_node_query).await?;
}
}
Ok(())
}
"});
graph.run(delete_query).await?;
Ok(())
}

/// Factory for Neo4j relationships
pub struct Factory {
graph_pool: Arc<GraphPool>,
Expand Down Expand Up @@ -1266,17 +1245,12 @@ impl StorageFactoryBase for Factory {
auth_registry: &Arc<AuthRegistry>,
) -> Result<Self::SetupStatus> {
let conn_spec = auth_registry.get::<ConnectionSpec>(&key.connection)?;
let data_status = GraphElementDataSetupStatus::new(
key,
self.graph_pool.clone(),
conn_spec.clone(),
desired.as_ref(),
&existing,
);
let data_status =
GraphElementDataSetupStatus::new(key, conn_spec.clone(), desired.as_ref(), &existing);
let components = components::SetupStatus::create(
SetupComponentOperator {
graph_pool: self.graph_pool.clone(),
conn_spec: conn_spec.clone(),
conn_spec,
},
desired,
existing,
Expand Down Expand Up @@ -1342,10 +1316,54 @@ impl StorageFactoryBase for Factory {
&self,
changes: Vec<&'async_trait Self::SetupStatus>,
) -> Result<()> {
for change in changes.iter() {
change.0.apply_change().await?;
let (data_statuses, components): (Vec<_>, Vec<_>) =
changes.into_iter().map(|c| (&c.0, &c.1)).unzip();

// Relationships first, then nodes, as relationships need to be deleted before nodes they referenced.
let mut relationship_types = IndexMap::<&GraphElement, &ConnectionSpec>::new();
let mut node_labels = IndexMap::<&GraphElement, &ConnectionSpec>::new();
let mut dependent_node_labels = IndexMap::<GraphElement, &ConnectionSpec>::new();
for data_status in data_statuses.iter() {
if let Some(data_clear) = &data_status.data_clear {
match &data_status.key.typ {
ElementType::Relationship(_) => {
relationship_types.insert(&data_status.key, &data_status.conn_spec);
for label in &data_clear.dependent_node_labels {
dependent_node_labels.insert(
GraphElement {
connection: data_status.key.connection.clone(),
typ: ElementType::Node(label.clone()),
},
&data_status.conn_spec,
);
}
}
ElementType::Node(_) => {
node_labels.insert(&data_status.key, &data_status.conn_spec);
}
}
}
}
apply_component_changes(changes.iter().map(|c| &c.1).collect()).await?;

// Relationships have no dependency, so can be cleared first.
for (rel_type, conn_spec) in relationship_types.iter() {
let graph = self.graph_pool.get_graph(conn_spec).await?;
clear_graph_element_data(&graph, rel_type, false).await?;
}
// Clear standalone nodes, which is simpler than dependent nodes.
for (node_label, conn_spec) in node_labels.iter() {
let graph = self.graph_pool.get_graph(conn_spec).await?;
clear_graph_element_data(&graph, node_label, false).await?;
}
// Clear dependent nodes if they're not covered by standalone nodes.
for (node_label, conn_spec) in dependent_node_labels.iter() {
if !node_labels.contains_key(node_label) {
let graph = self.graph_pool.get_graph(conn_spec).await?;
clear_graph_element_data(&graph, node_label, true).await?;
}
}

apply_component_changes(components).await?;
Ok(())
}
}