Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions minigu/storage/src/ap/iterators/adjacency_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::num::NonZeroU32;
use std::sync::OnceLock;

use minigu_common::types::VertexId;
use minigu_transaction::Timestamp;

use crate::ap::olap_graph::{OlapEdge, OlapPropertyStore, OlapStorage, OlapStorageEdge};
use crate::error::StorageError;
Expand Down Expand Up @@ -102,3 +104,129 @@ impl Iterator for AdjacencyIterator<'_> {
None
}
}

#[allow(dead_code)]
pub struct AdjacencyIteratorAtTs<'a> {
pub storage: &'a OlapStorage,
// Vertex ID
pub vertex_id: VertexId,
// Index of the current block
pub block_idx: usize,
// Offset within block
pub offset: usize,
pub txn_id: Option<Timestamp>,
pub commit_ts: OnceLock<Timestamp>,
}
impl Iterator for AdjacencyIteratorAtTs<'_> {
type Item = Result<OlapEdge, StorageError>;

fn next(&mut self) -> Option<Self::Item> {
while self.block_idx != usize::MAX {
let temporary = self.storage.edges.read().unwrap();
let option = temporary.get(self.block_idx);

// Return if none,should not happen
let _v = option?;

let block = option.unwrap();
// Return if tombstone
if block.is_tombstone {
if option?.pre_block_index.is_none() {
self.block_idx = usize::MAX;
return None;
}

self.block_idx = block.pre_block_index.unwrap();
continue;
}
if let Some(ts) = self.txn_id
&& ts.raw() < block.min_ts.raw()
{
self.block_idx = if let Some(idx) = block.pre_block_index {
idx
} else {
usize::MAX
};
self.offset = 0;
continue;
}
// Move to next block
if self.offset == BLOCK_CAPACITY {
self.offset = 0;
self.block_idx = if let Some(idx) = block.pre_block_index {
idx
} else {
usize::MAX
};
continue;
}

if self.offset < BLOCK_CAPACITY {
let raw: &OlapStorageEdge = &block.edges[self.offset];
// Scan next block once scanned empty edge
if raw.label_id == NonZeroU32::new(1) && raw.dst_id == 1 {
self.offset = 0;
self.block_idx = if let Some(idx) = block.pre_block_index {
idx
} else {
usize::MAX
};
continue;
}
// Visibility filtering by edge commit_ts when target_ts is provided
if let Some(target) = self.commit_ts.get() {
if raw.commit_ts.is_txn_id() {
if let Some(txn_id) = self.txn_id {
if raw.commit_ts != txn_id {
self.offset += 1;
continue;
}
} else {
self.offset += 1;
continue;
}
} else if raw.commit_ts.raw() > target.raw() {
self.offset += 1;
continue;
}
}
// Build edge result
let edge = OlapEdge {
label_id: raw.label_id,
src_id: block.src_id,
dst_id: raw.dst_id,
properties: {
let mut props = OlapPropertyStore::default();

for (col_idx, column) in self
.storage
.property_columns
.read()
.unwrap()
.iter()
.enumerate()
{
if let Some(val) = column
.blocks
.get(self.block_idx)
.and_then(|blk| blk.values.get(self.offset))
.cloned()
{
props.set_prop(col_idx, val);
}
}
props
},
};
self.offset += 1;
return Some(Ok(edge));
}
self.block_idx = if let Some(idx) = block.pre_block_index {
idx
} else {
usize::MAX
};
}
None
}
}
111 changes: 111 additions & 0 deletions minigu/storage/src/ap/iterators/edge_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::num::NonZeroU32;
use std::sync::OnceLock;

use minigu_transaction::Timestamp;

use crate::ap::olap_graph::{OlapEdge, OlapPropertyStore, OlapStorage, OlapStorageEdge};
use crate::error::StorageError;

const BLOCK_CAPACITY: usize = 256;

pub struct EdgeIter<'a> {
pub storage: &'a OlapStorage,
// Index of the current block
Expand Down Expand Up @@ -84,3 +88,110 @@ impl Iterator for EdgeIter<'_> {
None
}
}

pub struct EdgeIterAtTs<'a> {
pub storage: &'a OlapStorage,
// Index of the current block
pub block_idx: usize,
// Offset within block
pub offset: usize,
pub txn_id: Option<Timestamp>,
pub commit_ts: OnceLock<Timestamp>,
}
impl Iterator for EdgeIterAtTs<'_> {
type Item = Result<OlapEdge, StorageError>;

fn next(&mut self) -> Option<Self::Item> {
// 1. Scan Block
let edges = self.storage.edges.read().unwrap();
while self.block_idx < edges.len() {
// 1.1 If none,move to next block
let borrow = self.storage.edges.read().unwrap();
let block = match borrow.get(self.block_idx) {
Some(block) => block,
None => {
self.block_idx += 1;
self.offset = 0;
continue;
}
};
if block.is_tombstone {
self.block_idx += 1;
self.offset = 0;
continue;
}
// Block-level timestamp filter
if let Some(ts) = self.txn_id
&& ts.raw() < block.min_ts.raw()
{
self.block_idx += 1;
self.offset = 0;
continue;
}
// 1.2 If one block has been finished,move to next
if self.offset == BLOCK_CAPACITY {
self.offset = 0;
self.block_idx += 1;
continue;
}
// 2. Scan within block
if self.offset < block.edges.len() {
let raw: &OlapStorageEdge = &block.edges[self.offset];
// 2.1 Scan next block once scanned empty edge
if raw.label_id == NonZeroU32::new(1) && raw.dst_id == 1 {
self.offset = 0;
self.block_idx += 1;
continue;
}
// 2.2 Visibility filtering by edge commit_ts
let is_visible = if self.commit_ts.get().is_some() {
if raw.commit_ts.is_txn_id() {
self.txn_id == Some(raw.commit_ts)
} else {
raw.commit_ts.raw() <= self.commit_ts.get().unwrap().raw()
}
} else {
raw.commit_ts.is_txn_id() && (self.txn_id == Some(raw.commit_ts))
};

if !is_visible {
self.offset += 1;
continue;
}

// 2.3 Build edge result
let edge = OlapEdge {
label_id: raw.label_id,
src_id: block.src_id,
dst_id: raw.dst_id,
properties: {
let mut props = OlapPropertyStore::default();

for (col_idx, column) in self
.storage
.property_columns
.read()
.unwrap()
.iter()
.enumerate()
{
if let Some(val) = column
.blocks
.get(self.block_idx)
.and_then(|blk| blk.values.get(self.offset))
.cloned()
{
props.set_prop(col_idx, val);
}
}
props
},
};
// 2.4 Increase offset
self.offset += 1;
return Some(Ok(edge));
}
}
None
}
}
1 change: 1 addition & 0 deletions minigu/storage/src/ap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod iterators;
pub mod olap_graph;
pub mod olap_storage;
pub mod transaction;

pub use olap_storage::{MutOlapGraph, OlapGraph, StorageTransaction};
Loading