Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 169 additions & 7 deletions helix-db/src/helix_engine/tests/traversal_tests/upsert_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,11 @@ fn test_upsert_e_updates_existing_edge_with_properties() {
}

#[test]
fn test_upsert_e_ignores_non_edge_values() {
fn test_upsert_e_ignores_iterator_content() {
// After fix for issue #850: upsert_e now looks up edges by from_node/to_node/label
// directly in the database, ignoring the source iterator content entirely.
// This test verifies that even if a Node is in the iterator, the upsert
// still correctly creates an edge between the specified nodes.
let (_temp_dir, storage) = setup_test_db();
let arena = Bump::new();
let mut txn = storage.graph_env.write_txn().unwrap();
Expand All @@ -456,7 +460,8 @@ fn test_upsert_e_ignores_non_edge_values() {
.unwrap()[0]
.id();

// Upsert with node in iterator (should be ignored)
// Upsert with node in iterator - iterator content is now ignored,
// edge is created based on from_node/to_node parameters
let result = G::new_mut_from_iter(&storage, &mut txn, std::iter::once(node1.clone()), &arena)
.upsert_e(
"connects",
Expand All @@ -467,13 +472,15 @@ fn test_upsert_e_ignores_non_edge_values() {
.collect::<Result<Vec<_>, _>>()
.unwrap();

// Should return Empty since node was ignored
// Should create edge since no edge exists between node1 and node2
assert_eq!(result.len(), 1);
if let TraversalValue::Empty = &result[0] {
// This is the correct behavior - non-matching types are ignored
// and return TraversalValue::Empty
if let TraversalValue::Edge(edge) = &result[0] {
assert_eq!(edge.from_node, node1.id());
assert_eq!(edge.to_node, node2);
assert_eq!(edge.label, "connects");
assert_eq!(edge.get_property("type").unwrap(), &Value::from("friend"));
} else {
panic!("Expected Empty, got: {:?}", result[0]);
panic!("Expected Edge, got: {:?}", result[0]);
}
txn.commit().unwrap();
}
Expand Down Expand Up @@ -1246,3 +1253,158 @@ fn test_upsert_n_sequential_property_updates() {

txn.commit().unwrap();
}

// ============================================================================
// Regression Tests - Issue #850
// ============================================================================

#[test]
fn test_upsert_e_creates_edge_between_correct_nodes_issue_850() {
// Regression test for issue #850: UpsertE was ignoring From()/To() parameters
// and instead using the first edge from the source iterator.
let (_temp_dir, storage) = setup_test_db();
let arena = Bump::new();
let mut txn = storage.graph_env.write_txn().unwrap();

// Create 4 nodes
let node_a = G::new_mut(&storage, &arena, &mut txn)
.add_n("person", None, None)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0]
.id();
let node_b = G::new_mut(&storage, &arena, &mut txn)
.add_n("person", None, None)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0]
.id();
let node_c = G::new_mut(&storage, &arena, &mut txn)
.add_n("person", None, None)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0]
.id();
let node_d = G::new_mut(&storage, &arena, &mut txn)
.add_n("person", None, None)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0]
.id();

// Create existing edge A -> B
let _existing = G::new_mut(&storage, &arena, &mut txn)
.add_edge("knows", None, node_a, node_b, false, false)
.collect::<Result<Vec<_>, _>>()
.unwrap();

// UpsertE targeting C -> D (should create new edge, not update A->B)
let result = G::new_mut_from_iter(
&storage,
&mut txn,
std::iter::empty::<TraversalValue>(),
&arena,
)
.upsert_e("knows", node_c, node_d, &[("since", Value::from(2024))])
.collect::<Result<Vec<_>, _>>()
.unwrap();

assert_eq!(result.len(), 1);
if let TraversalValue::Edge(edge) = &result[0] {
// Key assertion: edge connects C->D, not A->B
assert_eq!(edge.from_node, node_c);
assert_eq!(edge.to_node, node_d);
assert_eq!(edge.get_property("since").unwrap(), &Value::from(2024));
} else {
panic!("Expected edge");
}
txn.commit().unwrap();
}

#[test]
fn test_upsert_e_updates_correct_edge_when_multiple_edges_exist_issue_850() {
// Another regression test for issue #850: When multiple edges exist,
// UpsertE should find and update the correct edge based on from_node/to_node.
let (_temp_dir, storage) = setup_test_db();
let arena = Bump::new();
let mut txn = storage.graph_env.write_txn().unwrap();

// Create nodes
let node_a = G::new_mut(&storage, &arena, &mut txn)
.add_n("person", None, None)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0]
.id();
let node_b = G::new_mut(&storage, &arena, &mut txn)
.add_n("person", None, None)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0]
.id();
let node_c = G::new_mut(&storage, &arena, &mut txn)
.add_n("person", None, None)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0]
.id();

// Create edge A -> B with initial property
let edge_ab = G::new_mut(&storage, &arena, &mut txn)
.add_edge(
"knows",
props_option(&arena, props!("since" => 2020)),
node_a,
node_b,
false,
false,
)
.collect::<Result<Vec<_>, _>>()
.unwrap();
let edge_ab_id = if let TraversalValue::Edge(e) = &edge_ab[0] {
e.id
} else {
panic!("Expected edge");
};

// Create edge A -> C with initial property
let edge_ac = G::new_mut(&storage, &arena, &mut txn)
.add_edge(
"knows",
props_option(&arena, props!("since" => 2021)),
node_a,
node_c,
false,
false,
)
.collect::<Result<Vec<_>, _>>()
.unwrap();
let edge_ac_id = if let TraversalValue::Edge(e) = &edge_ac[0] {
e.id
} else {
panic!("Expected edge");
};

// UpsertE targeting A -> C should update edge_ac, NOT edge_ab
let result = G::new_mut_from_iter(
&storage,
&mut txn,
std::iter::empty::<TraversalValue>(),
&arena,
)
.upsert_e(
"knows",
node_a,
node_c,
&[("since", Value::from(2025)), ("updated", Value::from(true))],
)
.collect::<Result<Vec<_>, _>>()
.unwrap();

assert_eq!(result.len(), 1);
if let TraversalValue::Edge(edge) = &result[0] {
// Key assertions: should update edge_ac, not edge_ab
assert_eq!(edge.id, edge_ac_id, "Should update the correct edge (A->C)");
assert_ne!(edge.id, edge_ab_id, "Should NOT update edge A->B");
assert_eq!(edge.from_node, node_a);
assert_eq!(edge.to_node, node_c);
assert_eq!(edge.get_property("since").unwrap(), &Value::from(2025));
assert_eq!(edge.get_property("updated").unwrap(), &Value::from(true));
} else {
panic!("Expected edge");
}
txn.commit().unwrap();
}
44 changes: 33 additions & 11 deletions helix-db/src/helix_engine/traversal_core/ops/util/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use itertools::Itertools;
use crate::{
helix_engine::{
bm25::bm25::{BM25, BM25Flatten},
storage_core::HelixGraphStorage,
storage_core::{storage_methods::StorageMethods, HelixGraphStorage},
traversal_core::{traversal_iter::RwTraversalIterator, traversal_value::TraversalValue},
types::GraphError,
vector_core::{hnsw::HNSW, vector::HVector},
Expand Down Expand Up @@ -351,7 +351,7 @@ impl<'db, 'arena, 'txn, I: Iterator<Item = Result<TraversalValue<'arena>, GraphE
}

fn upsert_e(
mut self,
self,
label: &'arena str,
from_node: u128,
to_node: u128,
Expand All @@ -364,8 +364,37 @@ impl<'db, 'arena, 'txn, I: Iterator<Item = Result<TraversalValue<'arena>, GraphE
> {
let mut result: Result<TraversalValue, GraphError> = Ok(TraversalValue::Empty);

match self.inner.next() {
Some(Ok(TraversalValue::Edge(mut edge))) => {
// Look up existing edge by from_node + to_node + label (ignore source iterator)
// This fixes issue #850 where UpsertE was incorrectly using the first edge
// from the source iterator instead of finding the edge by its endpoints.
let label_hash = hash_label(label, None);
let out_key = HelixGraphStorage::out_edge_key(&from_node, &label_hash);

let existing_edge: Option<Edge> = self
.storage
.out_edges_db
.lazily_decode_data()
.get_duplicates(self.txn, &out_key)
.ok()
.flatten()
.and_then(|iter| {
iter.filter_map(|item| item.ok())
.find_map(|(_, data)| {
let data = data.decode().ok()?;
let (edge_id, node_id) =
HelixGraphStorage::unpack_adj_edge_data(data).ok()?;
if node_id == to_node {
self.storage
.get_edge(self.txn, &edge_id, self.arena)
.ok()
} else {
None
}
})
});

match existing_edge {
Some(mut edge) => {
// Update existing edge - merge properties
match edge.properties {
None => {
Expand Down Expand Up @@ -419,9 +448,6 @@ impl<'db, 'arena, 'txn, I: Iterator<Item = Result<TraversalValue<'arena>, GraphE
Err(e) => result = Err(GraphError::from(e)),
}
}
Some(Err(e)) => {
result = Err(e);
}
None => {
// Create new edge
let version = self.storage.version_info.get_latest(label);
Expand Down Expand Up @@ -460,7 +486,6 @@ impl<'db, 'arena, 'txn, I: Iterator<Item = Result<TraversalValue<'arena>, GraphE
}

// Insert into out_edges_db
let label_hash = hash_label(edge.label, None);
if let Err(e) = self.storage.out_edges_db.put_with_flags(
self.txn,
PutFlags::APPEND_DUP,
Expand All @@ -484,9 +509,6 @@ impl<'db, 'arena, 'txn, I: Iterator<Item = Result<TraversalValue<'arena>, GraphE
result = Ok(TraversalValue::Edge(edge));
}
}
Some(Ok(_)) => {
// Non-edge value in iterator - ignore
}
}

RwTraversalIterator {
Expand Down
Loading