diff --git a/Cargo.lock b/Cargo.lock index 8cabe50a..6bef1a8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -427,9 +427,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "cc" diff --git a/minigu/storage/src/ap/iterators/adjacency_iterator.rs b/minigu/storage/src/ap/iterators/adjacency_iterator.rs index e6cfb992..e1cdf674 100644 --- a/minigu/storage/src/ap/iterators/adjacency_iterator.rs +++ b/minigu/storage/src/ap/iterators/adjacency_iterator.rs @@ -1,6 +1,7 @@ use std::num::NonZeroU32; use minigu_common::types::VertexId; +use minigu_transaction::Timestamp; use crate::ap::olap_graph::{OlapEdge, OlapPropertyStore, OlapStorage, OlapStorageEdge}; use crate::error::StorageError; @@ -23,30 +24,124 @@ impl Iterator for AdjacencyIterator<'_> { fn next(&mut self) -> Option { while self.block_idx != usize::MAX { let temporary = self.storage.edges.read().unwrap(); - let option = temporary.get(self.block_idx); - - // Return if none,should not happen - let _v = option?; + let block = match temporary.get(self.block_idx) { + Some(block) => block, + None => { + self.block_idx = usize::MAX; + return None; + } + }; - let block = option.unwrap(); // Return if tombstone if block.is_tombstone { - if option?.pre_block_index.is_none() { + if block.pre_block_index.is_none() { self.block_idx = usize::MAX; return None; } - self.block_idx = block.pre_block_index.unwrap(); continue; } + // Move to next block if self.offset == BLOCK_CAPACITY { self.offset = 0; - self.block_idx = if let Some(idx) = block.pre_block_index { - idx - } else { - usize::MAX + self.block_idx = block.pre_block_index.unwrap_or(usize::MAX); + continue; + } + + if self.offset < 
BLOCK_CAPACITY { + let raw: &OlapStorageEdge = &block.edges[self.offset]; + if raw.label_id == NonZeroU32::new(1) && raw.dst_id == 1 { + self.offset = 0; + self.block_idx = block.pre_block_index.unwrap_or(usize::MAX); + continue; + } + + // Build edge result + let edge = OlapEdge { + label_id: raw.label_id, + src_id: block.src_id, + dst_id: raw.dst_id, + properties: { + let mut props = OlapPropertyStore::default(); + + for (col_idx, column) in self + .storage + .property_columns + .read() + .unwrap() + .iter() + .enumerate() + { + if let Some(val) = column + .blocks + .get(self.block_idx) + .and_then(|blk| blk.values.get(self.offset)) + .and_then(|versions| { + crate::ap::olap_graph::latest_committed_prop_value(versions) + }) + { + props.set_prop(col_idx, Some(val)); + } + } + props + }, }; + self.offset += 1; + return Some(Ok(edge)); + } + self.block_idx = block.pre_block_index.unwrap_or(usize::MAX); + } + None + } +} + +#[allow(dead_code)] +pub struct AdjacencyIteratorAtTs<'a> { + pub storage: &'a OlapStorage, + // Vertex ID + pub vertex_id: VertexId, + // Index of the current block + pub block_idx: usize, + // Offset within block + pub offset: usize, + pub txn_id: Option, + pub start_ts: Timestamp, +} +impl Iterator for AdjacencyIteratorAtTs<'_> { + type Item = Result; + + fn next(&mut self) -> Option { + while self.block_idx != usize::MAX { + let temporary = self.storage.edges.read().unwrap(); + let block = match temporary.get(self.block_idx) { + Some(block) => block, + None => { + self.block_idx = usize::MAX; + return None; + } + }; + + // Return if tombstone + if block.is_tombstone { + if block.pre_block_index.is_none() { + self.block_idx = usize::MAX; + return None; + } + self.block_idx = block.pre_block_index.unwrap(); + continue; + } + + if block.min_ts.is_commit_ts() && self.start_ts.raw() < block.min_ts.raw() { + self.block_idx = block.pre_block_index.unwrap_or(usize::MAX); + self.offset = 0; + continue; + } + + // Move to next block + if self.offset == 
BLOCK_CAPACITY { + self.offset = 0; + self.block_idx = block.pre_block_index.unwrap_or(usize::MAX); continue; } @@ -55,13 +150,26 @@ impl Iterator for AdjacencyIterator<'_> { // Scan next block once scanned empty edge if raw.label_id == NonZeroU32::new(1) && raw.dst_id == 1 { self.offset = 0; - self.block_idx = if let Some(idx) = block.pre_block_index { - idx + self.block_idx = block.pre_block_index.unwrap_or(usize::MAX); + continue; + } + + // Visibility filtering by edge commit_ts using snapshot start_ts + if raw.commit_ts.is_txn_id() { + if let Some(txn_id) = self.txn_id { + if raw.commit_ts != txn_id { + self.offset += 1; + continue; + } } else { - usize::MAX - }; + self.offset += 1; + continue; + } + } else if raw.commit_ts.raw() > self.start_ts.raw() { + self.offset += 1; continue; } + // Build edge result let edge = OlapEdge { label_id: raw.label_id, @@ -69,7 +177,6 @@ impl Iterator for AdjacencyIterator<'_> { dst_id: raw.dst_id, properties: { let mut props = OlapPropertyStore::default(); - for (col_idx, column) in self .storage .property_columns @@ -82,9 +189,15 @@ impl Iterator for AdjacencyIterator<'_> { .blocks .get(self.block_idx) .and_then(|blk| blk.values.get(self.offset)) - .cloned() + .and_then(|versions| { + crate::ap::olap_graph::prop_value_visible_at( + versions, + self.txn_id, + self.start_ts, + ) + }) { - props.set_prop(col_idx, val); + props.set_prop(col_idx, Some(val)); } } props @@ -93,11 +206,8 @@ impl Iterator for AdjacencyIterator<'_> { self.offset += 1; return Some(Ok(edge)); } - self.block_idx = if let Some(idx) = block.pre_block_index { - idx - } else { - usize::MAX - }; + + self.block_idx = block.pre_block_index.unwrap_or(usize::MAX); } None } diff --git a/minigu/storage/src/ap/iterators/edge_iterator.rs b/minigu/storage/src/ap/iterators/edge_iterator.rs index 146aad11..5794f4b3 100644 --- a/minigu/storage/src/ap/iterators/edge_iterator.rs +++ b/minigu/storage/src/ap/iterators/edge_iterator.rs @@ -1,9 +1,12 @@ use 
std::num::NonZeroU32; +use minigu_transaction::Timestamp; + use crate::ap::olap_graph::{OlapEdge, OlapPropertyStore, OlapStorage, OlapStorageEdge}; use crate::error::StorageError; const BLOCK_CAPACITY: usize = 256; + pub struct EdgeIter<'a> { pub storage: &'a OlapStorage, // Index of the current block @@ -68,9 +71,11 @@ impl Iterator for EdgeIter<'_> { .blocks .get(self.block_idx) .and_then(|blk| blk.values.get(self.offset)) - .cloned() + .and_then(|versions| { + crate::ap::olap_graph::latest_committed_prop_value(versions) + }) { - props.set_prop(col_idx, val); + props.set_prop(col_idx, Some(val)); } } props @@ -84,3 +89,118 @@ impl Iterator for EdgeIter<'_> { None } } + +pub struct EdgeIterAtTs<'a> { + pub storage: &'a OlapStorage, + // Index of the current block + pub block_idx: usize, + // Offset within block + pub offset: usize, + pub txn_id: Option, + pub start_ts: Timestamp, +} +impl Iterator for EdgeIterAtTs<'_> { + type Item = Result; + + fn next(&mut self) -> Option { + // 1. Scan Block + let edges = self.storage.edges.read().unwrap(); + while self.block_idx < edges.len() { + // 1.1 If none,move to next block + let borrow = self.storage.edges.read().unwrap(); + let block = match borrow.get(self.block_idx) { + Some(block) => block, + None => { + self.block_idx += 1; + self.offset = 0; + continue; + } + }; + if block.is_tombstone { + self.block_idx += 1; + self.offset = 0; + continue; + } + // Block-level timestamp filter + if block.min_ts.is_commit_ts() && self.start_ts.raw() < block.min_ts.raw() { + self.block_idx += 1; + self.offset = 0; + continue; + } + // 1.2 If one block has been finished,move to next + if self.offset == BLOCK_CAPACITY { + self.offset = 0; + self.block_idx += 1; + continue; + } + // 2. 
Scan within block + if self.offset < block.edges.len() { + let raw: &OlapStorageEdge = &block.edges[self.offset]; + // 2.1 Determine logical end of block and skip tombstones + // Use eid == 0 as the end-of-block sentinel to avoid + // confusing transactional tombstones with padding. + if raw.eid == 0 { + // No more valid edges in this block; move to next block. + self.offset = 0; + self.block_idx += 1; + continue; + } + // Transactional delete tombstone: skip this edge but continue scanning. + if raw.label_id == NonZeroU32::new(1) && raw.dst_id == 1 { + self.offset += 1; + continue; + } + // 2.2 Visibility filtering by edge commit_ts + let is_visible = if raw.commit_ts.is_txn_id() { + self.txn_id == Some(raw.commit_ts) + } else { + raw.commit_ts.raw() <= self.start_ts.raw() + }; + + if !is_visible { + self.offset += 1; + continue; + } + + // 2.3 Build edge result + let edge = OlapEdge { + label_id: raw.label_id, + src_id: block.src_id, + dst_id: raw.dst_id, + properties: { + let mut props = OlapPropertyStore::default(); + + for (col_idx, column) in self + .storage + .property_columns + .read() + .unwrap() + .iter() + .enumerate() + { + if let Some(val) = column + .blocks + .get(self.block_idx) + .and_then(|blk| blk.values.get(self.offset)) + .and_then(|versions| { + crate::ap::olap_graph::prop_value_visible_at( + versions, + self.txn_id, + self.start_ts, + ) + }) + { + props.set_prop(col_idx, Some(val)); + } + } + props + }, + }; + // 2.4 Increase offset + self.offset += 1; + return Some(Ok(edge)); + } + } + None + } +} diff --git a/minigu/storage/src/ap/mod.rs b/minigu/storage/src/ap/mod.rs index 9dd49f01..56d96c74 100644 --- a/minigu/storage/src/ap/mod.rs +++ b/minigu/storage/src/ap/mod.rs @@ -1,5 +1,6 @@ pub mod iterators; pub mod olap_graph; pub mod olap_storage; +pub mod transaction; pub use olap_storage::{MutOlapGraph, OlapGraph, StorageTransaction}; diff --git a/minigu/storage/src/ap/olap_graph.rs b/minigu/storage/src/ap/olap_graph.rs index 
b0b1178a..c3a8326f 100644 --- a/minigu/storage/src/ap/olap_graph.rs +++ b/minigu/storage/src/ap/olap_graph.rs @@ -7,12 +7,16 @@ use bitvec::bitvec; use bitvec::prelude::Lsb0; use bitvec::vec::BitVec; use dashmap::DashMap; -use minigu_common::types::{LabelId, VertexId}; +use minigu_common::types::{EdgeId, LabelId, VertexId}; use minigu_common::value::ScalarValue; +use minigu_transaction::Timestamp; use serde::{Deserialize, Serialize}; -use crate::ap::iterators::{AdjacencyIterator, EdgeIter, VertexIter}; +use crate::ap::iterators::{ + AdjacencyIterator, AdjacencyIteratorAtTs, EdgeIter, EdgeIterAtTs, VertexIter, +}; use crate::ap::olap_storage::{MutOlapGraph, OlapGraph}; +use crate::common::DeltaOp; use crate::error::EdgeNotFoundError::EdgeNotFound; use crate::error::VertexNotFoundError::VertexNotFound; use crate::error::{StorageError, StorageResult}; @@ -56,15 +60,19 @@ impl Hash for OlapVertex { #[derive(Clone, Debug, Copy)] pub struct OlapStorageEdge { // Edge data + pub eid: EdgeId, // Global unique edge identifier pub label_id: Option, pub dst_id: VertexId, + pub commit_ts: Timestamp, } impl OlapStorageEdge { // (Temporarily) Stands for null fn default() -> OlapStorageEdge { OlapStorageEdge { + eid: 0, label_id: NonZeroU32::new(TOMBSTONE_LABEL_ID), dst_id: TOMBSTONE_DST_ID, + commit_ts: Timestamp::with_ts(0), } } } @@ -100,6 +108,37 @@ impl OlapPropertyStore { } } +pub(crate) fn latest_prop_value(versions: &[PropertyVersion]) -> Option { + versions.last().and_then(|v| v.value.clone()) +} + +pub(crate) fn prop_value_visible_at( + versions: &[PropertyVersion], + txn_id: Option, + start_ts: Timestamp, +) -> Option { + for v in versions.iter().rev() { + if v.ts.is_txn_id() { + if Some(v.ts) == txn_id { + return v.value.clone(); + } + continue; + } + if v.ts.raw() <= start_ts.raw() { + return v.value.clone(); + } + } + None +} + +pub(crate) fn latest_committed_prop_value(versions: &[PropertyVersion]) -> Option { + versions + .iter() + .rev() + .find(|v| 
!v.ts.is_txn_id()) + .and_then(|v| v.value.clone()) +} + // Block of edge array (Header + Actual Storage + MVCC) #[derive(Clone, Debug)] pub struct EdgeBlock { @@ -115,6 +154,8 @@ pub struct EdgeBlock { // Min and max to id (However may not be used) pub max_dst_id: VertexId, pub min_dst_id: VertexId, + pub min_ts: Timestamp, + pub max_ts: Timestamp, // Edge storage pub src_id: VertexId, pub edge_counter: usize, @@ -135,6 +176,8 @@ pub struct CompressedEdgeBlock { // Min and max to id (Vid) pub max_dst_id: VertexId, pub min_dst_id: VertexId, + pub min_ts: Timestamp, + pub max_ts: Timestamp, // Edge storage pub src_id: VertexId, pub edge_counter: usize, @@ -142,13 +185,23 @@ pub struct CompressedEdgeBlock { pub first_dst_id: VertexId, pub compressed_dst_ids: BitVec, pub label_ids: [Option; BLOCK_CAPACITY], + pub version_ts: Timestamp, +} + +// Property value with commit timestamp for MVCC reads +#[derive(Clone, Debug)] +pub struct PropertyVersion { + pub ts: Timestamp, + pub value: Option, } // Property block (Column storage) #[derive(Clone, Debug)] pub struct PropertyBlock { - /// Property storage - pub values: Vec>, + pub min_ts: Timestamp, + pub max_ts: Timestamp, + /// Property storage: one version list per edge slot + pub values: Vec>, } // Property column storage #[derive(Debug)] @@ -160,6 +213,9 @@ pub struct PropertyColumn { #[derive(Clone, Debug)] #[allow(dead_code)] pub struct CompressedPropertyBlock { + pub min_ts: Timestamp, + pub max_ts: Timestamp, + pub version_ts: Timestamp, pub bitmap: BitVec, // Stands for numbers not null elements in every 16 elements pub offsets: [u8; BLOCK_CAPACITY / 16], @@ -175,8 +231,12 @@ pub struct CompressedPropertyColumn { pub struct OlapStorage { // For allocating vertex logical id pub logic_id_counter: AtomicU64, + // For allocating edge id + pub edge_id_counter: AtomicU64, // Actual id to logical id mapping pub dense_id_map: DashMap, + // EdgeId to (block_idx, offset) mapping for fast lookup + pub edge_id_map: DashMap, 
// Vertex array (Use lock for without MVCC) pub vertices: RwLock>, // Edge array @@ -244,6 +304,9 @@ impl OlapStorage { } label_ids[0] = edges[0].label_id; + let version_ts = minigu_transaction::global_timestamp_generator() + .next() + .unwrap(); // 3.3 Build compressed edge block self.compressed_edges.write().unwrap().insert( index, @@ -254,12 +317,15 @@ impl OlapStorage { min_label_id: edge_block.min_label_id, max_dst_id: edge_block.max_dst_id, min_dst_id: edge_block.min_dst_id, + min_ts: edge_block.min_ts, + max_ts: edge_block.max_ts, src_id: edge_block.src_id, edge_counter: edge_block.edge_counter, delta_bit_width: bit_width, first_dst_id: edge_block.edges[0].dst_id, compressed_dst_ids, label_ids, + version_ts, }, ) } @@ -278,6 +344,9 @@ impl OlapStorage { let mut compressed_properties = self.compressed_properties.write().unwrap(); let _column_cnt = property_columns.len(); + let version_ts = minigu_transaction::global_timestamp_generator() + .next() + .unwrap(); // 3. Traverse property columns for (column_index, column) in property_columns.iter().enumerate() { @@ -288,14 +357,11 @@ impl OlapStorage { let mut values: Vec = Vec::new(); let mut offsets: [u8; BLOCK_CAPACITY / 16] = [0u8; BLOCK_CAPACITY / 16]; - for (value_index, value_option) in block.values.iter().enumerate() { - if value_option.is_none() { - continue; + for (value_index, versions) in block.values.iter().enumerate() { + if let Some(latest) = latest_committed_prop_value(versions) { + bitmap.set(value_index, true); + values.push(latest); } - - // Should not panic - bitmap.set(value_index, true); - values.push(value_option.clone().unwrap()); } for (chunk_index, offset) in @@ -312,6 +378,9 @@ impl OlapStorage { compressed_blocks.blocks.insert( block_index, CompressedPropertyBlock { + min_ts: block.min_ts, + max_ts: block.max_ts, + version_ts, bitmap, offsets, values, @@ -323,16 +392,372 @@ impl OlapStorage { let _ = std::mem::take(&mut *property_columns); } + + /// Transactional variant: write using 
transaction's txn_id and record undo + #[allow(dead_code)] + pub fn create_edge_in_txn( + &self, + txn: &crate::ap::transaction::MemTransaction, + edge: OlapEdge, + ) -> StorageResult { + use crate::common::model::edge::Edge as CommonEdge; + + // 1. Found vertex + let dense_id = *self.dense_id_map.get(&edge.src_id).ok_or_else(|| { + StorageError::VertexNotFound(VertexNotFound(format!( + "Source vertex {} not found", + edge.src_id + ))) + })?; + let mut binding = self.vertices.write().unwrap(); + let vertex = binding.get_mut(dense_id as usize).ok_or_else(|| { + StorageError::VertexNotFound(VertexNotFound(format!( + "Source vertex {} not found", + edge.src_id + ))) + })?; + + // 2. Initial block (lazy load) if not exists + if vertex.block_offset == usize::MAX { + let index = self.edges.read().unwrap().len(); + self.edges.write().unwrap().push(EdgeBlock { + pre_block_index: None, + cur_block_index: index, + is_tombstone: false, + max_label_id: NonZeroU32::new(1), + min_label_id: NonZeroU32::new(u32::MAX), + max_dst_id: 0, + min_dst_id: u64::MAX, + min_ts: txn.txn_id, + max_ts: txn.txn_id, + edge_counter: 0, + src_id: edge.src_id, + edges: [OlapStorageEdge::default(); BLOCK_CAPACITY], + }); + vertex.block_offset = index; + } else { + let edge_count = self + .edges + .read() + .unwrap() + .get(vertex.block_offset) + .ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!( + "Vertex {} not found", + vertex.vid + ))) + })? 
+ .edge_counter; + if edge_count >= BLOCK_CAPACITY { + let index = self.edges.read().unwrap().len(); + self.edges.write().unwrap().push(EdgeBlock { + pre_block_index: Option::from(vertex.block_offset), + cur_block_index: index, + is_tombstone: false, + max_label_id: NonZeroU32::new(1), + min_label_id: NonZeroU32::new(u32::MAX), + max_dst_id: 0, + min_dst_id: u64::MAX, + min_ts: txn.txn_id, + max_ts: txn.txn_id, + src_id: edge.src_id, + edge_counter: 0, + edges: [OlapStorageEdge::default(); BLOCK_CAPACITY], + }); + vertex.block_offset = index; + } + } + + // 4. Allocate global unique EdgeId + let eid = self.edge_id_counter.fetch_add(1, Ordering::SeqCst); + + // 5. Insert edge + let mut binding = self.edges.write().unwrap(); + let block = binding.get_mut(vertex.block_offset).ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge block for vertex {} not found", + vertex.vid + ))) + })?; + + let insert_pos = block.edges[..block.edge_counter] + .binary_search_by_key(&(&edge.dst_id, &edge.label_id), |e| { + (&e.dst_id, &e.label_id) + }) + .unwrap_or_else(|e| e); + + for i in (insert_pos..block.edge_counter).rev() { + block.edges[i + 1] = block.edges[i]; + } + block.edge_counter += 1; + + // Update mapping for moved edges (shifted right by one) + for i in (insert_pos + 1)..block.edge_counter { + let moved_eid = block.edges[i].eid; + if moved_eid != 0 { + self.edge_id_map.insert(moved_eid, (vertex.block_offset, i)); + } + } + + // set commit_ts to txn id and store eid + block.edges[insert_pos] = OlapStorageEdge { + eid, + label_id: edge.label_id, + dst_id: edge.dst_id, + commit_ts: txn.txn_id, + }; + + // Build EdgeId to location mapping + self.edge_id_map + .insert(eid, (vertex.block_offset, insert_pos)); + + // ensure property columns exist based on edge properties + let mut property_columns = self.property_columns.write().unwrap(); + while property_columns.len() < edge.properties.properties.len() { + property_columns.push(PropertyColumn { blocks: 
Vec::new() }); + } + + // insert properties + for (i, column) in property_columns.iter_mut().enumerate() { + let property_block = if let Some(block) = column.blocks.get_mut(vertex.block_offset) { + block + } else { + column.blocks.insert( + vertex.block_offset, + PropertyBlock { + min_ts: txn.txn_id, + max_ts: txn.txn_id, + values: vec![Vec::new(); BLOCK_CAPACITY], + }, + ); + column.blocks.get_mut(vertex.block_offset).unwrap() + }; + + property_block.min_ts = property_block.min_ts.min(txn.txn_id); + property_block.max_ts = property_block.max_ts.max(txn.txn_id); + + for j in (insert_pos..block.edge_counter - 1).rev() { + property_block.values[j + 1] = property_block.values[j].clone(); + } + + let ts = txn.txn_id; + if let Some(property_value) = edge.properties.get(i) { + property_block.values[insert_pos].push(PropertyVersion { + ts, + value: Some(property_value), + }); + } else { + property_block.values[insert_pos].push(PropertyVersion { ts, value: None }); + } + } + // update block header using txn id + block.min_dst_id = edge.dst_id.min(block.min_dst_id); + block.max_dst_id = edge.dst_id.max(block.max_dst_id); + block.max_label_id = edge.label_id.max(block.max_label_id); + block.min_label_id = edge.label_id.min(block.min_label_id); + block.min_ts = block.min_ts.min(txn.txn_id); + block.max_ts = block.max_ts.max(txn.txn_id); + + // push undo entry + let common_edge = CommonEdge::new( + eid, + edge.src_id, + edge.dst_id, + edge.label_id.unwrap_or_else(|| NonZeroU32::new(1).unwrap()), + PropertyRecord::default(), + ); + txn.push_undo(DeltaOp::CreateEdge(common_edge), txn.txn_id); + + Ok(eid) + } + + /// Transactional variant of `set_edge_property`. + /// Sets properties and marks the edge's `commit_ts` to `txn.txn_id`, and records an undo entry. 
+ #[allow(dead_code)] + pub fn set_edge_property_in_txn( + &self, + txn: &crate::ap::transaction::MemTransaction, + eid: ::EdgeID, + indices: Vec, + props: Vec, + ) -> StorageResult<()> { + if indices.len() != props.len() { + return Err(StorageError::NotSupported(format!( + "indices/props length mismatch: {} vs {}", + indices.len(), + props.len() + ))); + } + + // Use EdgeId mapping for fast lookup + let (block_idx, offset) = *self + .edge_id_map + .get(&eid) + .ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!("Edge {} not found", eid))) + })? + .value(); + + let mut edges_lock = self.edges.write().unwrap(); + let block = edges_lock.get_mut(block_idx).ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!("Edge block {} not found", block_idx))) + })?; + + if block.is_tombstone || offset >= block.edge_counter { + return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge {} not found", + eid + )))); + } + + let edge = &mut block.edges[offset]; + if edge.eid != eid { + return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge {} not found", + eid + )))); + } + + let mut old_props: Vec = Vec::new(); + { + let property_columns = self.property_columns.read().unwrap(); + if indices.iter().any(|&idx| idx >= property_columns.len()) { + return Err(StorageError::NotSupported( + "property index out of range".to_string(), + )); + } + // collect old property values for undo + for &idx in indices.iter() { + if let Some(col) = property_columns.get(idx) + && let Some(pb) = col.blocks.get(block_idx) + && let Some(versions) = pb.values.get(offset) + && let Some(v) = latest_prop_value(versions) + { + old_props.push(v); + } else { + old_props.push(minigu_common::value::ScalarValue::Null); + } + } + } + + // capture old commit_ts + let old_commit_ts = edge.commit_ts; + + let set_op = crate::common::SetPropsOp { + indices: indices.clone(), + props: old_props.clone(), + }; + // Push SetEdgeProps with the previous edge snapshot and the 
SetPropsOp + txn.push_undo( + crate::common::DeltaOp::SetEdgeProps(eid, set_op), + old_commit_ts, + ); + + // mark edge as modified by txn + edge.commit_ts = txn.txn_id; + // update block-level ts + block.min_ts = block.min_ts.min(txn.txn_id); + block.max_ts = block.max_ts.max(txn.txn_id); + + // update properties + let mut property_columns = self.property_columns.write().unwrap(); + for (i, prop) in indices.into_iter().zip(props.into_iter()) { + let column = &mut property_columns[i]; + // ensure property block exists for this block index + if column.blocks.get(block_idx).is_none() { + column.blocks.insert( + block_idx, + PropertyBlock { + min_ts: txn.txn_id, + max_ts: txn.txn_id, + values: vec![Vec::new(); BLOCK_CAPACITY], + }, + ); + } + let property_block = &mut column.blocks[block_idx]; + property_block.min_ts = property_block.min_ts.min(txn.txn_id); + property_block.max_ts = property_block.max_ts.max(txn.txn_id); + property_block.values[offset].push(PropertyVersion { + ts: txn.txn_id, + value: Some(prop), + }); + } + Ok(()) + } + + /// Transactional variant of `delete_edge`. + /// Deletes the edge (in-place removal) and records undo entry with the edge id. + #[allow(dead_code)] + pub fn delete_edge_in_txn( + &self, + txn: &crate::ap::transaction::MemTransaction, + eid: ::EdgeID, + ) -> StorageResult<()> { + // Use EdgeId mapping for fast lookup + let (block_idx, offset) = *self + .edge_id_map + .get(&eid) + .ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!("Edge {} not found", eid))) + })? 
+ .value(); + + let mut edges_lock = self.edges.write().unwrap(); + let (old_commit_ts, old_label_id, old_dst_id) = { + let block = edges_lock.get(block_idx).ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge block {} not found", + block_idx + ))) + })?; + + if block.is_tombstone || offset >= block.edge_counter { + return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge {} not found", + eid + )))); + } + + let edge_data = &block.edges[offset]; + if edge_data.eid != eid { + return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge {} not found", + eid + )))); + } + + (edge_data.commit_ts, edge_data.label_id, edge_data.dst_id) + }; + + // Save old_commit_ts as timestamp in undo entry + // Record original identifiers for rollback + txn.record_deleted_edge(eid, old_label_id, old_dst_id); + // Push DelEdge with the previous edge id (old_commit_ts passed as timestamp) + txn.push_undo(crate::common::DeltaOp::DelEdge(eid), old_commit_ts); + + // Mark edge as deleted: set tombstone values and stamp txn_id + let edge_block = &mut edges_lock[block_idx]; + edge_block.edges[offset].label_id = NonZeroU32::new(TOMBSTONE_LABEL_ID); + edge_block.edges[offset].dst_id = TOMBSTONE_DST_ID; + edge_block.edges[offset].commit_ts = txn.txn_id; + + edge_block.min_ts = edge_block.min_ts.min(txn.txn_id); + edge_block.max_ts = edge_block.max_ts.max(txn.txn_id); + + Ok(()) + } } impl OlapGraph for OlapStorage { type Adjacency = OlapEdge; type AdjacencyIter<'a> = AdjacencyIterator<'a>; + type AdjacencyIterAtTs<'a> = AdjacencyIteratorAtTs<'a>; type Edge = OlapEdge; // TODO: type EdgeID = EdgeId; - type EdgeID = Option; + type EdgeID = EdgeId; type EdgeIter<'a> = EdgeIter<'a>; - type Transaction = (); + type EdgeIterAtTs<'a> = EdgeIterAtTs<'a>; + type Transaction = crate::ap::transaction::MemTransaction; type Vertex = OlapVertex; type VertexID = VertexId; type VertexIter<'a> = VertexIter<'a>; @@ -362,54 +787,136 @@ impl OlapGraph for OlapStorage { 
Ok(vertex.clone()) } - fn get_edge(&self, _txn: &Self::Transaction, eid: Self::EdgeID) -> StorageResult { - for (block_idx, block) in self.edges.read().unwrap().iter().enumerate() { - if block.is_tombstone { - continue; - } + fn get_edge(&self, txn: &Self::Transaction, eid: Self::EdgeID) -> StorageResult { + // Use EdgeId mapping for fast lookup + let (block_idx, offset) = *self + .edge_id_map + .get(&eid) + .ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!("Edge {} not found", eid))) + })? + .value(); + + let edges = self.edges.read().unwrap(); + let block = edges.get(block_idx).ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!("Edge block {} not found", block_idx))) + })?; - let min = block.min_label_id; - let max = block.max_label_id; - // Locate edge block - if eid < min || eid > max { - continue; - } + if block.is_tombstone || offset >= block.edge_counter { + return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge {} not found", + eid + )))); + } - // 1. Traverse edge iterator - for (offset, edge) in block.edges.iter().enumerate() { - if edge.label_id == eid { - let edge_with_props = OlapEdge { - label_id: edge.label_id, - src_id: block.src_id, - dst_id: edge.dst_id, - // 2. 
Get edge properties - properties: { - let mut props = OlapPropertyStore { - properties: Vec::new(), - }; - for (col_idx, column) in - self.property_columns.read().unwrap().iter().enumerate() - { - if let Some(val) = column - .blocks - .get(block_idx) - .and_then(|blk| blk.values.get(offset)) - .cloned() - { - props.set_prop(col_idx, val); - } - } - props - }, - }; - return Ok(edge_with_props); + let edge = &block.edges[offset]; + if edge.label_id == NonZeroU32::new(TOMBSTONE_LABEL_ID) && edge.dst_id == TOMBSTONE_DST_ID { + return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge {} not found", + eid + )))); + } + if edge.eid != eid { + return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge {} not found", + eid + )))); + } + + let edge_with_props = OlapEdge { + label_id: edge.label_id, + src_id: block.src_id, + dst_id: edge.dst_id, + properties: { + let mut props = OlapPropertyStore { + properties: Vec::new(), + }; + for (col_idx, column) in self.property_columns.read().unwrap().iter().enumerate() { + if let Some(val) = column + .blocks + .get(block_idx) + .and_then(|blk| blk.values.get(offset)) + .and_then(|versions| { + prop_value_visible_at(versions, Some(txn.txn_id), txn.start_ts) + }) + { + props.set_prop(col_idx, Some(val)); + } } - } + props + }, + }; + Ok(edge_with_props) + } + + fn get_edge_at_ts( + &self, + txn: &Self::Transaction, + eid: Self::EdgeID, + ) -> StorageResult> { + // Use EdgeId mapping for fast lookup + let (block_idx, offset) = match self.edge_id_map.get(&eid) { + Some(loc) => *loc.value(), + None => return Ok(None), + }; + + let edges = self.edges.read().unwrap(); + let block = match edges.get(block_idx) { + Some(b) => b, + None => return Ok(None), + }; + + if block.is_tombstone || offset >= block.edge_counter { + return Ok(None); + } + + if block.min_ts.is_commit_ts() && txn.start_ts.raw() < block.min_ts.raw() { + return Ok(None); } - Err(StorageError::EdgeNotFound(EdgeNotFound(format!( - "Edge {} not found", - 
eid.unwrap() - )))) + + let edge = &block.edges[offset]; + if edge.eid != eid { + return Ok(None); + } + + if edge.label_id == NonZeroU32::new(TOMBSTONE_LABEL_ID) && edge.dst_id == TOMBSTONE_DST_ID { + return Ok(None); + } + + let is_visible = if edge.commit_ts.is_txn_id() { + edge.commit_ts.raw() == txn.txn_id.raw() + } else { + edge.commit_ts.raw() <= txn.start_ts.raw() + }; + + if !is_visible { + return Ok(None); + } + + let edge_with_props = OlapEdge { + label_id: edge.label_id, + src_id: block.src_id, + dst_id: edge.dst_id, + properties: { + let mut props = OlapPropertyStore { + properties: Vec::new(), + }; + for (col_idx, column) in self.property_columns.read().unwrap().iter().enumerate() { + if let Some(val) = column + .blocks + .get(block_idx) + .and_then(|blk| blk.values.get(offset)) + .and_then(|versions| { + prop_value_visible_at(versions, Some(txn.txn_id), txn.start_ts) + }) + { + props.set_prop(col_idx, Some(val)); + } + } + props + }, + }; + Ok(Some(edge_with_props)) } fn iter_vertices<'a>( @@ -430,6 +937,19 @@ impl OlapGraph for OlapStorage { }) } + fn iter_edges_at_ts<'a>( + &'a self, + txn: &'a Self::Transaction, + ) -> StorageResult> { + Ok(EdgeIterAtTs { + storage: self, + block_idx: 0, + offset: 0, + txn_id: Some(txn.txn_id), + start_ts: txn.start_ts, + }) + } + fn iter_adjacency<'a>( &'a self, _txn: &'a Self::Transaction, @@ -444,6 +964,22 @@ impl OlapGraph for OlapStorage { offset: 0, }) } + + fn iter_adjacency_at_ts<'a>( + &'a self, + txn: &'a Self::Transaction, + vid: Self::VertexID, + ) -> StorageResult> { + let vertex = self.get_vertex(txn, vid)?; + Ok(AdjacencyIteratorAtTs { + storage: self, + vertex_id: vid, + block_idx: vertex.block_offset, + offset: 0, + txn_id: Some(txn.txn_id), + start_ts: txn.start_ts, + }) + } } impl MutOlapGraph for OlapStorage { @@ -507,6 +1043,8 @@ impl MutOlapGraph for OlapStorage { min_label_id: NonZeroU32::new(u32::MAX), max_dst_id: 0, min_dst_id: u64::MAX, + min_ts: Timestamp::max_commit_ts(), + max_ts: 
Timestamp::with_ts(0), edge_counter: 0, src_id: edge.src_id, edges: [OlapStorageEdge::default(); BLOCK_CAPACITY], @@ -536,6 +1074,8 @@ impl MutOlapGraph for OlapStorage { min_label_id: NonZeroU32::new(u32::MAX), max_dst_id: 0, min_dst_id: u64::MAX, + min_ts: Timestamp::max_commit_ts(), + max_ts: Timestamp::with_ts(0), src_id: edge.src_id, edge_counter: 0, edges: [OlapStorageEdge::default(); BLOCK_CAPACITY], @@ -544,8 +1084,11 @@ impl MutOlapGraph for OlapStorage { } } - // 4. Insert edge - // 4.1 Calculate position + // 4. Allocate global unique EdgeId + let eid = self.edge_id_counter.fetch_add(1, Ordering::SeqCst); + + // 5. Insert edge + // 5.1 Calculate position let mut binding = self.edges.write().unwrap(); let block = binding.get_mut(vertex.block_offset).ok_or_else(|| { StorageError::EdgeNotFound(EdgeNotFound(format!( @@ -559,19 +1102,33 @@ impl MutOlapGraph for OlapStorage { }) .unwrap_or_else(|e| e); - // 4.2 Move elements + // 5.2 Move elements for i in (insert_pos..block.edge_counter).rev() { block.edges[i + 1] = block.edges[i]; } block.edge_counter += 1; - // 4.3 Actual insert + // Update mapping for moved edges (shifted right by one) + for i in (insert_pos + 1)..block.edge_counter { + let moved_eid = block.edges[i].eid; + if moved_eid != 0 { + self.edge_id_map.insert(moved_eid, (vertex.block_offset, i)); + } + } + + // 5.3 Actual insert with eid block.edges[insert_pos] = OlapStorageEdge { + eid, label_id: edge.label_id, dst_id: edge.dst_id, + commit_ts: Timestamp::with_ts(0), }; - // 5. Insert properties + // Build EdgeId to location mapping + self.edge_id_map + .insert(eid, (vertex.block_offset, insert_pos)); + + // 6. 
Insert properties for (i, column) in self .property_columns .write() @@ -579,43 +1136,51 @@ impl MutOlapGraph for OlapStorage { .iter_mut() .enumerate() { - // 5.1 Get property block or allocate one + // 6.1 Get property block or allocate one let property_block = if let Some(block) = column.blocks.get_mut(vertex.block_offset) { block } else { column.blocks.insert( vertex.block_offset, PropertyBlock { - values: vec![None; BLOCK_CAPACITY], + values: vec![Vec::new(); BLOCK_CAPACITY], + min_ts: Timestamp::max_commit_ts(), + max_ts: Timestamp::with_ts(0), }, ); column.blocks.get_mut(vertex.block_offset).unwrap() }; - // 5.2 Move property elements + // 6.2 Move property elements for j in (insert_pos..block.edge_counter - 1).rev() { property_block.values[j + 1] = property_block.values[j].clone(); } - // 5.3 Insert property + // 6.3 Insert property if let Some(property_value) = edge.properties.get(i) { - property_block.values[insert_pos] = Some(property_value); + property_block.values[insert_pos].push(PropertyVersion { + ts: Timestamp::with_ts(0), + value: Some(property_value), + }); } else { - property_block.values[insert_pos] = None; + property_block.values[insert_pos].push(PropertyVersion { + ts: Timestamp::with_ts(0), + value: None, + }); } } - // 6.Update block header + // 7. 
Update block header block.min_dst_id = edge.dst_id.min(block.min_dst_id); block.max_dst_id = edge.dst_id.max(block.max_dst_id); block.max_label_id = edge.label_id.max(block.max_label_id); block.min_label_id = edge.label_id.min(block.min_label_id); - Ok(edge.label_id) + Ok(eid) } fn delete_vertex(&self, _txn: &Self::Transaction, vid: Self::VertexID) -> StorageResult<()> { - let mut vertex_iter = self.iter_vertices(&())?; + let mut vertex_iter = self.iter_vertices(_txn)?; let mut is_found: bool = false; for vertex in vertex_iter.by_ref() { if vertex?.vid == vid { @@ -648,54 +1213,68 @@ impl MutOlapGraph for OlapStorage { } fn delete_edge(&self, _txn: &Self::Transaction, eid: Self::EdgeID) -> StorageResult<()> { - let mut edge_iter = self.iter_edges(&())?; + // Use EdgeId mapping for fast lookup + let (block_idx, offset) = *self + .edge_id_map + .get(&eid) + .ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!("Edge {} not found", eid))) + })? + .value(); - let mut is_found: bool = false; - for edge in edge_iter.by_ref() { - if edge?.label_id == eid { - is_found = true; - break; - } - } + // Remove edge + let mut edge_blocks = self.edges.write().unwrap(); + let edge_block = &mut edge_blocks[block_idx]; - if !is_found { + if offset >= edge_block.edge_counter || edge_block.edges[offset].eid != eid { return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( "Edge {} not found", - eid.unwrap() + eid )))); } - let block_idx = edge_iter.block_idx; - let offset = edge_iter.offset - 1; - - // Remove edge - let mut edge_blocks = self.edges.write().unwrap(); - let edge_block = &mut edge_blocks[block_idx]; let edges = &mut edge_block.edges; edge_block.edge_counter -= 1; if edge_block.edge_counter == 0 { edge_block.is_tombstone = true; + self.edge_id_map.remove(&eid); return Ok(()); } for i in offset..edge_block.edge_counter { + // Update mapping for moved edges + if edges[i + 1].eid != 0 { + self.edge_id_map.insert(edges[i + 1].eid, (block_idx, i)); + } edges[i] 
= edges[i + 1]; } edges[edge_block.edge_counter] = OlapStorageEdge { + eid: 0, label_id: NonZeroU32::new(1), dst_id: 1, + commit_ts: Timestamp::with_ts(0), }; + // Remove from mapping + self.edge_id_map.remove(&eid); + // Remove property let mut property_cols = self.property_columns.write().unwrap(); for property_col in property_cols.iter_mut() { - let property_block = &mut property_col.blocks[block_idx]; - let values = &mut property_block.values; - values.remove(offset); - values.push(None); + if let Some(property_block) = property_col.blocks.get_mut(block_idx) { + let values = &mut property_block.values; + for i in offset..edge_block.edge_counter { + if i + 1 < values.len() { + values[i] = values[i + 1].clone(); + } + } + if edge_block.edge_counter < values.len() { + values[edge_block.edge_counter] = Vec::new(); + } + } } Ok(()) @@ -728,26 +1307,53 @@ impl MutOlapGraph for OlapStorage { fn set_edge_property( &self, - txn: &Self::Transaction, + _txn: &Self::Transaction, eid: Self::EdgeID, indices: Vec, props: Vec, ) -> StorageResult<()> { - let mut iterator = self.iter_edges(txn)?; - while let Some(edge) = iterator.next() { - if edge?.label_id == eid { - for (index, prop) in indices.into_iter().zip(props.into_iter()) { - let mut property_column = self.property_columns.write().unwrap(); - let column = &mut property_column[index]; - let block = &mut column.blocks[iterator.block_idx]; - block.values[iterator.offset - 1] = Some(prop); - } - return Ok(()); + // Use EdgeId mapping for fast lookup + let (block_idx, offset) = *self + .edge_id_map + .get(&eid) + .ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!("Edge {} not found", eid))) + })? 
+ .value(); + + let edges = self.edges.read().unwrap(); + let block = edges.get(block_idx).ok_or_else(|| { + StorageError::EdgeNotFound(EdgeNotFound(format!("Edge block {} not found", block_idx))) + })?; + + if block.is_tombstone || offset >= block.edge_counter { + return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge {} not found", + eid + )))); + } + + if block.edges[offset].eid != eid { + return Err(StorageError::EdgeNotFound(EdgeNotFound(format!( + "Edge {} not found", + eid + )))); + } + drop(edges); + + // Update properties + let mut property_column = self.property_columns.write().unwrap(); + for (index, prop) in indices.into_iter().zip(props.into_iter()) { + if let Some(column) = property_column.get_mut(index) + && let Some(block) = column.blocks.get_mut(block_idx) + && offset < block.values.len() + { + block.values[offset].push(PropertyVersion { + ts: Timestamp::with_ts(0), + value: Some(prop), + }); } } - Err(StorageError::EdgeNotFound(EdgeNotFound(format!( - "Edge {} not found", - eid.unwrap() - )))) + Ok(()) } } diff --git a/minigu/storage/src/ap/olap_storage.rs b/minigu/storage/src/ap/olap_storage.rs index 6e761691..0f3b0e42 100644 --- a/minigu/storage/src/ap/olap_storage.rs +++ b/minigu/storage/src/ap/olap_storage.rs @@ -6,6 +6,9 @@ use crate::error::StorageResult; pub trait StorageTransaction { type CommitTimestamp; + /// Get the unique transaction ID. + fn txn_id(&self) -> minigu_transaction::Timestamp; + /// Commit the current transaction, returning a commit timestamp on success. fn commit(&self) -> StorageResult; @@ -25,9 +28,15 @@ pub trait OlapGraph { where Self: 'a; type EdgeIter<'a>: Iterator> + where + Self: 'a; + type EdgeIterAtTs<'a>: Iterator> where Self: 'a; type AdjacencyIter<'a>: Iterator> + where + Self: 'a; + type AdjacencyIterAtTs<'a>: Iterator> where Self: 'a; @@ -41,6 +50,13 @@ pub trait OlapGraph { /// Retrieve an edge by its ID within a transaction. 
fn get_edge(&self, txn: &Self::Transaction, id: Self::EdgeID) -> StorageResult; + /// Retrieve an edge by its ID at a specific timestamp within a transaction. + fn get_edge_at_ts( + &self, + txn: &Self::Transaction, + id: Self::EdgeID, + ) -> StorageResult>; + /// Get an iterator over all vertices in the graph within a transaction. fn iter_vertices<'a>( &'a self, @@ -49,6 +65,12 @@ pub trait OlapGraph { /// Get an iterator over all edges in the graph within a transaction. fn iter_edges<'a>(&'a self, txn: &'a Self::Transaction) -> StorageResult>; + /// Get an iterator over all edges in the graph at a specific timestamp within a transaction. + fn iter_edges_at_ts<'a>( + &'a self, + txn: &'a Self::Transaction, + ) -> StorageResult>; + /// Get an iterator over adjacency entries (edges connected to a vertex) /// in a given direction (incoming or outgoing) within a transaction. fn iter_adjacency<'a>( @@ -56,6 +78,13 @@ pub trait OlapGraph { txn: &'a Self::Transaction, vid: Self::VertexID, ) -> StorageResult>; + + /// Get an iterator over adjacency entries at a specific timestamp within a transaction. 
+ fn iter_adjacency_at_ts<'a>( + &'a self, + txn: &'a Self::Transaction, + vid: Self::VertexID, + ) -> StorageResult>; } pub trait MutOlapGraph: OlapGraph { diff --git a/minigu/storage/src/ap/transaction.rs b/minigu/storage/src/ap/transaction.rs new file mode 100644 index 00000000..0691b84a --- /dev/null +++ b/minigu/storage/src/ap/transaction.rs @@ -0,0 +1,359 @@ +use std::collections::HashMap; +use std::num::NonZeroU32; +use std::sync::{Arc, OnceLock}; + +use minigu_common::types::{EdgeId, LabelId, VertexId}; +use minigu_transaction::{IsolationLevel, Timestamp, Transaction, global_timestamp_generator}; + +use crate::ap::olap_graph::{OlapStorage, OlapStorageEdge}; +use crate::common::DeltaOp; +use crate::error::{StorageError, StorageResult, TransactionError}; + +/// Minimal AP transaction that performs in-memory commit/abort +/// Behavior: +/// - Uses a txn id (Timestamp) to mark uncommitted entries in blocks +/// - On commit, allocates a commit_ts and replaces commit_ts fields equal to txn_id with the +/// assigned commit_ts, and updates block `min_ts`/`max_ts` accordingly. +pub struct MemTransaction { + pub storage: Arc, + pub txn_id: Timestamp, + pub start_ts: Timestamp, + pub isolation_level: IsolationLevel, + pub commit_ts: OnceLock, + /// Undo buffer: a sequence of DeltaOp timestamps recorded by the transaction. 
+ /// For this minimal implementation we store pairs of (DeltaOp, timestamp) + pub undo_buffer: parking_lot::RwLock>, + /// Snapshots for edges soft-deleted in this txn (label_id, dst_id) + pub deleted_edge_snapshot: parking_lot::RwLock, VertexId)>>, +} + +impl MemTransaction { + pub fn new( + storage: Arc, + txn_id: Timestamp, + start_ts: Timestamp, + isolation_level: IsolationLevel, + ) -> Self { + Self { + storage, + txn_id, + start_ts, + isolation_level, + commit_ts: OnceLock::new(), + undo_buffer: parking_lot::RwLock::new(Vec::new()), + deleted_edge_snapshot: parking_lot::RwLock::new(HashMap::new()), + } + } + + /// Minimal commit: allocate commit_ts and apply in-memory replacements. + pub fn commit_at(&self, commit_ts_opt: Option) -> StorageResult { + let commit_ts = if let Some(ts) = commit_ts_opt { + global_timestamp_generator() + .update_if_greater(ts) + .map_err(TransactionError::Timestamp)?; + ts + } else { + global_timestamp_generator() + .next() + .map_err(TransactionError::Timestamp)? 
+ }; + + if self.commit_ts.set(commit_ts).is_err() { + return Err(StorageError::Transaction( + TransactionError::TransactionAlreadyCommitted(format!("{:?}", commit_ts)), + )); + } + + // Walk undo buffer and for create/set/del edge ops, replace commit_ts markers + let undo_entries = self.undo_buffer.read().clone(); + let mut edges = self.storage.edges.write().unwrap(); + for (op, _ts) in undo_entries.into_iter() { + match op { + DeltaOp::CreateEdge(edge) => { + // Use EdgeId mapping to find and update commit_ts + if let Some(loc) = self.storage.edge_id_map.get(&edge.eid()) { + let (block_idx, offset) = *loc.value(); + if let Some(block) = edges.get_mut(block_idx) + && offset < block.edge_counter + && block.edges[offset].eid == edge.eid() + && block.edges[offset].commit_ts == self.txn_id + { + block.edges[offset].commit_ts = commit_ts; + block.min_ts = if block.min_ts.is_txn_id() { + commit_ts + } else { + block.min_ts.min(commit_ts) + }; + block.max_ts = if block.max_ts.is_txn_id() { + commit_ts + } else { + block.max_ts.max(commit_ts) + }; + // promote property versions written in this txn to committed + let mut prop_cols = self.storage.property_columns.write().unwrap(); + for column in prop_cols.iter_mut() { + if let Some(pb) = column.blocks.get_mut(block_idx) + && offset < pb.values.len() + && let Some(last) = pb.values[offset].last_mut() + && last.ts == self.txn_id + { + last.ts = commit_ts; + pb.min_ts = pb.min_ts.min(commit_ts); + pb.max_ts = pb.max_ts.max(commit_ts); + } + } + } + } + } + DeltaOp::SetEdgeProps(eid, _) => { + // Use EdgeId mapping to find and update commit_ts + if let Some(loc) = self.storage.edge_id_map.get(&eid) { + let (block_idx, offset) = *loc.value(); + if let Some(block) = edges.get_mut(block_idx) + && offset < block.edge_counter + && block.edges[offset].eid == eid + && block.edges[offset].commit_ts == self.txn_id + { + block.edges[offset].commit_ts = commit_ts; + block.min_ts = if block.min_ts.is_txn_id() { + commit_ts + } else { + 
block.min_ts.min(commit_ts) + }; + block.max_ts = if block.max_ts.is_txn_id() { + commit_ts + } else { + block.max_ts.max(commit_ts) + }; + // promote property versions written in this txn to committed + let mut prop_cols = self.storage.property_columns.write().unwrap(); + for column in prop_cols.iter_mut() { + if let Some(pb) = column.blocks.get_mut(block_idx) + && offset < pb.values.len() + && let Some(last) = pb.values[offset].last_mut() + && last.ts == self.txn_id + { + last.ts = commit_ts; + pb.min_ts = pb.min_ts.min(commit_ts); + pb.max_ts = pb.max_ts.max(commit_ts); + } + } + } + } + } + DeltaOp::DelEdge(eid) => { + // Use EdgeId mapping to find and update commit_ts + if let Some(loc) = self.storage.edge_id_map.get(&eid) { + let (block_idx, offset) = *loc.value(); + if let Some(block) = edges.get_mut(block_idx) + && offset < block.edge_counter + && block.edges[offset].eid == eid + && block.edges[offset].commit_ts == self.txn_id + { + block.edges[offset].commit_ts = commit_ts; + block.min_ts = if block.min_ts.is_txn_id() { + commit_ts + } else { + block.min_ts.min(commit_ts) + }; + block.max_ts = if block.max_ts.is_txn_id() { + commit_ts + } else { + block.max_ts.max(commit_ts) + }; + } + } + } + _ => {} + } + } + + // Clear deletion snapshots after commit bookkeeping + self.deleted_edge_snapshot.write().clear(); + self.undo_buffer.write().clear(); + + Ok(commit_ts) + } + + pub fn abort(&self) -> StorageResult<()> { + // Prevent abort from running after the transaction has been committed + if let Some(commit_ts) = self.commit_ts.get().copied() { + return Err(StorageError::Transaction( + TransactionError::TransactionAlreadyCommitted(format!( + "abort called on already committed transaction at {:?}", + commit_ts + )), + )); + } + // Apply undo entries in reverse order + let mut buffer = self.undo_buffer.write(); + let entries = buffer.clone(); + let mut edges = self.storage.edges.write().unwrap(); + for (op, old_ts) in entries.into_iter().rev() { + match op { + 
DeltaOp::CreateEdge(edge) => { + // Undo a creation -> remove the created edge using EdgeId + let eid = edge.eid(); + if let Some(loc) = self.storage.edge_id_map.get(&eid) { + let (block_idx, offset) = *loc.value(); + drop(loc); + if let Some(block) = edges.get_mut(block_idx) + && offset < block.edge_counter + && block.edges[offset].eid == eid + && block.edges[offset].commit_ts == self.txn_id + { + // remove it + for j in offset..block.edge_counter - 1 { + block.edges[j] = block.edges[j + 1]; + } + block.edge_counter -= 1; + for i in offset..block.edge_counter { + let moved_eid = block.edges[i].eid; + if moved_eid != 0 { + self.storage.edge_id_map.insert(moved_eid, (block_idx, i)); + } + } + block.edges[block.edge_counter] = OlapStorageEdge { + eid: 0, + label_id: NonZeroU32::new(1), + dst_id: 1, + commit_ts: Timestamp::with_ts(0), + }; + let mut property_cols = self.storage.property_columns.write().unwrap(); + for property_col in property_cols.iter_mut() { + if let Some(property_block) = property_col.blocks.get_mut(block_idx) + { + let values = &mut property_block.values; + for i in offset..block.edge_counter { + if i + 1 < values.len() { + values[i] = values[i + 1].clone(); + } + } + if block.edge_counter < values.len() { + values[block.edge_counter] = Vec::new(); + } + } + } + // Remove from mapping + self.storage.edge_id_map.remove(&eid); + } + } + } + DeltaOp::DelEdge(eid) => { + // Undo a deletion -> restore the old edge commit_ts + // Edge data and properties are still in storage. Restore commit_ts and + // label/dst from snapshot (if present). old_commit_ts is from undo entry. 
+ if let Some(loc) = self.storage.edge_id_map.get(&eid) { + let (block_idx, offset) = *loc.value(); + if let Some(block) = edges.get_mut(block_idx) + && offset < block.edge_counter + && block.edges[offset].eid == eid + { + // Restore edge commit_ts (properties are still in storage) + block.edges[offset].commit_ts = old_ts; + if let Some((label_id, dst_id)) = + self.deleted_edge_snapshot.read().get(&eid).cloned() + { + block.edges[offset].label_id = label_id; + block.edges[offset].dst_id = dst_id; + } + } + } + } + DeltaOp::SetEdgeProps(eid, props_op) => { + // Restore old property values and commit_ts using EdgeId + // old_commit_ts is obtained from undo entry's timestamp + if let Some(loc) = self.storage.edge_id_map.get(&eid) { + let (block_idx, offset) = *loc.value(); + if let Some(block) = edges.get_mut(block_idx) + && offset < block.edge_counter + && block.edges[offset].eid == eid + { + // Restore props + let mut prop_cols = self.storage.property_columns.write().unwrap(); + for (k, idx) in props_op.indices.iter().enumerate() { + if prop_cols.get(*idx).is_none() { + continue; + } + let column = &mut prop_cols[*idx]; + if column.blocks.get(block_idx).is_none() { + column.blocks.insert( + block_idx, + crate::ap::olap_graph::PropertyBlock { + values: vec![ + Vec::new(); + crate::ap::olap_graph::BLOCK_CAPACITY + ], + min_ts: old_ts, + max_ts: old_ts, + }, + ); + } + let pb = &mut column.blocks[block_idx]; + pb.min_ts = pb.min_ts.min(old_ts); + pb.max_ts = pb.max_ts.max(old_ts); + pb.values[offset].retain(|v| v.ts != self.txn_id); + let old_val = props_op.props[k].clone(); + pb.values[offset].push(crate::ap::olap_graph::PropertyVersion { + ts: old_ts, + value: Some(old_val), + }); + } + // Restore commit_ts + block.edges[offset].commit_ts = old_ts; + } + } + } + _ => {} + } + } + + // clear undo buffer after abort + buffer.clear(); + self.deleted_edge_snapshot.write().clear(); + + Ok(()) + } +} + +// Lightweight helpers to record undo entries +impl MemTransaction { + 
pub fn push_undo(&self, op: DeltaOp, ts: Timestamp) { + self.undo_buffer.write().push((op, ts)); + } + + /// Record snapshot of an edge before soft deletion so abort can restore it. + pub fn record_deleted_edge(&self, eid: EdgeId, label_id: Option, dst_id: VertexId) { + self.deleted_edge_snapshot + .write() + .insert(eid, (label_id, dst_id)); + } +} + +impl Transaction for MemTransaction { + type Error = StorageError; + + fn txn_id(&self) -> Timestamp { + self.txn_id + } + + fn start_ts(&self) -> Timestamp { + self.start_ts + } + + fn commit_ts(&self) -> Option { + self.commit_ts.get().copied() + } + + fn isolation_level(&self) -> &IsolationLevel { + &self.isolation_level + } + + fn commit(&self) -> Result { + self.commit_at(None) + } + + fn abort(&self) -> Result<(), Self::Error> { + MemTransaction::abort(self) + } +} diff --git a/minigu/storage/src/tp/memory_graph.rs b/minigu/storage/src/tp/memory_graph.rs index 89942359..28e0dafc 100644 --- a/minigu/storage/src/tp/memory_graph.rs +++ b/minigu/storage/src/tp/memory_graph.rs @@ -154,7 +154,7 @@ impl VersionedVertex { DeltaOp::DelVertex(_) => { visible_vertex.is_tombstone = true; } - _ => unreachable!("Unreachable delta op for a vertex"), + _ => {} }; MemTransaction::apply_deltas_for_read(undo_ptr, apply_deltas, txn.start_ts()); // Check if the vertex is tombstone after applying the deltas @@ -260,7 +260,7 @@ impl VersionedEdge { DeltaOp::DelEdge(_) => { current_edge.is_tombstone = true; } - _ => unreachable!("Unreachable delta op for an edge"), + _ => {} }; MemTransaction::apply_deltas_for_read(undo_ptr, apply_deltas, txn.start_ts()); // Check if the vertex is tombstone after applying the deltas diff --git a/minigu/storage/tests/ap/ap_graph_test.rs b/minigu/storage/tests/ap/ap_graph_test.rs index 7c9f440d..f6da5a79 100644 --- a/minigu/storage/tests/ap/ap_graph_test.rs +++ b/minigu/storage/tests/ap/ap_graph_test.rs @@ -3,8 +3,8 @@ use std::fs::File; use std::io; use std::io::BufRead; use std::num::NonZeroU32; -use 
std::sync::RwLock; use std::sync::atomic::{AtomicBool, AtomicU64}; +use std::sync::{Arc, RwLock}; use std::time::Instant; use bitvec::order::Lsb0; @@ -17,15 +17,19 @@ use minigu_storage::ap::olap_graph::{ EdgeBlock, OlapEdge, OlapPropertyStore, OlapStorage, OlapStorageEdge, OlapVertex, PropertyBlock, PropertyColumn, }; +use minigu_storage::ap::transaction::MemTransaction; use minigu_storage::ap::{MutOlapGraph, OlapGraph}; use minigu_storage::model::properties::PropertyRecord; +use minigu_transaction::{IsolationLevel, Timestamp}; const PATH: &str = ""; -fn mock_olap_graph(property_cnt: u64) -> OlapStorage { +pub fn mock_olap_graph(property_cnt: u64) -> OlapStorage { let storage = OlapStorage { logic_id_counter: AtomicU64::new(0), + edge_id_counter: AtomicU64::new(1), dense_id_map: DashMap::new(), + edge_id_map: DashMap::new(), vertices: RwLock::new(Vec::new()), edges: RwLock::new(Vec::new()), property_columns: RwLock::new(Vec::new()), @@ -44,12 +48,25 @@ fn mock_olap_graph(property_cnt: u64) -> OlapStorage { storage } +// Helper function to create a mock transaction for testing +fn mock_transaction(storage: Arc) -> MemTransaction { + let arc_storage = storage; + let txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 1); + MemTransaction::new( + arc_storage, + txn_id, + Timestamp::with_ts(1), + IsolationLevel::Snapshot, + ) +} + #[test] fn create_vertex_test() { - let storage = mock_olap_graph(0); + let storage = Arc::new(mock_olap_graph(0)); + let txn = mock_transaction(storage.clone()); for i in 1..=289 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: (i + 30) as VertexId, properties: PropertyRecord::new(vec![ @@ -84,11 +101,12 @@ fn create_vertex_test() { #[test] fn create_edge_test() { - let storage = mock_olap_graph(1); + let storage = Arc::new(mock_olap_graph(1)); + let txn = mock_transaction(storage.clone()); // Insert vertex for i in 1u32..=5 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: i as VertexId, 
properties: PropertyRecord::default(), @@ -98,7 +116,7 @@ fn create_edge_test() { for j in 1u32..=(400 - (i - 1) * 10) { let _result1 = storage.create_edge( - &(), + &txn, OlapEdge { label_id: NonZeroU32::new(i * 10000 + j), src_id: i as u64, @@ -124,12 +142,50 @@ fn create_edge_test() { assert_eq!(edges.first().unwrap().src_id, 1); } +#[test] +fn test_create_edge_sets_timestamps() { + let storage = Arc::new(mock_olap_graph(1)); + let txn = mock_transaction(storage.clone()); + // Insert vertex + let _ = storage.create_vertex( + &txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + + // Create an edge + let _ = storage.create_edge( + &txn, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 42, + properties: OlapPropertyStore::default(), + }, + ); + + // Inspect block header and edge + let edges = storage.edges.read().unwrap(); + let block = edges.first().unwrap(); + // Initial block timestamps should be initialized as max/min sentinel + assert_eq!(block.min_ts, Timestamp::max_commit_ts()); + assert_eq!(block.max_ts, Timestamp::with_ts(0)); + + // Check the inserted edge's commit_ts is default (with_ts(0)) + let e = block.edges[0]; + assert_eq!(e.commit_ts, Timestamp::with_ts(0)); +} + #[test] fn get_vertex_test() { - let storage = mock_olap_graph(0); + let storage = Arc::new(mock_olap_graph(0)); + let txn = mock_transaction(storage.clone()); for i in 0..289 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: (i + 30) as VertexId, properties: PropertyRecord::new(vec![ @@ -141,11 +197,11 @@ fn get_vertex_test() { ); } - let result1 = storage.get_vertex(&(), 33); + let result1 = storage.get_vertex(&txn, 33); assert!(result1.is_ok()); assert_eq!(result1.unwrap().vid, 33); - let result2 = storage.get_vertex(&(), 63); + let result2 = storage.get_vertex(&txn, 63); assert!(result2.is_ok()); assert_eq!( result2 @@ -161,11 +217,13 @@ fn get_vertex_test() { #[test] fn get_edge_test() { - 
let storage = mock_olap_graph(1); + let storage = Arc::new(mock_olap_graph(1)); + let txn = mock_transaction(storage.clone()); // Insert vertex + let mut label_to_eid: HashMap = HashMap::new(); for i in 1..=5 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: i as VertexId, properties: PropertyRecord::default(), @@ -174,36 +232,41 @@ fn get_edge_test() { ); for j in 1..=(400 - i * 10) { - let _result1 = storage.create_edge( - &(), - OlapEdge { - label_id: NonZeroU32::new(i * 10000 + j), - src_id: i as u64, - dst_id: (j * (i + 1)) as u64, - properties: OlapPropertyStore::new(vec![Some(ScalarValue::String(Some( - "hello".to_string(), - )))]), - }, - ); + let label_num = (i * 10000 + j) as u64; + let eid = storage + .create_edge( + &txn, + OlapEdge { + label_id: NonZeroU32::new((i * 10000 + j) as u32), + src_id: i as u64, + dst_id: (j * (i + 1)) as u64, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::String(Some( + "hello".to_string(), + )))]), + }, + ) + .unwrap(); + label_to_eid.insert(label_num, eid); } } - let result1 = storage.get_edge(&(), NonZeroU32::new(30099)); + let result1 = storage.get_edge(&txn, *label_to_eid.get(&30099u64).unwrap()); println!("{result1:?}"); assert!(result1.is_ok()); assert_eq!(result1.unwrap().dst_id, 396); - let result2 = storage.get_edge(&(), NonZeroU32::new(20333)); + let result2 = storage.get_edge(&txn, *label_to_eid.get(&20333u64).unwrap()); assert!(result2.is_ok()); assert_eq!(result2.unwrap().label_id, NonZeroU32::new(20333)); } #[test] fn vertex_iterator_test() { - let storage = mock_olap_graph(0); + let storage = Arc::new(mock_olap_graph(0)); + let txn = mock_transaction(storage.clone()); for i in 0..500 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: (i + 30) as VertexId, properties: PropertyRecord::new(vec![ @@ -215,7 +278,7 @@ fn vertex_iterator_test() { ); } - let mut vertex_iter = storage.iter_vertices(&()).unwrap(); + let mut vertex_iter = 
storage.iter_vertices(&txn).unwrap(); let vertex1 = vertex_iter.next().unwrap().unwrap(); let vertex2 = vertex_iter.next().unwrap().unwrap(); @@ -225,10 +288,11 @@ fn vertex_iterator_test() { #[test] fn edge_iterator_test() { - let storage = mock_olap_graph(1); + let storage = Arc::new(mock_olap_graph(1)); + let txn = mock_transaction(storage.clone()); for i in 1i32..=4 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: i as VertexId, properties: PropertyRecord::new(vec![ @@ -241,7 +305,7 @@ fn edge_iterator_test() { for j in 1i32..=(i * 10) { let _result1 = storage.create_edge( - &(), + &txn, OlapEdge { label_id: NonZeroU32::new((i * 10000 + j) as u32), src_id: i as VertexId, @@ -254,7 +318,7 @@ fn edge_iterator_test() { } } - let edge_iter = storage.iter_edges(&()).unwrap(); + let edge_iter = storage.iter_edges(&txn).unwrap(); let mut cnt: usize = 0; for next in edge_iter { @@ -273,11 +337,12 @@ fn edge_iterator_test() { #[test] fn adjacency_iterator_test() { - let storage = mock_olap_graph(1); + let storage = Arc::new(mock_olap_graph(1)); + let txn = mock_transaction(storage.clone()); for i in 0..10 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: i as VertexId, properties: PropertyRecord::default(), @@ -287,9 +352,9 @@ fn adjacency_iterator_test() { for j in 0..(i * 100) { let _result1 = storage.create_edge( - &(), + &txn, OlapEdge { - label_id: NonZeroU32::new(i * 10000 + j), + label_id: NonZeroU32::new((i * 10000 + j) as u32), src_id: i as VertexId, dst_id: (j * (i + 1)) as VertexId, properties: OlapPropertyStore::new(vec![Option::from(ScalarValue::String( @@ -300,7 +365,7 @@ fn adjacency_iterator_test() { } } - let mut adjacency = storage.iter_adjacency(&(), 8).unwrap(); + let mut adjacency = storage.iter_adjacency(&txn, 8).unwrap(); // Should be the 759th edge assert_eq!( adjacency.next().unwrap().unwrap().dst_id, @@ -332,10 +397,11 @@ fn adjacency_iterator_test() { #[test] fn set_vertex_properties_test() { - let 
storage = mock_olap_graph(0); + let storage = Arc::new(mock_olap_graph(0)); + let txn = mock_transaction(storage.clone()); for i in 0..100 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: (i + 30) as VertexId, properties: PropertyRecord::new(vec![ @@ -347,9 +413,9 @@ fn set_vertex_properties_test() { ); } - let result1 = storage.set_vertex_property(&(), 30, vec![0], vec![ScalarValue::Int32(Some(1))]); + let result1 = storage.set_vertex_property(&txn, 30, vec![0], vec![ScalarValue::Int32(Some(1))]); let result2 = storage.set_vertex_property( - &(), + &txn, 50, vec![1], vec![ScalarValue::String(Some("No hello".to_string()))], @@ -388,10 +454,12 @@ fn set_vertex_properties_test() { #[test] fn set_edge_properties_test() { - let storage = mock_olap_graph(3); + let storage = Arc::new(mock_olap_graph(3)); + let txn = mock_transaction(storage.clone()); + let mut label_to_eid: HashMap = HashMap::new(); for i in 0..2 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: i as VertexId, properties: PropertyRecord::default(), @@ -399,31 +467,35 @@ fn set_edge_properties_test() { }, ); for j in 0..3 { - let _result1 = storage.create_edge( - &(), - OlapEdge { - label_id: NonZeroU32::new(i * 10000 + j), - src_id: i as VertexId, - dst_id: (j + i) as VertexId, - properties: OlapPropertyStore::new(vec![ - Some(ScalarValue::UInt32(Some(j * 10))), - Some(ScalarValue::String(Some("hello".to_string()))), - Some(ScalarValue::Boolean(Some(true))), - ]), - }, - ); + let label_num = (i * 10000 + j) as u64; + let eid = storage + .create_edge( + &txn, + OlapEdge { + label_id: NonZeroU32::new((i * 10000 + j) as u32), + src_id: i as VertexId, + dst_id: (j + i) as VertexId, + properties: OlapPropertyStore::new(vec![ + Some(ScalarValue::UInt32(Some((j * 10) as u32))), + Some(ScalarValue::String(Some("hello".to_string()))), + Some(ScalarValue::Boolean(Some(true))), + ]), + }, + ) + .unwrap(); + label_to_eid.insert(label_num, eid); } } let _ = 
storage.set_edge_property( - &(), - NonZeroU32::new(10001), + &txn, + *label_to_eid.get(&10001u64).unwrap(), vec![0], vec![ScalarValue::Int32(Some(10086))], ); let _ = storage.set_edge_property( - &(), - NonZeroU32::new(10002), + &txn, + *label_to_eid.get(&10002u64).unwrap(), vec![1, 2], vec![ ScalarValue::String(Some("No hello".to_string())), @@ -432,14 +504,14 @@ fn set_edge_properties_test() { ); let store1 = storage - .get_edge(&(), NonZeroU32::new(10001)) + .get_edge(&txn, *label_to_eid.get(&10001u64).unwrap()) .unwrap() .properties; let clone1 = store1.properties.first().unwrap().clone(); assert_eq!(clone1.unwrap(), ScalarValue::Int32(Some(10086))); let store2 = storage - .get_edge(&(), NonZeroU32::new(10002)) + .get_edge(&txn, *label_to_eid.get(&10002u64).unwrap()) .unwrap() .properties; let clone2 = store2.properties.get(1).unwrap().clone(); @@ -453,11 +525,12 @@ fn set_edge_properties_test() { #[test] fn delete_vertex_test() { - let storage = mock_olap_graph(3); + let storage = Arc::new(mock_olap_graph(3)); + let txn = mock_transaction(storage.clone()); for i in 0..5 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: i as VertexId, properties: PropertyRecord::default(), @@ -466,9 +539,9 @@ fn delete_vertex_test() { ); for j in 0..300 { let _result1 = storage.create_edge( - &(), + &txn, OlapEdge { - label_id: NonZeroU32::new(i * 10000 + j), + label_id: NonZeroU32::new((i * 10000 + j) as u32), src_id: i as VertexId, dst_id: (j + i) as VertexId, properties: OlapPropertyStore::default(), @@ -479,7 +552,7 @@ fn delete_vertex_test() { assert_eq!(storage.vertices.read().unwrap().len(), 5); - let _ = storage.delete_vertex(&(), 3); + let _ = storage.delete_vertex(&txn, 3); assert_eq!(storage.vertices.read().unwrap().len(), 4); assert!(!storage.edges.read().unwrap().get(5).unwrap().is_tombstone); @@ -490,10 +563,11 @@ fn delete_vertex_test() { #[test] fn delete_property_test() { - let storage = mock_olap_graph(5); + let storage = 
Arc::new(mock_olap_graph(5)); + let txn = mock_transaction(storage.clone()); let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: 1 as VertexId, properties: PropertyRecord::default(), @@ -501,25 +575,30 @@ fn delete_property_test() { }, ); + let mut label_to_eid: HashMap = HashMap::new(); for i in 1..=5 { - let _result1 = storage.create_edge( - &(), - OlapEdge { - label_id: NonZeroU32::new(i), - src_id: 1 as VertexId, - dst_id: (10000 + i) as VertexId, - properties: OlapPropertyStore::new(vec![ - Some(ScalarValue::UInt32(Some(i * 10))), - Some(ScalarValue::String(Some("hello".to_string()))), - Some(ScalarValue::Boolean(Some(true))), - Some(ScalarValue::Float32(Some(F32::from(0.5) + i as f32))), - Some(ScalarValue::String(Some("another hello".to_string()))), - ]), - }, - ); + let label_num = i as u64; + let eid = storage + .create_edge( + &txn, + OlapEdge { + label_id: NonZeroU32::new(i), + src_id: 1 as VertexId, + dst_id: (10000 + i) as VertexId, + properties: OlapPropertyStore::new(vec![ + Some(ScalarValue::UInt32(Some(i * 10))), + Some(ScalarValue::String(Some("hello".to_string()))), + Some(ScalarValue::Boolean(Some(true))), + Some(ScalarValue::Float32(Some(F32::from(0.5) + i as f32))), + Some(ScalarValue::String(Some("another hello".to_string()))), + ]), + }, + ) + .unwrap(); + label_to_eid.insert(label_num, eid); } - let _ = storage.delete_edge(&(), NonZeroU32::new(2)); + let _ = storage.delete_edge(&txn, *label_to_eid.get(&2u64).unwrap()); { let binding = storage.edges.read().unwrap(); @@ -530,20 +609,20 @@ fn delete_property_test() { let binding = storage.property_columns.read().unwrap(); let property_block = binding.first().unwrap().blocks.first().unwrap(); - assert_eq!( - property_block.values[0], - Some(ScalarValue::UInt32(Some(10))) - ); - assert_eq!( - property_block.values[1], - Some(ScalarValue::UInt32(Some(30))) - ); + let v0 = property_block.values[0] + .last() + .and_then(|pv| pv.value.clone()); + let v1 = property_block.values[1] + 
.last() + .and_then(|pv| pv.value.clone()); + assert_eq!(v0, Some(ScalarValue::UInt32(Some(10)))); + assert_eq!(v1, Some(ScalarValue::UInt32(Some(30)))); } - let _ = storage.delete_edge(&(), NonZeroU32::new(1)); - let _ = storage.delete_edge(&(), NonZeroU32::new(3)); - let _ = storage.delete_edge(&(), NonZeroU32::new(4)); - let _ = storage.delete_edge(&(), NonZeroU32::new(5)); + let _ = storage.delete_edge(&txn, *label_to_eid.get(&1u64).unwrap()); + let _ = storage.delete_edge(&txn, *label_to_eid.get(&3u64).unwrap()); + let _ = storage.delete_edge(&txn, *label_to_eid.get(&4u64).unwrap()); + let _ = storage.delete_edge(&txn, *label_to_eid.get(&5u64).unwrap()); assert_eq!( storage.edges.read().unwrap().first().unwrap().edge_counter, @@ -554,11 +633,12 @@ fn delete_property_test() { #[test] fn compress_edge_test() { - let storage = mock_olap_graph(0); + let storage = Arc::new(mock_olap_graph(0)); + let txn = mock_transaction(storage.clone()); // Insert vertex for i in 1..=5 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: i as VertexId, properties: PropertyRecord::default(), @@ -568,9 +648,9 @@ fn compress_edge_test() { for j in 1..=(400 - (i - 1) * 10) { let _result1 = storage.create_edge( - &(), + &txn, OlapEdge { - label_id: NonZeroU32::new(i * 10000 + j), + label_id: NonZeroU32::new((i * 10000 + j) as u32), src_id: i as u64, dst_id: (j + i) as u64, properties: Default::default(), @@ -600,11 +680,12 @@ fn compress_edge_test() { #[test] fn compress_property_test() { - let storage = mock_olap_graph(2); + let storage = Arc::new(mock_olap_graph(2)); + let txn = mock_transaction(storage.clone()); for i in 1..=5 { let _result = storage.create_vertex( - &(), + &txn, OlapVertex { vid: i as VertexId, properties: PropertyRecord::default(), @@ -614,13 +695,13 @@ fn compress_property_test() { for j in 1..=400 { let _result1 = storage.create_edge( - &(), + &txn, OlapEdge { - label_id: NonZeroU32::new(i * 10000 + j), + label_id: NonZeroU32::new((i * 10000 
+ j) as u32), src_id: i as u64, dst_id: (j * (i + 1)) as u64, properties: OlapPropertyStore::new(vec![ - Option::from(ScalarValue::UInt32(Some(j))), + Option::from(ScalarValue::UInt32(Some(j.try_into().unwrap()))), None, ]), }, @@ -629,9 +710,9 @@ fn compress_property_test() { for j in 1..=400 { let _result1 = storage.create_edge( - &(), + &txn, OlapEdge { - label_id: NonZeroU32::new(i * 2 * 10000 + j), + label_id: NonZeroU32::new((i * 2 * 10000 + j) as u32), src_id: i as u64, dst_id: (j * (i * 2 + 1)) as u64, properties: OlapPropertyStore::new(vec![ @@ -658,7 +739,8 @@ fn compress_property_test() { #[test] #[ignore] fn dataset1_create_edge_for_storage_test() { - let storage = mock_olap_graph(1); + let storage = Arc::new(mock_olap_graph(1)); + let txn = mock_transaction(storage.clone()); println!("Test for Twitter-Congress dataset"); // Twitter Congress Dataset @@ -673,13 +755,13 @@ fn dataset1_create_edge_for_storage_test() { let start_vertex = Instant::now(); for olap_vertex in vertices_clone { - let _result = storage.create_vertex(&(), olap_vertex); + let _result = storage.create_vertex(&txn, olap_vertex); } let _duration_vertex = start_vertex.elapsed(); let start_edge = Instant::now(); for olap_edges in edges_clone { - let _result = storage.create_edge(&(), olap_edges); + let _result = storage.create_edge(&txn, olap_edges); } let duration_edge = start_edge.elapsed(); @@ -693,7 +775,8 @@ fn dataset1_create_edge_for_storage_test() { #[test] #[ignore] fn dataset2_create_edge_for_storage_test() { - let storage = mock_olap_graph(0); + let storage = Arc::new(mock_olap_graph(0)); + let txn = mock_transaction(storage.clone()); println!("Test for Wiki-Vote dataset"); println!(); @@ -709,13 +792,13 @@ fn dataset2_create_edge_for_storage_test() { let start_vertex = Instant::now(); for olap_vertex in vertices_clone { - let _result = storage.create_vertex(&(), olap_vertex); + let _result = storage.create_vertex(&txn, olap_vertex); } let _duration_vertex = 
start_vertex.elapsed(); let start_edge = Instant::now(); for olap_edges in edges_clone { - let _result = storage.create_edge(&(), olap_edges); + let _result = storage.create_edge(&txn, olap_edges); } let duration_edge = start_edge.elapsed(); @@ -729,7 +812,8 @@ fn dataset2_create_edge_for_storage_test() { #[test] #[ignore] fn dataset3_create_edge_for_storage_test() { - let storage = mock_olap_graph(0); + let storage = Arc::new(mock_olap_graph(0)); + let txn = mock_transaction(storage.clone()); println!("Test for P2P-Gnutella25 dataset"); println!(); @@ -745,13 +829,13 @@ fn dataset3_create_edge_for_storage_test() { let start_vertex = Instant::now(); for olap_vertex in vertices_clone { - let _result = storage.create_vertex(&(), olap_vertex); + let _result = storage.create_vertex(&txn, olap_vertex); } let _duration_vertex = start_vertex.elapsed(); let start_edge = Instant::now(); for olap_edges in edges_clone { - let _result = storage.create_edge(&(), olap_edges); + let _result = storage.create_edge(&txn, olap_edges); } let duration_edge = start_edge.elapsed(); @@ -792,7 +876,8 @@ fn dataset3_edge_compaction_test() { #[test] #[ignore] fn dataset1_property_compaction_test() { - let storage = mock_olap_graph(2); + let storage = Arc::new(mock_olap_graph(2)); + let txn = mock_transaction(storage.clone()); let file_path = PATH.to_owned() + "title.episode.tsv"; let dataset = parse_title_episode_dataset(&file_path); @@ -804,13 +889,13 @@ fn dataset1_property_compaction_test() { let start_vertex = Instant::now(); for olap_vertex in vertices_clone { - let _result = storage.create_vertex(&(), olap_vertex); + let _result = storage.create_vertex(&txn, olap_vertex); } let _duration_vertex = start_vertex.elapsed(); let start_edge = Instant::now(); for olap_edges in edges_clone { - let _result = storage.create_edge(&(), olap_edges); + let _result = storage.create_edge(&txn, olap_edges); } let duration_edge = start_edge.elapsed(); @@ -829,7 +914,8 @@ fn 
dataset1_property_compaction_test() { #[test] #[ignore] fn dataset2_property_compaction_test() { - let storage = mock_olap_graph(2); + let storage = Arc::new(mock_olap_graph(2)); + let txn = mock_transaction(storage.clone()); let file_path = PATH.to_owned() + "title.crew.tsv"; let dataset = parse_title_crew_dataset(&file_path); @@ -841,13 +927,13 @@ fn dataset2_property_compaction_test() { let start_vertex = Instant::now(); for olap_vertex in vertices_clone { - let _result = storage.create_vertex(&(), olap_vertex); + let _result = storage.create_vertex(&txn, olap_vertex); } let _duration_vertex = start_vertex.elapsed(); let start_edge = Instant::now(); for olap_edges in edges_clone { - let _result = storage.create_edge(&(), olap_edges); + let _result = storage.create_edge(&txn, olap_edges); } let duration_edge = start_edge.elapsed(); @@ -866,7 +952,9 @@ fn dataset2_property_compaction_test() { #[test] #[ignore] fn dataset1_col_storage_analysis() { - let storage = mock_olap_graph(6); + let storage = Arc::new(mock_olap_graph(6)); + let txn = mock_transaction(storage.clone()); + let edge_path = PATH.to_owned() + "mooc_actions.tsv"; let property_path = PATH.to_owned() + "mooc_action_features.tsv"; @@ -879,13 +967,13 @@ fn dataset1_col_storage_analysis() { let start_vertex = Instant::now(); for olap_vertex in vertices_clone { - let _result = storage.create_vertex(&(), olap_vertex); + let _result = storage.create_vertex(&txn, olap_vertex); } let _duration_vertex = start_vertex.elapsed(); let start_edge = Instant::now(); for olap_edges in edges_clone { - let _result = storage.create_edge(&(), olap_edges); + let _result = storage.create_edge(&txn, olap_edges); } let duration_edge = start_edge.elapsed(); @@ -906,14 +994,13 @@ fn dataset1_col_storage_analysis() { let start_col_analysis1 = Instant::now(); for block in &x.get(2).unwrap().blocks { - for option in &block.values { - if option.is_none() { + for versions in &block.values { + if versions.is_empty() { break; } - 
_total1 += as Clone>::clone(option) - .unwrap() - .get_float64() - .unwrap(); + if let Some(s) = versions.last().and_then(|pv| pv.value.clone()) { + _total1 += s.get_float64().unwrap(); + } } } @@ -940,16 +1027,12 @@ fn dataset1_col_storage_analysis() { let start_col_analysis2 = Instant::now(); for block in &x.get(3).unwrap().blocks { - for option in &block.values { - if option.is_none() { + for versions in &block.values { + if versions.is_empty() { break; } - max1 = max1.max( - option - .clone() - .map(|s| s.get_float64().unwrap()) - .unwrap_or_default(), - ); + let latest = versions.last().and_then(|pv| pv.value.clone()); + max1 = max1.max(latest.map(|s| s.get_float64().unwrap()).unwrap_or_default()); } } @@ -978,16 +1061,12 @@ fn dataset1_col_storage_analysis() { let start_col_analysis3 = Instant::now(); for block in &x.get(4).unwrap().blocks { - for option in &block.values { - if option.is_none() { + for versions in &block.values { + if versions.is_empty() { break; } - min1 = min1.min( - option - .clone() - .map(|s| s.get_float64().unwrap()) - .unwrap_or_default(), - ); + let latest = versions.last().and_then(|pv| pv.value.clone()); + min1 = min1.min(latest.map(|s| s.get_float64().unwrap()).unwrap_or_default()); } } @@ -1012,7 +1091,8 @@ fn dataset1_col_storage_analysis() { } fn compress_storage_two_column_without_property(path: String, name: String) { - let storage = mock_olap_graph(0); + let storage = Arc::new(mock_olap_graph(0)); + let txn = mock_transaction(storage.clone()); let file_path = &path.clone(); let dataset = parse_two_column_dataset(file_path); @@ -1025,13 +1105,13 @@ fn compress_storage_two_column_without_property(path: String, name: String) { let start_vertex = Instant::now(); for olap_vertex in vertices_clone { - let _result = storage.create_vertex(&(), olap_vertex); + let _result = storage.create_vertex(&txn, olap_vertex); } let _duration_vertex = start_vertex.elapsed(); let start_edge = Instant::now(); for olap_edges in edges_clone { - let 
_result = storage.create_edge(&(), olap_edges); + let _result = storage.create_edge(&txn, olap_edges); } let duration_edge = start_edge.elapsed(); @@ -1238,20 +1318,20 @@ fn measure_memory_column(vec: &RwLock>) -> usize { let mut single_size = 0; for block in &column.blocks { total_size += size_of_val(&block.values); - for value in &block.values { - if value.is_none() { + for versions in &block.values { + if versions.is_empty() { total_size += single_size; } else { - let clone = value.clone(); - let size = match clone.unwrap() { - ScalarValue::Int32(_) => size_of::(), - ScalarValue::Int64(_) => size_of::(), - ScalarValue::Float32(_) => size_of::(), - ScalarValue::Float64(_) => size_of::(), - ScalarValue::String(Some(s)) => { + let latest = versions.last().and_then(|pv| pv.value.clone()); + let size = match latest { + Some(ScalarValue::Int32(_)) => size_of::(), + Some(ScalarValue::Int64(_)) => size_of::(), + Some(ScalarValue::Float32(_)) => size_of::(), + Some(ScalarValue::Float64(_)) => size_of::(), + Some(ScalarValue::String(Some(ref s))) => { size_of::() + s.len() * size_of::() } - ScalarValue::Boolean(_) => size_of::(), + Some(ScalarValue::Boolean(_)) => size_of::(), _ => 0, }; total_size += size; diff --git a/minigu/storage/tests/ap/mod.rs b/minigu/storage/tests/ap/mod.rs index 1f06a119..f07b12a5 100644 --- a/minigu/storage/tests/ap/mod.rs +++ b/minigu/storage/tests/ap/mod.rs @@ -1 +1,2 @@ pub mod ap_graph_test; +pub mod transaction_test; diff --git a/minigu/storage/tests/ap/transaction_test.rs b/minigu/storage/tests/ap/transaction_test.rs new file mode 100644 index 00000000..117a5771 --- /dev/null +++ b/minigu/storage/tests/ap/transaction_test.rs @@ -0,0 +1,1371 @@ +use std::num::NonZeroU32; +use std::sync::{Arc, Barrier, mpsc}; +use std::thread; + +use minigu_common::value::ScalarValue; +use minigu_storage::ap::olap_graph::{OlapEdge, OlapPropertyStore, OlapStorage, OlapVertex}; +use minigu_storage::ap::transaction::MemTransaction; +use 
minigu_storage::ap::{MutOlapGraph, OlapGraph}; +use minigu_storage::common::model::properties::PropertyRecord; +use minigu_storage::error::StorageError; +use minigu_transaction::{IsolationLevel, Timestamp}; + +fn make_storage() -> OlapStorage { + super::ap_graph_test::mock_olap_graph(0) +} + +/// Helper function to create test edges with different timestamps +fn create_test_edges( + storage: &Arc, + txn_id: Timestamp, + start_ts: Timestamp, + edge_offset: u32, +) { + // Create vertex first + let txn = MemTransaction::new(storage.clone(), txn_id, start_ts, IsolationLevel::Snapshot); + + let _ = storage.create_vertex( + &txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + + // Create edges + // Edge 1: label 100, dst 10 + let _ = storage.create_edge_in_txn( + &txn, + OlapEdge { + label_id: NonZeroU32::new(100 + edge_offset), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::default(), + }, + ); + + // Edge 2: label 101, dst 20 + let _ = storage.create_edge_in_txn( + &txn, + OlapEdge { + label_id: NonZeroU32::new(101 + edge_offset), + src_id: 1, + dst_id: 20, + properties: OlapPropertyStore::default(), + }, + ); + + // Edge 3: label 102, dst 30 + let _ = storage.create_edge_in_txn( + &txn, + OlapEdge { + label_id: NonZeroU32::new(102 + edge_offset), + src_id: 1, + dst_id: 30, + properties: OlapPropertyStore::default(), + }, + ); + + txn.commit_at(Some(start_ts)) + .expect("Commit should succeed"); +} + +#[test] +fn test_ap_commit_replaces_txn_id() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + + let base_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START); + + let vertex_txn = MemTransaction::new( + arc_storage.clone(), + base_txn_id, + Timestamp::with_ts(0), + IsolationLevel::Snapshot, + ); + let _ = arc_storage.create_vertex( + &vertex_txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + vertex_txn + .commit_at(None) + 
.expect("Vertex commit should succeed"); + + let edge1_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 1); + let edge1_txn = MemTransaction::new( + arc_storage.clone(), + edge1_txn_id, + Timestamp::with_ts(1), + IsolationLevel::Snapshot, + ); + let _ = arc_storage.create_edge_in_txn( + &edge1_txn, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 42, + properties: OlapPropertyStore::default(), + }, + ); + let edge1_commit_ts = edge1_txn + .commit_at(None) + .expect("Edge1 commit should succeed"); + + let edge2_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 2); + let edge2_txn = MemTransaction::new( + arc_storage.clone(), + edge2_txn_id, + Timestamp::with_ts(2), + IsolationLevel::Snapshot, + ); + + let _ = arc_storage.create_edge_in_txn( + &edge2_txn, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 42, + properties: OlapPropertyStore::default(), + }, + ); + + let edge2_commit_ts = edge2_txn.commit_at(None).expect("commit should succeed"); + + let edges = arc_storage.edges.read().unwrap(); + let block = edges.first().unwrap(); + + assert_eq!(block.edges[0].commit_ts, edge2_commit_ts); + assert_eq!(block.min_ts, edge1_commit_ts); + assert_eq!(block.max_ts, edge2_commit_ts); +} + +#[test] +fn test_iter_edges_at_ts_filters() { + // Test 1: Basic visibility filtering with multiple timestamps + let storage = make_storage(); + let arc_storage = Arc::new(storage); + + let target_ts_50 = Timestamp::with_ts(50); + let txn50 = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 50), + target_ts_50, + IsolationLevel::Snapshot, + ); + + // Create edges at 100 timestamps + let txn_id1 = Timestamp::with_ts(Timestamp::TXN_ID_START + 100); + create_test_edges(&arc_storage, txn_id1, Timestamp::with_ts(100), 0); + + let target_ts_150 = Timestamp::with_ts(150); + let txn150 = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 150), + target_ts_150, + 
IsolationLevel::Snapshot, + ); + + // Create edges at 200 timestamps + let txn_id2 = Timestamp::with_ts(Timestamp::TXN_ID_START + 200); + create_test_edges(&arc_storage, txn_id2, Timestamp::with_ts(200), 100); + + let target_ts_250 = Timestamp::with_ts(250); + let txn250 = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 250), + target_ts_250, + IsolationLevel::Snapshot, + ); + + // Test visibility at different target timestamps + // At ts 50 (before any commits) - should see nothing + let iter50 = arc_storage.iter_edges_at_ts(&txn50).unwrap(); + let mut count50 = 0; + for _result in iter50 { + count50 += 1; + } + assert_eq!(count50, 0, "Should see no edges at ts 50"); + + // At ts 150 (after first commit, before second) - should see only first batch + let iter150 = arc_storage.iter_edges_at_ts(&txn150).unwrap(); + let mut count150 = 0; + for _ in iter150 { + count150 += 1; + } + assert_eq!( + count150, 3, + "Should see 3 edges from first transaction at ts 150" + ); + + // At ts 250 (after both commits) - should see all edges + let iter250 = arc_storage.iter_edges_at_ts(&txn250).unwrap(); + let mut count250 = 0; + for _ in iter250 { + count250 += 1; + } + assert_eq!( + count250, 6, + "Should see 6 edges from both transactions at ts 250" + ); +} + +#[test] +fn test_uncommitted_data_isolation() { + let storage2 = make_storage(); + let arc_storage2 = Arc::new(storage2); + + let txn_v_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 500); + let txn500 = MemTransaction::new( + arc_storage2.clone(), + txn_v_id, + Timestamp::with_ts(500), + IsolationLevel::Snapshot, + ); + + // Create vertex + let _ = arc_storage2.create_vertex( + &txn500, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + + // Start transaction A (uncommitted) + let txn_a_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 1000); + let txn_a = MemTransaction::new( + arc_storage2.clone(), + txn_a_id, + 
Timestamp::with_ts(1000), + IsolationLevel::Snapshot, + ); + + // Insert edge in transaction A (uncommitted) + let eid = arc_storage2 + .create_edge_in_txn( + &txn_a, + OlapEdge { + label_id: NonZeroU32::new(500), + src_id: 1, + dst_id: 100, + properties: OlapPropertyStore::default(), + }, + ) + .unwrap(); + + // Transaction A should see its own uncommitted edge + let edge_result = arc_storage2.get_edge_at_ts(&txn_a, eid); + assert!(edge_result.is_ok(), "Transaction A should see its own edge"); + assert!( + edge_result.unwrap().is_some(), + "Transaction A should see its own edge" + ); + + // Another transaction B should NOT see transaction A's uncommitted edge + let txn_b_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 2000); + let txn_b = MemTransaction::new( + arc_storage2.clone(), + txn_b_id, + Timestamp::with_ts(2000), + IsolationLevel::Snapshot, + ); + let edge_result_b = arc_storage2.get_edge_at_ts(&txn_b, eid); + if let Ok(item) = edge_result_b { + assert!( + item.is_none(), + "Transaction B should not see transaction A's edge" + ); + } + // Test iter_edges_at_ts with uncommitted data + let iter_a = arc_storage2.iter_edges_at_ts(&txn_a).unwrap(); + let mut found_in_a = false; + for edge in iter_a { + if let Ok(e) = edge + && e.label_id == NonZeroU32::new(500) + { + found_in_a = true; + break; + } + } + assert!( + found_in_a, + "Transaction A should find its own edge in iterator" + ); + + let iter_b = arc_storage2.iter_edges_at_ts(&txn_b).unwrap(); + let mut found_in_b = false; + for edge in iter_b { + if let Ok(e) = edge + && e.label_id == NonZeroU32::new(500) + { + found_in_b = true; + break; + } + } + assert!( + !found_in_b, + "Transaction B should not find transaction A's edge in iterator" + ); +} + +#[test] +fn test_set_edge_property_in_txn_basic() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + let txn_id_1 = Timestamp::with_ts(Timestamp::TXN_ID_START + 1); + let txn_1 = MemTransaction::new( + arc_storage.clone(), + 
txn_id_1, + Timestamp::with_ts(1), + IsolationLevel::Snapshot, + ); + + // Create vertex and edge + let _ = arc_storage.create_vertex( + &txn_1, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + + let eid = arc_storage + .create_edge_in_txn( + &txn_1, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![ + Some(ScalarValue::Int32(Some(42))), + Some(ScalarValue::String(Some("hello".to_string()))), + ]), + }, + ) + .unwrap(); + txn_1.commit_at(None).expect("Commit should succeed"); + + // Test setting a single property + let txn_id_2 = Timestamp::with_ts(Timestamp::TXN_ID_START + 2); + let txn_2 = MemTransaction::new( + arc_storage.clone(), + txn_id_2, + Timestamp::with_ts(2), + IsolationLevel::Snapshot, + ); + let result = arc_storage.set_edge_property_in_txn( + &txn_2, + eid, + vec![0], + vec![ScalarValue::Int32(Some(10086))], + ); + assert!(result.is_ok(), "Setting edge property should succeed"); + txn_2.commit_at(None).expect("Commit should succeed"); + + // Verify the property was updated + let txn_id_3 = Timestamp::with_ts(Timestamp::TXN_ID_START + 3); + let get_txn = MemTransaction::new( + arc_storage.clone(), + txn_id_3, + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&get_txn, eid).unwrap(); + assert_eq!( + edge.as_ref().unwrap().properties.get(0), + Some(ScalarValue::Int32(Some(10086))), + "Property should be updated" + ); + assert_eq!( + edge.as_ref().unwrap().properties.get(1), + Some(ScalarValue::String(Some("hello".to_string()))), + "Other properties should remain unchanged" + ); +} + +#[test] +fn test_set_edge_property_in_txn_multiple_properties() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + let txn_id_1 = Timestamp::with_ts(Timestamp::TXN_ID_START + 1); + let txn_1 = MemTransaction::new( + arc_storage.clone(), + txn_id_1, + Timestamp::with_ts(1), + 
IsolationLevel::Snapshot, + ); + + // Create vertex and edge with multiple properties + let _ = arc_storage.create_vertex( + &txn_1, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + + let eid = arc_storage + .create_edge_in_txn( + &txn_1, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![ + Some(ScalarValue::Int32(Some(42))), + Some(ScalarValue::String(Some("hello".to_string()))), + Some(ScalarValue::Boolean(Some(true))), + ]), + }, + ) + .unwrap(); + txn_1.commit_at(None).expect("Commit should succeed"); + + // Test setting multiple properties + let txn_id_2 = Timestamp::with_ts(Timestamp::TXN_ID_START + 2); + let txn_2 = MemTransaction::new( + arc_storage.clone(), + txn_id_2, + Timestamp::with_ts(2), + IsolationLevel::Snapshot, + ); + let result = arc_storage.set_edge_property_in_txn( + &txn_2, + eid, + vec![0, 2], + vec![ + ScalarValue::Int32(Some(10086)), + ScalarValue::Boolean(Some(false)), + ], + ); + txn_2.commit_at(None).expect("Commit should succeed"); + assert!( + result.is_ok(), + "Setting multiple edge properties should succeed" + ); + + // Verify all properties were updated + let get_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 3); + let get_txn = MemTransaction::new( + arc_storage.clone(), + get_txn_id, + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&get_txn, eid).unwrap(); + assert_eq!( + edge.as_ref().unwrap().properties.get(0), + Some(ScalarValue::Int32(Some(10086))), + "First property should be updated" + ); + assert_eq!( + edge.as_ref().unwrap().properties.get(1), + Some(ScalarValue::String(Some("hello".to_string()))), + "Second property should remain unchanged" + ); + assert_eq!( + edge.as_ref().unwrap().properties.get(2), + Some(ScalarValue::Boolean(Some(false))), + "Third property should be updated" + ); +} + +#[test] +fn 
test_set_edge_property_in_txn_nonexistent_edge() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + let txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 1), + Timestamp::with_ts(1), + IsolationLevel::Snapshot, + ); + + // Try to set property on non-existent edge + let result = arc_storage.set_edge_property_in_txn( + &txn, + 999u64, + vec![0], + vec![ScalarValue::Int32(Some(10086))], + ); + assert!(result.is_err(), "Should return error for non-existent edge"); + assert!( + matches!(result.unwrap_err(), StorageError::EdgeNotFound(_)), + "Should return EdgeNotFound error" + ); +} + +#[test] +fn test_set_edge_property_in_txn_transaction_rollback() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + let txn_id_1 = Timestamp::with_ts(Timestamp::TXN_ID_START + 1); + let txn_1 = MemTransaction::new( + arc_storage.clone(), + txn_id_1, + Timestamp::with_ts(1), + IsolationLevel::Snapshot, + ); + + // Create vertex and edge + let _ = arc_storage.create_vertex( + &txn_1, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + + let eid = arc_storage + .create_edge_in_txn( + &txn_1, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(42)))]), + }, + ) + .unwrap(); + txn_1.commit_at(None).expect("Commit should succeed"); + + // Start a transaction to set property + let txn_id_2 = Timestamp::with_ts(Timestamp::TXN_ID_START + 2); + let txn_2 = MemTransaction::new( + arc_storage.clone(), + txn_id_2, + Timestamp::with_ts(2), + IsolationLevel::Snapshot, + ); + let _ = arc_storage.set_edge_property_in_txn( + &txn_2, + eid, + vec![0], + vec![ScalarValue::Int32(Some(10086))], + ); + + // Rollback the transaction + txn_2.abort().expect("Rollback should succeed"); + + // Verify the property was not changed + let get_txn_id = 
Timestamp::with_ts(Timestamp::TXN_ID_START + 3); + let get_txn = MemTransaction::new( + arc_storage.clone(), + get_txn_id, + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&get_txn, eid).unwrap(); + assert_eq!( + edge.as_ref().unwrap().properties.get(0), + Some(ScalarValue::Int32(Some(42))), + "Property should remain unchanged after rollback" + ); +} + +#[test] +fn test_delete_edge_in_txn_basic() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + let txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 1); + let txn = MemTransaction::new( + arc_storage.clone(), + txn_id, + Timestamp::with_ts(1), + IsolationLevel::Snapshot, + ); + + // Create vertex and edge + let _ = arc_storage.create_vertex( + &txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + + let eid = arc_storage + .create_edge_in_txn( + &txn, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(42)))]), + }, + ) + .unwrap(); + txn.commit_at(None).expect("Commit should succeed"); + + // Test deleting the edge + let delete_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 2); + let delete_txn = MemTransaction::new( + arc_storage.clone(), + delete_txn_id, + Timestamp::with_ts(2), + IsolationLevel::Snapshot, + ); + let result = arc_storage.delete_edge_in_txn(&delete_txn, eid); + assert!(result.is_ok(), "Deleting edge should succeed"); + delete_txn.commit_at(None).expect("Commit should succeed"); + + // Verify the edge is gone + let get_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 3); + let get_txn = MemTransaction::new( + arc_storage.clone(), + get_txn_id, + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&get_txn, eid); + assert!( + matches!(edge, Ok(None)), + "Edge should not be found after deletion, got: {:?}", + edge + ); +} 
+ +#[test] +fn test_iter_adjacency_at_ts_filters() { + // Test 1: Basic visibility filtering with multiple timestamps + let storage = make_storage(); + let arc_storage = Arc::new(storage); + + let target_ts_50 = Timestamp::with_ts(50); + let txn50 = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 50), + target_ts_50, + IsolationLevel::Snapshot, + ); + + // Create vertex + let _ = arc_storage.create_vertex( + &txn50, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + txn50.commit_at(None).expect("Vertex commit should succeed"); + + // Create edges at 100 timestamps + let txn_id1 = Timestamp::with_ts(Timestamp::TXN_ID_START + 100); + create_test_edges(&arc_storage, txn_id1, Timestamp::with_ts(100), 0); + + let target_ts_150 = Timestamp::with_ts(150); + let txn150 = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 150), + target_ts_150, + IsolationLevel::Snapshot, + ); + + // Create edges at 200 timestamps + let txn_id2 = Timestamp::with_ts(Timestamp::TXN_ID_START + 200); + create_test_edges(&arc_storage, txn_id2, Timestamp::with_ts(200), 100); + + let target_ts_250 = Timestamp::with_ts(250); + let txn250 = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 250), + target_ts_250, + IsolationLevel::Snapshot, + ); + + // Test visibility at different target timestamps + // At ts 50 (before any commits) - should see nothing + let iter50 = arc_storage.iter_adjacency_at_ts(&txn50, 1).unwrap(); + let mut count50 = 0; + for _result in iter50 { + count50 += 1; + } + assert_eq!(count50, 0, "Should see no edges at ts 50"); + + // At ts 150 (after first commit, before second) - should see only first batch + let iter150 = arc_storage.iter_adjacency_at_ts(&txn150, 1).unwrap(); + let mut count150 = 0; + for _ in iter150 { + count150 += 1; + } + assert_eq!( + count150, 3, + "Should see 3 edges from first transaction 
at ts 150" + ); + + // At ts 250 (after both commits) - should see all edges + let iter250 = arc_storage.iter_adjacency_at_ts(&txn250, 1).unwrap(); + let mut count250 = 0; + for _ in iter250 { + count250 += 1; + } + assert_eq!( + count250, 6, + "Should see 6 edges from both transactions at ts 250" + ); +} + +#[test] +fn test_delete_edge_in_txn_transaction_rollback() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + let txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 1); + let txn = MemTransaction::new( + arc_storage.clone(), + txn_id, + Timestamp::with_ts(1), + IsolationLevel::Snapshot, + ); + + // Create vertex and edge + let _ = arc_storage.create_vertex( + &txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + + let eid = arc_storage + .create_edge_in_txn( + &txn, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(42)))]), + }, + ) + .unwrap(); + txn.commit_at(None).expect("Commit should succeed"); + + // Start a transaction to delete edge + let delete_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 2); + let delete_txn = MemTransaction::new( + arc_storage.clone(), + delete_txn_id, + Timestamp::with_ts(2), + IsolationLevel::Snapshot, + ); + let _ = arc_storage.delete_edge_in_txn(&delete_txn, eid); + + // Rollback the transaction + delete_txn.abort().expect("Rollback should succeed"); + + // Verify the edge is still there + let get_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 3); + let get_txn = MemTransaction::new( + arc_storage.clone(), + get_txn_id, + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&get_txn, eid).unwrap(); + assert_eq!( + edge.as_ref().unwrap().properties.get(0), + Some(ScalarValue::Int32(Some(42))), + "Edge should still exist after rollback" + ); +} + +#[test] +fn test_delete_edge_in_txn_nonexistent_edge() 
{ + let storage = make_storage(); + let arc_storage = Arc::new(storage); + let txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 1); + let txn = MemTransaction::new( + arc_storage.clone(), + txn_id, + Timestamp::with_ts(1), + IsolationLevel::Snapshot, + ); + + // Try to delete non-existent edge + let result = arc_storage.delete_edge_in_txn(&txn, 999u64); + assert!(result.is_err(), "Should return error for non-existent edge"); + assert!( + matches!(result.unwrap_err(), StorageError::EdgeNotFound(_)), + "Should return EdgeNotFound error" + ); +} + +#[test] +fn test_delete_edge_in_txn_with_properties() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + let txn_id_1 = Timestamp::with_ts(Timestamp::TXN_ID_START + 1); + let txn_1 = MemTransaction::new( + arc_storage.clone(), + txn_id_1, + Timestamp::with_ts(1), + IsolationLevel::Snapshot, + ); + + // Create vertex and edge with multiple properties + let _ = arc_storage.create_vertex( + &txn_1, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + + let eid = arc_storage + .create_edge_in_txn( + &txn_1, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![ + Some(ScalarValue::Int32(Some(42))), + Some(ScalarValue::String(Some("hello".to_string()))), + ]), + }, + ) + .unwrap(); + txn_1.commit_at(None).expect("Commit should succeed"); + + // Test deleting the edge + let delete_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 2); + let delete_txn = MemTransaction::new( + arc_storage.clone(), + delete_txn_id, + Timestamp::with_ts(2), + IsolationLevel::Snapshot, + ); + let result = arc_storage.delete_edge_in_txn(&delete_txn, eid); + assert!( + result.is_ok(), + "Deleting edge with properties should succeed" + ); + delete_txn.commit_at(None).expect("Commit should succeed"); + + // Verify the edge is gone + let get_txn_id = Timestamp::with_ts(Timestamp::TXN_ID_START + 3); + let get_txn = 
MemTransaction::new( + arc_storage.clone(), + get_txn_id, + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&get_txn, eid); + assert!( + matches!(edge, Ok(None)), + "Edge should not be found after deletion, got: {:?}", + edge + ); +} + +#[test] +fn test_concurrent_set_and_delete_serializes_writes() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + + let setup_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 10), + Timestamp::with_ts(10), + IsolationLevel::Snapshot, + ); + let _ = arc_storage.create_vertex( + &setup_txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + let eid = arc_storage + .create_edge_in_txn( + &setup_txn, + OlapEdge { + label_id: NonZeroU32::new(100), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(1)))]), + }, + ) + .unwrap(); + setup_txn + .commit_at(None) + .expect("Setup commit should succeed"); + + let barrier = Arc::new(Barrier::new(2)); + + let storage_a = arc_storage.clone(); + let barrier_a = barrier.clone(); + let handle_a = thread::spawn(move || { + let txn = MemTransaction::new( + storage_a.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 11), + Timestamp::with_ts(11), + IsolationLevel::Snapshot, + ); + barrier_a.wait(); + let _ = storage_a.set_edge_property_in_txn( + &txn, + eid, + vec![0], + vec![ScalarValue::Int32(Some(111))], + ); + txn.commit_at(None).expect("Set commit should succeed"); + }); + + let storage_b = arc_storage.clone(); + let barrier_b = barrier.clone(); + let handle_b = thread::spawn(move || { + let txn = MemTransaction::new( + storage_b.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 12), + Timestamp::with_ts(12), + IsolationLevel::Snapshot, + ); + barrier_b.wait(); + let _ = storage_b.delete_edge_in_txn(&txn, eid); + txn.commit_at(None).expect("Delete commit should succeed"); 
+ }); + + handle_a.join().expect("Thread A should finish"); + handle_b.join().expect("Thread B should finish"); + + let read_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 13), + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&read_txn, eid).unwrap(); + assert!( + edge.is_none(), + "Edge should be deleted after concurrent writes" + ); +} + +#[test] +fn test_concurrent_read_hides_uncommitted_edge() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + + let setup_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 20), + Timestamp::with_ts(20), + IsolationLevel::Snapshot, + ); + let _ = arc_storage.create_vertex( + &setup_txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + setup_txn + .commit_at(None) + .expect("Setup commit should succeed"); + + let (ready_tx, ready_rx) = mpsc::channel::<u64>(); + let (done_tx, done_rx) = mpsc::channel::<()>(); + + let storage_writer = arc_storage.clone(); + let handle_writer = thread::spawn(move || { + let txn = MemTransaction::new( + storage_writer.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 21), + Timestamp::with_ts(21), + IsolationLevel::Snapshot, + ); + let eid = storage_writer + .create_edge_in_txn( + &txn, + OlapEdge { + label_id: NonZeroU32::new(200), + src_id: 1, + dst_id: 100, + properties: OlapPropertyStore::default(), + }, + ) + .unwrap(); + ready_tx.send(eid).expect("Ready send should succeed"); + done_rx.recv().expect("Done recv should succeed"); + txn.commit_at(None).expect("Writer commit should succeed"); + eid + }); + + let storage_reader = arc_storage.clone(); + let handle_reader = thread::spawn(move || { + let eid = ready_rx.recv().expect("Ready recv should succeed"); + let txn = MemTransaction::new( + storage_reader.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 22), +
Timestamp::with_ts(22), + IsolationLevel::Snapshot, + ); + let edge = storage_reader.get_edge_at_ts(&txn, eid).unwrap(); + assert!(edge.is_none(), "Reader should not see uncommitted edge"); + + let iter = storage_reader.iter_edges_at_ts(&txn).unwrap(); + let mut found = false; + for next in iter { + if let Ok(e) = next + && e.label_id == NonZeroU32::new(200) + { + found = true; + break; + } + } + assert!(!found, "Iterator should not see uncommitted edge"); + done_tx.send(()).expect("Done send should succeed"); + }); + + handle_reader.join().expect("Reader thread should finish"); + let eid = handle_writer.join().expect("Writer thread should finish"); + + let read_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 23), + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&read_txn, eid).unwrap(); + assert!(edge.is_some(), "Edge should be visible after commit"); +} + +#[test] +fn test_concurrent_insert_and_set_preserves_properties() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + + let setup_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 30), + Timestamp::with_ts(30), + IsolationLevel::Snapshot, + ); + let _ = arc_storage.create_vertex( + &setup_txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + let eid = arc_storage + .create_edge_in_txn( + &setup_txn, + OlapEdge { + label_id: NonZeroU32::new(300), + src_id: 1, + dst_id: 20, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(1)))]), + }, + ) + .unwrap(); + setup_txn + .commit_at(None) + .expect("Setup commit should succeed"); + + let barrier = Arc::new(Barrier::new(2)); + + let storage_set = arc_storage.clone(); + let barrier_set = barrier.clone(); + let handle_set = thread::spawn(move || { + let txn = MemTransaction::new( + storage_set.clone(), + 
Timestamp::with_ts(Timestamp::TXN_ID_START + 31), + Timestamp::with_ts(31), + IsolationLevel::Snapshot, + ); + barrier_set.wait(); + let _ = storage_set.set_edge_property_in_txn( + &txn, + eid, + vec![0], + vec![ScalarValue::Int32(Some(999))], + ); + txn.commit_at(None).expect("Set commit should succeed"); + }); + + let storage_insert = arc_storage.clone(); + let barrier_insert = barrier.clone(); + let handle_insert = thread::spawn(move || { + let txn = MemTransaction::new( + storage_insert.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 32), + Timestamp::with_ts(32), + IsolationLevel::Snapshot, + ); + barrier_insert.wait(); + let _ = storage_insert + .create_edge_in_txn( + &txn, + OlapEdge { + label_id: NonZeroU32::new(301), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(2)))]), + }, + ) + .unwrap(); + txn.commit_at(None).expect("Insert commit should succeed"); + }); + + handle_set.join().expect("Set thread should finish"); + handle_insert.join().expect("Insert thread should finish"); + + let read_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 33), + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&read_txn, eid).unwrap(); + let props = edge.unwrap().properties; + assert_eq!( + props.get(0), + Some(ScalarValue::Int32(Some(999))), + "Property update should remain on the original edge" + ); +} + +#[test] +fn test_concurrent_commit_and_abort_preserve_committed_value() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + + let setup_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 40), + Timestamp::with_ts(40), + IsolationLevel::Snapshot, + ); + let _ = arc_storage.create_vertex( + &setup_txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + let eid = arc_storage + .create_edge_in_txn( + 
&setup_txn, + OlapEdge { + label_id: NonZeroU32::new(400), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(10)))]), + }, + ) + .unwrap(); + setup_txn + .commit_at(None) + .expect("Setup commit should succeed"); + + let barrier = Arc::new(Barrier::new(2)); + + let storage_commit = arc_storage.clone(); + let barrier_commit = barrier.clone(); + let handle_commit = thread::spawn(move || { + let txn = MemTransaction::new( + storage_commit.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 41), + Timestamp::with_ts(41), + IsolationLevel::Snapshot, + ); + barrier_commit.wait(); + let _ = storage_commit.set_edge_property_in_txn( + &txn, + eid, + vec![0], + vec![ScalarValue::Int32(Some(1111))], + ); + txn.commit_at(None).expect("Commit txn should succeed"); + }); + + let storage_abort = arc_storage.clone(); + let barrier_abort = barrier.clone(); + let handle_abort = thread::spawn(move || { + let txn = MemTransaction::new( + storage_abort.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 42), + Timestamp::with_ts(42), + IsolationLevel::Snapshot, + ); + barrier_abort.wait(); + let _ = storage_abort.set_edge_property_in_txn( + &txn, + eid, + vec![0], + vec![ScalarValue::Int32(Some(2222))], + ); + txn.abort().expect("Abort txn should succeed"); + }); + + handle_commit.join().expect("Commit thread should finish"); + handle_abort.join().expect("Abort thread should finish"); + + let read_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 43), + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + let edge = arc_storage.get_edge_at_ts(&read_txn, eid).unwrap(); + let props = edge.unwrap().properties; + assert_eq!( + props.get(0), + Some(ScalarValue::Int32(Some(1111))), + "Committed value should win over aborted update" + ); +} + +#[test] +fn test_abort_create_edge_keeps_property_alignment() { + let storage = make_storage(); + let arc_storage = Arc::new(storage); + + 
let setup_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 50), + Timestamp::with_ts(50), + IsolationLevel::Snapshot, + ); + let _ = arc_storage.create_vertex( + &setup_txn, + OlapVertex { + vid: 1, + properties: PropertyRecord::default(), + block_offset: 0, + }, + ); + setup_txn + .commit_at(None) + .expect("Setup commit should succeed"); + + let base_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 51), + Timestamp::with_ts(51), + IsolationLevel::Snapshot, + ); + + let eid10 = arc_storage + .create_edge_in_txn( + &base_txn, + OlapEdge { + label_id: NonZeroU32::new(600), + src_id: 1, + dst_id: 10, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(10)))]), + }, + ) + .unwrap(); + let eid20 = arc_storage + .create_edge_in_txn( + &base_txn, + OlapEdge { + label_id: NonZeroU32::new(601), + src_id: 1, + dst_id: 20, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(20)))]), + }, + ) + .unwrap(); + let eid30 = arc_storage + .create_edge_in_txn( + &base_txn, + OlapEdge { + label_id: NonZeroU32::new(602), + src_id: 1, + dst_id: 30, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(30)))]), + }, + ) + .unwrap(); + base_txn + .commit_at(None) + .expect("Base commit should succeed"); + + let abort_txn = MemTransaction::new( + arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 52), + Timestamp::with_ts(52), + IsolationLevel::Snapshot, + ); + let aborted_eid = arc_storage + .create_edge_in_txn( + &abort_txn, + OlapEdge { + label_id: NonZeroU32::new(603), + src_id: 1, + dst_id: 15, + properties: OlapPropertyStore::new(vec![Some(ScalarValue::Int32(Some(999)))]), + }, + ) + .unwrap(); + abort_txn.abort().expect("Abort should succeed"); + + assert!( + arc_storage.edge_id_map.get(&aborted_eid).is_none(), + "Aborted edge should not remain in edge_id_map" + ); + + let read_txn = MemTransaction::new( + 
arc_storage.clone(), + Timestamp::with_ts(Timestamp::TXN_ID_START + 53), + Timestamp::max_commit_ts(), + IsolationLevel::Snapshot, + ); + + let edge10 = arc_storage + .get_edge_at_ts(&read_txn, eid10) + .unwrap() + .unwrap(); + let edge20 = arc_storage + .get_edge_at_ts(&read_txn, eid20) + .unwrap() + .unwrap(); + let edge30 = arc_storage + .get_edge_at_ts(&read_txn, eid30) + .unwrap() + .unwrap(); + + assert_eq!(edge10.properties.get(0), Some(ScalarValue::Int32(Some(10)))); + assert_eq!(edge20.properties.get(0), Some(ScalarValue::Int32(Some(20)))); + assert_eq!(edge30.properties.get(0), Some(ScalarValue::Int32(Some(30)))); + + let locations = [ + (eid10, ScalarValue::Int32(Some(10))), + (eid20, ScalarValue::Int32(Some(20))), + (eid30, ScalarValue::Int32(Some(30))), + ] + .into_iter() + .map(|(eid, expected)| { + let (block_idx, offset) = *arc_storage.edge_id_map.get(&eid).unwrap().value(); + (block_idx, offset, expected) + }) + .collect::<Vec<_>>(); + + let property_columns = arc_storage.property_columns.read().unwrap(); + let prop_col = &property_columns[0]; + for (block_idx, offset, expected) in locations { + let block = prop_col.blocks.get(block_idx).unwrap(); + let last = block.values[offset].last().unwrap(); + assert_eq!(last.value, Some(expected)); + } +}