feat(storage): add block level timestamp to OLAP storage#136
date727 wants to merge 7 commits into TuGraph-family:master
Conversation
Pull request overview
This PR adds block-level timestamp support to the OLAP storage engine to enable Multi-Version Concurrency Control (MVCC). The implementation introduces a new transaction module for AP storage, adds unique edge identifiers with timestamp tracking, and implements property versioning.
Changes:
- Introduces `MemTransaction` for AP storage with commit/abort semantics and undo buffer management
- Adds `EdgeId` as a unique edge identifier and a `commit_ts` timestamp on edges for MVCC visibility
- Implements property versioning with a `PropertyVersion` struct containing timestamp/value pairs
- Adds timestamp-aware iterators (`EdgeIterAtTs`, `AdjacencyIteratorAtTs`) with visibility filtering
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 21 comments.
| File | Description |
|---|---|
| minigu/storage/src/ap/transaction.rs | New transaction implementation with commit/abort and undo buffer for AP storage |
| minigu/storage/src/ap/olap_graph.rs | Core changes adding EdgeId, timestamps, property versioning, and transactional methods |
| minigu/storage/src/ap/olap_storage.rs | Trait updates adding txn_id method and timestamp-aware iterators |
| minigu/storage/src/ap/iterators/edge_iterator.rs | Adds EdgeIterAtTs with timestamp-based visibility filtering |
| minigu/storage/src/ap/iterators/adjacency_iterator.rs | Adds AdjacencyIteratorAtTs with timestamp-based visibility filtering |
| minigu/storage/src/ap/mod.rs | Exports new transaction module |
| minigu/storage/tests/ap/transaction_test.rs | Comprehensive tests for transactional operations and MVCC behavior |
| minigu/storage/tests/ap/ap_graph_test.rs | Updates existing tests to use new transaction API and EdgeId mappings |
| minigu/storage/tests/ap/mod.rs | Exports new transaction test module |
| minigu/storage/src/tp/memory_graph.rs | Replaces unreachable! with empty match arms for flexibility |
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 11 comments.
```rust
if let Some(last) = pb.values[offset].last()
    && last.ts == self.txn_id
{
    pb.values[offset].pop();
}
```
`abort()` assumes the aborted transaction's property version is the last element (`pb.values[offset].last().ts == self.txn_id`) before popping it. With concurrent writers, another transaction can append a newer version, and the aborted transaction's version won't be removed, potentially leaving uncommitted data or corrupting state. This needs stronger write serialization/conflict detection, or removal of all versions with `ts == self.txn_id`.
Suggested change:
```diff
-if let Some(last) = pb.values[offset].last()
-    && last.ts == self.txn_id
-{
-    pb.values[offset].pop();
-}
+// Remove any versions written by this (now aborting) transaction,
+// in case concurrent writers have appended newer versions.
+pb.values[offset].retain(|v| v.ts != self.txn_id);
```
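To make the failure mode concrete, here is a self-contained sketch of the retain-based cleanup; `PropertyVersion` is a simplified stand-in for the PR's struct, and the timestamps are plain `u64`s rather than the crate's `Timestamp` type.

```rust
// Simplified stand-in for the PR's PropertyVersion (assumed shape).
#[derive(Clone, Debug, PartialEq)]
struct PropertyVersion {
    ts: u64,
    value: i64,
}

/// Abort cleanup that stays correct under concurrent appends: remove every
/// version written by the aborting transaction, wherever it sits in the vec.
fn abort_cleanup(versions: &mut Vec<PropertyVersion>, txn_id: u64) {
    versions.retain(|v| v.ts != txn_id);
}

fn main() {
    let mut versions = vec![
        PropertyVersion { ts: 1, value: 10 },  // committed earlier
        PropertyVersion { ts: 42, value: 20 }, // written by aborting txn 42
        PropertyVersion { ts: 50, value: 30 }, // appended by a concurrent writer
    ];
    // A pop()-based abort would remove ts=50 or nothing; retain removes ts=42.
    abort_cleanup(&mut versions, 42);
    assert_eq!(
        versions,
        vec![
            PropertyVersion { ts: 1, value: 10 },
            PropertyVersion { ts: 50, value: 30 },
        ]
    );
}
```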
```rust
// 2.1 Scan next block once scanned empty edge
if raw.label_id == NonZeroU32::new(1) && raw.dst_id == 1 {
    self.offset = 0;
    self.block_idx += 1;
    continue;
}
```
This iterator treats `(label_id == 1 && dst_id == 1)` as an end-of-block sentinel. Transactional deletes also write the same tombstone values, so hitting a deleted slot mid-block will terminate iteration early and skip remaining edges. Use `edge_counter`/`eid == 0` for termination and explicitly skip tombstones while continuing the scan.
Suggested change:
```diff
-// 2.1 Scan next block once scanned empty edge
-if raw.label_id == NonZeroU32::new(1) && raw.dst_id == 1 {
-    self.offset = 0;
-    self.block_idx += 1;
-    continue;
-}
+// 2.1 Determine logical end of block and skip tombstones.
+// Use eid == 0 as the end-of-block sentinel to avoid
+// confusing transactional tombstones with padding.
+if raw.eid == 0 {
+    // No more valid edges in this block; move to the next block.
+    self.offset = 0;
+    self.block_idx += 1;
+    continue;
+}
+// Transactional delete tombstone: skip this edge but continue scanning.
+if raw.label_id == NonZeroU32::new(1) && raw.dst_id == 1 {
+    self.offset += 1;
+    continue;
+}
```
```rust
// update properties
let mut property_columns = self.property_columns.write().unwrap();
for (i, prop) in indices.into_iter().zip(props.into_iter()) {
    let column = &mut property_columns[i];
    // ensure property block exists for this block index
```
`set_edge_property_in_txn` indexes `property_columns[i]` directly, but `indices` comes from the caller and is not bounds-checked. An out-of-range index will panic and crash the process; this should be validated and turned into a `StorageError` (also consider validating `indices.len() == props.len()` instead of silently dropping extras via `zip`).
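A minimal sketch of the validation, assuming a plain error string in place of the crate's `StorageError` (the real return type and error variants are assumptions here):

```rust
/// Validate caller-supplied column indices before any mutation, so a bad
/// index becomes a recoverable error instead of an index-out-of-range panic.
fn validate_property_indices(
    indices: &[usize],
    props_len: usize,
    columns_len: usize,
) -> Result<(), String> {
    // Reject mismatched lengths instead of letting zip() drop extras silently.
    if indices.len() != props_len {
        return Err(format!(
            "indices/props length mismatch: {} vs {}",
            indices.len(),
            props_len
        ));
    }
    if let Some(&bad) = indices.iter().find(|&&i| i >= columns_len) {
        return Err(format!(
            "property column index {bad} out of range ({columns_len} columns)"
        ));
    }
    Ok(())
}

fn main() {
    assert!(validate_property_indices(&[0, 1], 2, 3).is_ok());
    assert!(validate_property_indices(&[0, 7], 2, 3).is_err()); // out of range
    assert!(validate_property_indices(&[0], 2, 3).is_err()); // length mismatch
}
```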
```diff
 let raw: &OlapStorageEdge = &block.edges[self.offset];
 // Scan next block once scanned empty edge
 if raw.label_id == NonZeroU32::new(1) && raw.dst_id == 1 {
     self.offset = 0;
-    self.block_idx = if let Some(idx) = block.pre_block_index {
-        idx
+    self.block_idx = block.pre_block_index.unwrap_or(usize::MAX);
```
This iterator treats `(label_id == 1 && dst_id == 1)` as an end-of-block sentinel, but transactional deletes also write those tombstone values. That can stop adjacency scans early and miss valid edges after the deleted slot. Prefer stopping on `edge_counter`/`eid == 0` and skipping tombstones instead of treating them as the terminator.
```diff
 PropertyBlock {
-    values: vec![None; BLOCK_CAPACITY],
+    values: vec![Vec::new(); BLOCK_CAPACITY],
     min_ts: Timestamp::max_commit_ts(),
     max_ts: Timestamp::with_ts(0),
 },
```
After inserting the initial `PropertyVersion` entries (`ts = 0`) for a committed edge, `PropertyBlock.min_ts`/`max_ts` remain at their sentinel values (`max_commit_ts`/`0`). This makes the timestamp metadata inconsistent with the actual stored versions and can break timestamp pruning/compression. Update `min_ts`/`max_ts` when writing the initial versions.
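One way to keep the metadata consistent is to fold every written version's timestamp into the block's range. A sketch with `u64` stand-ins for the crate's `Timestamp` type:

```rust
// Stand-in for the PR's PropertyBlock timestamp metadata
// (u64 instead of the crate's Timestamp type).
struct PropertyBlock {
    min_ts: u64,
    max_ts: u64,
}

impl PropertyBlock {
    /// Call on every version write so min_ts/max_ts always bound the
    /// timestamps actually stored in the block.
    fn record_version_ts(&mut self, ts: u64) {
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
    }
}

fn main() {
    // Sentinel initial values, as in the snippet above.
    let mut pb = PropertyBlock { min_ts: u64::MAX, max_ts: 0 };
    pb.record_version_ts(0); // initial committed version at ts = 0
    assert_eq!((pb.min_ts, pb.max_ts), (0, 0));
}
```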
```rust
// ensure property columns exist based on edge properties
let mut property_columns = self.property_columns.write().unwrap();
while property_columns.len() <= edge.properties.properties.len() {
```
`while property_columns.len() <= edge.properties.properties.len()` grows the column vector one past the number of properties (e.g., 0 props still allocates 1 column). This looks like an off-by-one and can cause unnecessary columns/misalignment. Use `<` and/or base this on the max index you intend to write.
Suggested change:
```diff
-while property_columns.len() <= edge.properties.properties.len() {
+while property_columns.len() < edge.properties.properties.len() {
```
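The off-by-one is easy to see in isolation:

```rust
fn main() {
    let props_len = 0; // an edge with no properties
    let mut columns: Vec<Vec<u8>> = Vec::new();
    // `<=` allocates one column past the target: 0 props -> 1 column.
    while columns.len() <= props_len {
        columns.push(Vec::new());
    }
    assert_eq!(columns.len(), 1);

    // `<` stops at exactly props_len columns.
    let mut columns2: Vec<Vec<u8>> = Vec::new();
    while columns2.len() < props_len {
        columns2.push(Vec::new());
    }
    assert_eq!(columns2.len(), 0);
}
```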
```rust
let mut props_vec: Vec<minigu_common::value::ScalarValue> = Vec::new();
{
    let property_columns = self.property_columns.read().unwrap();
    for col in property_columns.iter() {
        if let Some(pb) = col.blocks.get(block_idx)
            && let Some(versions) = pb.values.get(offset)
            && let Some(v) = latest_prop_value(versions)
        {
            props_vec.push(v);
        } else {
            props_vec.push(minigu_common::value::ScalarValue::Null);
        }
    }
}
```
`delete_edge_in_txn` builds `props_vec` but never uses it. Either remove this collection or include it in the undo information so abort can restore properties consistently.
Suggested change (delete the unused collection):
```diff
-let mut props_vec: Vec<minigu_common::value::ScalarValue> = Vec::new();
-{
-    let property_columns = self.property_columns.read().unwrap();
-    for col in property_columns.iter() {
-        if let Some(pb) = col.blocks.get(block_idx)
-            && let Some(versions) = pb.values.get(offset)
-            && let Some(v) = latest_prop_value(versions)
-        {
-            props_vec.push(v);
-        } else {
-            props_vec.push(minigu_common::value::ScalarValue::Null);
-        }
-    }
-}
```
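If the values are instead meant to survive into the undo path, one option is to carry them in the undo entry. A sketch with hypothetical names (`UndoOp`, `prior_props`); the PR's actual undo-buffer layout may differ:

```rust
// Hypothetical undo entry carrying the pre-delete property values so that
// abort() can restore them; i64 stands in for the crate's ScalarValue.
enum UndoOp {
    DeleteEdge {
        edge_id: u64,
        prior_props: Vec<Option<i64>>,
    },
}

fn undo(op: &UndoOp) {
    match op {
        UndoOp::DeleteEdge { edge_id, prior_props } => {
            // On abort: re-insert the edge and restore each property version.
            println!("restore edge {edge_id} with {} properties", prior_props.len());
        }
    }
}

fn main() {
    let op = UndoOp::DeleteEdge {
        edge_id: 7,
        prior_props: vec![Some(10), None],
    };
    undo(&op);
}
```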
```rust
    Ok(commit_ts)
}

pub fn abort(&self) -> StorageResult<()> {
```
`commit_at` doesn't clear `undo_buffer`, and `abort()` doesn't check whether the transaction was already committed. That makes it possible to commit successfully and later call `abort()` to roll back committed data. Consider clearing `undo_buffer` on commit and returning an error from `abort()` when `commit_ts` is set.
Suggested change:
```diff
+    // Clear undo buffer so that committed transactions cannot be rolled back
+    self.undo_buffer.write().clear();
     Ok(commit_ts)
 }

 pub fn abort(&self) -> StorageResult<()> {
+    // Prevent abort from running after the transaction has been committed
+    if let Some(commit_ts) = self.commit_ts.get().copied() {
+        return Err(StorageError::Transaction(
+            TransactionError::TransactionAlreadyCommitted(format!(
+                "abort called on already committed transaction at {:?}",
+                commit_ts
+            )),
+        ));
+    }
```
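The two fixes compose into a simple state machine: commit publishes a timestamp exactly once and drops the undo log, and abort refuses to run once a commit timestamp exists. A minimal sketch with stand-in types (`OnceLock<u64>` for the commit timestamp, `Mutex<Vec<String>>` for the undo buffer; the PR's actual field types are assumptions):

```rust
use std::sync::{Mutex, OnceLock};

// Simplified stand-in for the PR's MemTransaction.
struct Txn {
    commit_ts: OnceLock<u64>,
    undo_buffer: Mutex<Vec<String>>,
}

impl Txn {
    fn commit_at(&self, ts: u64) -> Result<u64, String> {
        // set() succeeds at most once, so double-commit is rejected.
        self.commit_ts
            .set(ts)
            .map_err(|_| "transaction already committed".to_string())?;
        // Clear the undo buffer so committed data can no longer be rolled back.
        self.undo_buffer.lock().unwrap().clear();
        Ok(ts)
    }

    fn abort(&self) -> Result<(), String> {
        if let Some(ts) = self.commit_ts.get() {
            return Err(format!("abort called on committed transaction at {ts}"));
        }
        // ... replay and drain the undo buffer here ...
        self.undo_buffer.lock().unwrap().clear();
        Ok(())
    }
}

fn main() {
    let txn = Txn {
        commit_ts: OnceLock::new(),
        undo_buffer: Mutex::new(vec!["undo entry".into()]),
    };
    assert_eq!(txn.commit_at(5), Ok(5));
    assert!(txn.abort().is_err()); // abort after commit is rejected
}
```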
feat(storage): add block level timestamp to OLAP storage
Type
- feat: (new feature)
- fix: (bug fix)
- docs: (doc update)
- refactor: (refactor code)
- test: (test code)
- chore: (other updates)

Scope
- query: (query engine)
- parser: (frontend parser)
- planner: (frontend planner)
- optimizer: (query optimizer)
- executor: (execution engine)
- op: (operators)
- storage: (storage engine)
- mvcc: (multi version concurrency control)
- schema: (graph model and topology)
- tool: (tools)
- cli: (cli)
- sdk: (sdk)
- none: (N/A)

Description
Issue: #111

Checklist
- based on the `master` branch.