Skip to content

Commit 384e532

Browse files
authored
Fix Neo4j logic to decouple NodeSpec with those for rel ends. (#290)
1 parent 4af075c commit 384e532

File tree

2 files changed

+93
-75
lines changed

2 files changed

+93
-75
lines changed

python/cocoindex/storages.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,21 @@ class Neo4jConnectionSpec:
1818
db: str | None = None
1919

2020
@dataclass
21-
class Neo4jNodeSpec:
21+
class Neo4jRelationshipEndSpec:
2222
"""Spec for a Neo4j node type."""
2323
field_name: str
2424
label: str
2525

26+
@dataclass
27+
class Neo4jRelationshipNodeSpec:
28+
"""Spec for a Neo4j node type."""
29+
key_field_name: str | None = None
30+
2631
class Neo4jRelationship(op.StorageSpec):
2732
"""Graph storage powered by Neo4j."""
2833

2934
connection: AuthEntryReference
3035
relationship: str
31-
source_node: Neo4jNodeSpec
32-
target_node: Neo4jNodeSpec
36+
source: Neo4jRelationshipEndSpec
37+
target: Neo4jRelationshipEndSpec
38+
nodes: dict[str, Neo4jRelationshipNodeSpec]

src/ops/storages/neo4j.rs

Lines changed: 84 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,25 @@ pub struct Neo4jConnectionSpec {
1616
}
1717

1818
#[derive(Debug, Deserialize)]
19-
pub struct NodeSpec {
19+
pub struct RelationshipEndSpec {
2020
field_name: String,
2121
label: String,
2222
}
2323

24+
const DEFAULT_KEY_FIELD_NAME: &str = "value";
25+
26+
#[derive(Debug, Deserialize)]
27+
pub struct RelationshipNodeSpec {
28+
key_field_name: Option<String>,
29+
}
30+
2431
#[derive(Debug, Deserialize)]
2532
pub struct RelationshipSpec {
2633
connection: AuthEntryReference,
2734
relationship: String,
28-
source_node: NodeSpec,
29-
target_node: NodeSpec,
35+
source: RelationshipEndSpec,
36+
target: RelationshipEndSpec,
37+
nodes: BTreeMap<String, RelationshipNodeSpec>,
3038
}
3139

3240
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
@@ -284,10 +292,10 @@ MERGE (new_tgt:{tgt_node_label} {{{tgt_node_key_field_name}: ${TGT_ID_PARAM}}})
284292
MERGE (new_src)-[new_rel:{rel_type} {{id: ${REL_ID_PARAM}}}]->(new_tgt)
285293
{optional_set_rel_props}
286294
"#,
287-
src_node_label = spec.source_node.label,
288-
src_node_key_field_name = spec.source_node.field_name,
289-
tgt_node_label = spec.target_node.label,
290-
tgt_node_key_field_name = spec.target_node.field_name,
295+
src_node_label = spec.source.label,
296+
src_node_key_field_name = spec.source.field_name,
297+
tgt_node_label = spec.target.label,
298+
tgt_node_key_field_name = spec.target.field_name,
291299
rel_type = spec.relationship,
292300
);
293301
Self {
@@ -355,47 +363,63 @@ impl ExportTargetExecutor for RelationshipStorageExecutor {
355363
}
356364

357365
#[derive(Debug, Clone, Serialize, Deserialize)]
358-
pub struct NodeSetupState {
359-
label: String,
366+
pub struct NodeLabelSetupState {
360367
key_field_name: String,
361368
key_constraint_name: String,
362369
}
363370

364-
impl NodeSetupState {
365-
fn from_spec(spec: &NodeSpec) -> Self {
371+
impl NodeLabelSetupState {
372+
fn from_spec(label: &str, spec: &RelationshipNodeSpec) -> Self {
373+
let key_field_name = spec
374+
.key_field_name
375+
.to_owned()
376+
.unwrap_or_else(|| DEFAULT_KEY_FIELD_NAME.to_string());
377+
let key_constraint_name = format!("n__{}__{}", label, key_field_name);
366378
Self {
367-
label: spec.label.clone(),
368-
key_field_name: spec.field_name.clone(),
369-
key_constraint_name: format!("n__{}__{}", spec.label, spec.field_name),
379+
key_field_name,
380+
key_constraint_name,
370381
}
371382
}
372383

373384
fn is_compatible(&self, other: &Self) -> bool {
374-
self.label == other.label && self.key_field_name == other.key_field_name
385+
self.key_field_name == other.key_field_name
375386
}
376387
}
377388
#[derive(Debug, Clone, Serialize, Deserialize)]
378389
pub struct RelationshipSetupState {
379390
key_field_name: String,
380391
key_constraint_name: String,
381-
src_node: NodeSetupState,
382-
tgt_node: NodeSetupState,
392+
#[serde(default)]
393+
nodes: BTreeMap<String, NodeLabelSetupState>,
383394
}
384395

385396
impl RelationshipSetupState {
386397
fn from_spec(spec: &RelationshipSpec, key_field_name: String) -> Self {
387398
Self {
388399
key_field_name,
389400
key_constraint_name: format!("r__{}__key", spec.relationship),
390-
src_node: NodeSetupState::from_spec(&spec.source_node),
391-
tgt_node: NodeSetupState::from_spec(&spec.target_node),
401+
nodes: spec
402+
.nodes
403+
.iter()
404+
.map(|(label, node)| (label.clone(), NodeLabelSetupState::from_spec(label, node)))
405+
.collect(),
392406
}
393407
}
394408

395-
fn is_compatible(&self, other: &Self) -> bool {
396-
self.key_field_name == other.key_field_name
397-
&& self.src_node.is_compatible(&other.src_node)
398-
&& self.tgt_node.is_compatible(&other.tgt_node)
409+
fn check_compatible(&self, existing: &Self) -> SetupStateCompatibility {
410+
if self.key_field_name != existing.key_field_name {
411+
SetupStateCompatibility::NotCompatible
412+
} else if existing.nodes.iter().any(|(label, existing_node)| {
413+
!self
414+
.nodes
415+
.get(label)
416+
.map_or(false, |node| node.is_compatible(existing_node))
417+
}) {
418+
// If any node's key field change of some node label gone, we have to clear relationship.
419+
SetupStateCompatibility::NotCompatible
420+
} else {
421+
SetupStateCompatibility::Compatible
422+
}
399423
}
400424
}
401425

@@ -412,9 +436,9 @@ struct KeyConstraint {
412436
}
413437

414438
impl KeyConstraint {
415-
fn from_node_setup_state(state: &NodeSetupState) -> Self {
439+
fn new(label: String, state: &NodeLabelSetupState) -> Self {
416440
Self {
417-
label: state.label.clone(),
441+
label: label,
418442
field_name: state.key_field_name.clone(),
419443
}
420444
}
@@ -448,24 +472,27 @@ impl SetupStatusCheck {
448472
.current
449473
.as_ref()
450474
.filter(|existing_current| {
451-
!desired_state
452-
.as_ref()
453-
.map(|desired| desired.is_compatible(existing_current))
454-
.unwrap_or(false)
475+
desired_state.as_ref().map_or(true, |desired| {
476+
desired.check_compatible(existing_current)
477+
== SetupStateCompatibility::NotCompatible
478+
})
455479
})
456480
.map(|existing_current| DataClearAction {
457481
rel_type: key.relationship.clone(),
458-
node_labels: std::iter::once(existing_current.src_node.label.clone())
459-
.chain(std::iter::once(existing_current.tgt_node.label.clone()))
482+
node_labels: existing_current
483+
.nodes
484+
.values()
485+
.map(|node| node.key_constraint_name.clone())
460486
.collect(),
461487
});
462488

463489
let mut old_rel_constraints = IndexSet::new();
464490
let mut old_node_constraints = IndexSet::new();
465491
for existing_version in existing.possible_versions() {
466492
old_rel_constraints.insert(existing_version.key_constraint_name.clone());
467-
old_node_constraints.insert(existing_version.src_node.key_constraint_name.clone());
468-
old_node_constraints.insert(existing_version.tgt_node.key_constraint_name.clone());
493+
for (_, node) in existing_version.nodes.iter() {
494+
old_node_constraints.insert(node.key_constraint_name.clone());
495+
}
469496
}
470497

471498
let mut rel_constraint_to_create = IndexMap::new();
@@ -485,33 +512,23 @@ impl SetupStatusCheck {
485512
rel_constraint_to_create.insert(desired_state.key_constraint_name, rel_constraint);
486513
}
487514

488-
old_node_constraints.swap_remove(&desired_state.src_node.key_constraint_name);
489-
if !existing
490-
.current
491-
.as_ref()
492-
.map(|c| {
493-
c.src_node.is_compatible(&desired_state.src_node)
494-
|| c.tgt_node.is_compatible(&desired_state.tgt_node)
495-
})
496-
.unwrap_or(false)
497-
{
498-
node_constraint_to_create.insert(
499-
desired_state.src_node.key_constraint_name.clone(),
500-
KeyConstraint::from_node_setup_state(&desired_state.src_node),
501-
);
502-
}
503-
504-
old_node_constraints.swap_remove(&desired_state.tgt_node.key_constraint_name);
505-
if !existing
506-
.current
507-
.as_ref()
508-
.map(|c| c.src_node.is_compatible(&desired_state.src_node))
509-
.unwrap_or(false)
510-
{
511-
node_constraint_to_create.insert(
512-
desired_state.tgt_node.key_constraint_name.clone(),
513-
KeyConstraint::from_node_setup_state(&desired_state.tgt_node),
514-
);
515+
for (label, node) in desired_state.nodes.iter() {
516+
old_node_constraints.swap_remove(&node.key_constraint_name);
517+
if !existing
518+
.current
519+
.as_ref()
520+
.map(|c| {
521+
c.nodes
522+
.get(label)
523+
.map_or(false, |existing_node| node.is_compatible(existing_node))
524+
})
525+
.unwrap_or(false)
526+
{
527+
node_constraint_to_create.insert(
528+
node.key_constraint_name.clone(),
529+
KeyConstraint::new(label.clone(), node),
530+
);
531+
}
515532
}
516533
}
517534

@@ -565,7 +582,7 @@ impl ResourceSetupStatusCheck for SetupStatusCheck {
565582
}
566583
for (name, rel_constraint) in self.rel_constraint_to_create.iter() {
567584
result.push(format!(
568-
"Create UNIQUE CONSTRAINT {} ON RELATIONSHIP {} (key: {})",
585+
"Create KEY CONSTRAINT {} ON RELATIONSHIP {} (key: {})",
569586
name, rel_constraint.label, rel_constraint.field_name,
570587
));
571588
}
@@ -574,7 +591,7 @@ impl ResourceSetupStatusCheck for SetupStatusCheck {
574591
}
575592
for (name, node_constraint) in self.node_constraint_to_create.iter() {
576593
result.push(format!(
577-
"Create UNIQUE CONSTRAINT {} ON NODE {} (key: {})",
594+
"Create KEY CONSTRAINT {} ON NODE {} (key: {})",
578595
name, node_constraint.label, node_constraint.field_name,
579596
));
580597
}
@@ -692,19 +709,19 @@ impl StorageFactoryBase for RelationshipFactory {
692709
field_idx,
693710
field_schema,
694711
};
695-
if field_info.field_schema.name == spec.source_node.field_name {
712+
if field_info.field_schema.name == spec.source.field_name {
696713
src_field_info = Some(field_info);
697-
} else if field_info.field_schema.name == spec.target_node.field_name {
714+
} else if field_info.field_schema.name == spec.target.field_name {
698715
tgt_field_info = Some(field_info);
699716
} else {
700717
rel_value_fields_info.push(field_info);
701718
}
702719
}
703720
let src_field_info = src_field_info.ok_or_else(|| {
704-
anyhow::anyhow!("Source key field {} not found", spec.source_node.field_name)
721+
anyhow::anyhow!("Source key field {} not found", spec.source.field_name)
705722
})?;
706723
let tgt_field_info = tgt_field_info.ok_or_else(|| {
707-
anyhow::anyhow!("Target key field {} not found", spec.target_node.field_name)
724+
anyhow::anyhow!("Target key field {} not found", spec.target.field_name)
708725
})?;
709726
let conn_spec = context
710727
.auth_registry
@@ -751,12 +768,7 @@ impl StorageFactoryBase for RelationshipFactory {
751768
desired: &RelationshipSetupState,
752769
existing: &RelationshipSetupState,
753770
) -> Result<SetupStateCompatibility> {
754-
let compatibility = if desired.is_compatible(existing) {
755-
SetupStateCompatibility::Compatible
756-
} else {
757-
SetupStateCompatibility::NotCompatible
758-
};
759-
Ok(compatibility)
771+
Ok(desired.check_compatible(existing))
760772
}
761773

762774
fn describe_resource(&self, key: &GraphRelationship) -> Result<String> {

0 commit comments

Comments
 (0)