Skip to content

Commit ad7f72f

Browse files
committed
refactor: extract storage common logic into shared for kuzu reuse
1 parent 5305e4c commit ad7f72f

File tree

8 files changed

+381
-332
lines changed

8 files changed

+381
-332
lines changed

src/ops/storages/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
pub mod neo4j;
22
pub mod postgres;
33
pub mod qdrant;
4-
pub mod spec;
4+
pub mod shared;

src/ops/storages/neo4j.rs

Lines changed: 19 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::prelude::*;
22

3-
use super::spec::{GraphDeclaration, GraphElementMapping, NodeFromFieldsSpec, TargetFieldMapping};
3+
use super::shared::property_graph::*;
4+
45
use crate::setup::components::{self, State, apply_component_changes};
56
use crate::setup::{ResourceSetupStatus, SetupChangeType};
67
use crate::{ops::sdk::*, setup::CombinedState};
@@ -33,6 +34,8 @@ pub struct Declaration {
3334
decl: GraphDeclaration,
3435
}
3536

37+
type Neo4jGraphElement = GraphElement<ConnectionSpec>;
38+
3639
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
3740
struct GraphKey {
3841
uri: String,
@@ -48,61 +51,6 @@ impl GraphKey {
4851
}
4952
}
5053

51-
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
52-
enum ElementType {
53-
Node(String),
54-
Relationship(String),
55-
}
56-
57-
impl ElementType {
58-
fn label(&self) -> &str {
59-
match self {
60-
ElementType::Node(label) => label,
61-
ElementType::Relationship(label) => label,
62-
}
63-
}
64-
65-
fn from_mapping_spec(spec: &GraphElementMapping) -> Self {
66-
match spec {
67-
GraphElementMapping::Relationship(spec) => {
68-
ElementType::Relationship(spec.rel_type.clone())
69-
}
70-
GraphElementMapping::Node(spec) => ElementType::Node(spec.label.clone()),
71-
}
72-
}
73-
74-
fn matcher(&self, var_name: &str) -> String {
75-
match self {
76-
ElementType::Relationship(label) => format!("()-[{var_name}:{label}]->()"),
77-
ElementType::Node(label) => format!("({var_name}:{label})"),
78-
}
79-
}
80-
}
81-
82-
impl std::fmt::Display for ElementType {
83-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84-
match self {
85-
ElementType::Node(label) => write!(f, "Node(label:{label})"),
86-
ElementType::Relationship(rel_type) => write!(f, "Relationship(type:{rel_type})"),
87-
}
88-
}
89-
}
90-
91-
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
92-
pub struct GraphElement {
93-
connection: AuthEntryReference<ConnectionSpec>,
94-
typ: ElementType,
95-
}
96-
97-
impl GraphElement {
98-
fn from_spec(spec: &Spec) -> Self {
99-
Self {
100-
connection: spec.connection.clone(),
101-
typ: ElementType::from_mapping_spec(&spec.mapping),
102-
}
103-
}
104-
}
105-
10654
impl retryable::IsRetryable for neo4rs::Error {
10755
fn is_retryable(&self) -> bool {
10856
match self {
@@ -856,15 +804,15 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin
856804
}
857805
#[derive(Debug)]
858806
pub struct GraphElementDataSetupStatus {
859-
key: GraphElement,
807+
key: Neo4jGraphElement,
860808
conn_spec: ConnectionSpec,
861809
data_clear: Option<DataClearAction>,
862810
change_type: SetupChangeType,
863811
}
864812

865813
impl GraphElementDataSetupStatus {
866814
fn new(
867-
key: GraphElement,
815+
key: Neo4jGraphElement,
868816
conn_spec: ConnectionSpec,
869817
desired_state: Option<&SetupState>,
870818
existing: &CombinedState<SetupState>,
@@ -936,7 +884,7 @@ impl ResourceSetupStatus for GraphElementDataSetupStatus {
936884

937885
async fn clear_graph_element_data(
938886
graph: &Graph,
939-
key: &GraphElement,
887+
key: &Neo4jGraphElement,
940888
is_self_contained: bool,
941889
) -> Result<()> {
942890
let var_name = CORE_ELEMENT_MATCHER_VAR;
@@ -1083,7 +1031,7 @@ impl StorageFactoryBase for Factory {
10831031
GraphElementDataSetupStatus,
10841032
components::SetupStatus<SetupComponentOperator>,
10851033
);
1086-
type Key = GraphElement;
1034+
type Key = Neo4jGraphElement;
10871035
type ExportContext = ExportContext;
10881036

10891037
fn name(&self) -> &str {
@@ -1097,7 +1045,7 @@ impl StorageFactoryBase for Factory {
10971045
context: Arc<FlowInstanceContext>,
10981046
) -> Result<(
10991047
Vec<TypedExportDataCollectionBuildOutput<Self>>,
1100-
Vec<(GraphElement, SetupState)>,
1048+
Vec<(Neo4jGraphElement, SetupState)>,
11011049
)> {
11021050
let node_labels_index_options = data_collections
11031051
.iter()
@@ -1116,7 +1064,10 @@ impl StorageFactoryBase for Factory {
11161064
let data_coll_output = data_collections
11171065
.into_iter()
11181066
.map(|d| {
1119-
let setup_key = GraphElement::from_spec(&d.spec);
1067+
let setup_key = Neo4jGraphElement {
1068+
connection: d.spec.connection.clone(),
1069+
typ: ElementType::from_mapping_spec(&d.spec.mapping),
1070+
};
11201071

11211072
let (value_fields_info, rel_end_label_info, dependent_node_labels) = match &d
11221073
.spec
@@ -1257,7 +1208,7 @@ impl StorageFactoryBase for Factory {
12571208

12581209
async fn check_setup_status(
12591210
&self,
1260-
key: GraphElement,
1211+
key: Neo4jGraphElement,
12611212
desired: Option<SetupState>,
12621213
existing: CombinedState<SetupState>,
12631214
auth_registry: &Arc<AuthRegistry>,
@@ -1284,7 +1235,7 @@ impl StorageFactoryBase for Factory {
12841235
Ok(desired.check_compatible(existing))
12851236
}
12861237

1287-
fn describe_resource(&self, key: &GraphElement) -> Result<String> {
1238+
fn describe_resource(&self, key: &Neo4jGraphElement) -> Result<String> {
12881239
Ok(format!("Neo4j {}", key.typ))
12891240
}
12901241

@@ -1339,17 +1290,17 @@ impl StorageFactoryBase for Factory {
13391290
changes.into_iter().map(|c| (&c.0, &c.1)).unzip();
13401291

13411292
// Relationships first, then nodes, as relationships need to be deleted before nodes they referenced.
1342-
let mut relationship_types = IndexMap::<&GraphElement, &ConnectionSpec>::new();
1343-
let mut node_labels = IndexMap::<&GraphElement, &ConnectionSpec>::new();
1344-
let mut dependent_node_labels = IndexMap::<GraphElement, &ConnectionSpec>::new();
1293+
let mut relationship_types = IndexMap::<&Neo4jGraphElement, &ConnectionSpec>::new();
1294+
let mut node_labels = IndexMap::<&Neo4jGraphElement, &ConnectionSpec>::new();
1295+
let mut dependent_node_labels = IndexMap::<Neo4jGraphElement, &ConnectionSpec>::new();
13451296
for data_status in data_statuses.iter() {
13461297
if let Some(data_clear) = &data_status.data_clear {
13471298
match &data_status.key.typ {
13481299
ElementType::Relationship(_) => {
13491300
relationship_types.insert(&data_status.key, &data_status.conn_spec);
13501301
for label in &data_clear.dependent_node_labels {
13511302
dependent_node_labels.insert(
1352-
GraphElement {
1303+
Neo4jGraphElement {
13531304
connection: data_status.key.connection.clone(),
13541305
typ: ElementType::Node(label.clone()),
13551306
},

0 commit comments

Comments
 (0)