Skip to content
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
1a2a8de
Add logging and replay for add_updates for edges
fabubaker Feb 12, 2026
6263303
Remove separate add_edge_properties wal entry
fabubaker Feb 13, 2026
eb97fed
Use set_or_unify_id_and_dtype during replay
fabubaker Feb 13, 2026
d110fb9
Add logging and replay for AddEdgeMetadata
fabubaker Feb 13, 2026
4df7734
chore: apply tidy-public auto-fixes
github-actions[bot] Feb 13, 2026
d5564ee
Remove src_id and dst_id for add_metadata
fabubaker Feb 18, 2026
34bba81
Use AddEdgeMetadata logging methods
fabubaker Feb 18, 2026
2039f22
Add set_lsn method for writers
fabubaker Feb 18, 2026
1fd78f2
Use correct mapper for metadata
fabubaker Feb 18, 2026
74fd32b
Create add_metadata_impl and reuse
fabubaker Feb 18, 2026
2cc6064
Merge branch 'db_v4' into db_v4_/wal-edge-view
fabubaker Feb 18, 2026
86e5d5a
chore: apply tidy-public auto-fixes
github-actions[bot] Feb 18, 2026
1d92c7a
Add wal logging for NodeView.add_updates
fabubaker Feb 19, 2026
b672f93
Create add_metadata_impl for NodeView and reuse
fabubaker Feb 19, 2026
8030746
Add wal logging and replay for NodeView.add_metadata
fabubaker Feb 19, 2026
5d43d37
chore: apply tidy-public auto-fixes
github-actions[bot] Feb 19, 2026
d936726
Add wal for set_node_type
fabubaker Feb 20, 2026
e2842e6
Add FIXME for actual set_node_type logging
fabubaker Feb 20, 2026
e1bf5ae
Add logging and replay for graph_props
fabubaker Feb 20, 2026
a46cd98
Implement set_lsn for GraphPropWriter
fabubaker Feb 20, 2026
acf70ad
Return GraphPropWriterT
fabubaker Feb 20, 2026
d4a0c53
Call wal.log in add_properties
fabubaker Feb 20, 2026
f9db442
Call wal.log in add_properties
fabubaker Feb 20, 2026
fd2e3d8
Remove some unwraps and collects in replay
fabubaker Feb 23, 2026
67e623b
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal-e…
fabubaker Feb 23, 2026
ef2d9f3
Fix some more merge conflicts
fabubaker Feb 23, 2026
8873415
Add logging for set_node_type
fabubaker Feb 23, 2026
734c2df
Run fmt
fabubaker Feb 23, 2026
ebee5da
Run fmt
fabubaker Feb 23, 2026
3d45e82
Cleanup and fix reserve_and_lock_segment
fabubaker Feb 23, 2026
3f469a2
Remove Option from WriteLockedGraphPropPages writer
fabubaker Feb 23, 2026
d73f3f8
Check lsn during graph prop replay
fabubaker Feb 23, 2026
20a3b77
Refactor graph props add_metadata
fabubaker Feb 23, 2026
cca29fb
Implement logging and replay for graph metadata
fabubaker Feb 23, 2026
72200d6
Use is_update correctly
fabubaker Feb 23, 2026
9f95da8
Implement logging and replay for delete_edge
fabubaker Feb 23, 2026
d30f910
Run fmt
fabubaker Feb 23, 2026
f69e1bf
Simplify some comments
fabubaker Feb 23, 2026
13a9304
Run fmt
fabubaker Feb 24, 2026
e17ae0f
fix some review comments
fabianmurariu Feb 24, 2026
f957390
fmt
fabianmurariu Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
405 changes: 359 additions & 46 deletions db4-graph/src/replay.rs

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion db4-storage/src/api/graph_props.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{error::StorageError, segments::graph_prop::segment::MemGraphPropSegment};
use crate::{error::StorageError, segments::graph_prop::segment::MemGraphPropSegment, wal::LSN};
use parking_lot::{RwLockReadGuard, RwLockWriteGuard};
use raphtory_api::core::entities::properties::{meta::Meta, prop::Prop, tprop::TPropOps};
use std::{fmt::Debug, path::Path, sync::Arc};
Expand Down Expand Up @@ -31,6 +31,9 @@ where

fn set_dirty(&self, dirty: bool);

/// Returns the latest lsn for the immutable part of this segment.
fn immut_lsn(&self) -> LSN;

fn notify_write(
&self,
mem_segment: &mut RwLockWriteGuard<'_, MemGraphPropSegment>,
Expand Down
8 changes: 7 additions & 1 deletion db4-storage/src/pages/edge_page/writer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
LocalPOS, api::edges::EdgeSegmentOps, error::StorageError, pages::layer_counter::GraphStats,
segments::edge::segment::MemEdgeSegment,
segments::edge::segment::MemEdgeSegment, wal::LSN,
};
use raphtory_api::core::entities::{
VID,
Expand Down Expand Up @@ -72,9 +72,11 @@ impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmen
let existing_edge = self
.page
.contains_edge(edge_pos, layer_id, self.writer.deref());

if !existing_edge {
self.increment_layer_num_edges(layer_id);
}

self.graph_stats.update_time(t.t());
self.writer
.delete_edge_internal(t, edge_pos, src, dst, layer_id);
Expand Down Expand Up @@ -166,6 +168,10 @@ impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmen
self.page.get_edge(edge_pos, layer_id, self.writer.deref())
}

pub fn set_lsn(&mut self, lsn: LSN) {
self.writer.set_lsn(lsn);
}

pub fn check_metadata(
&self,
edge_pos: LocalPOS,
Expand Down
14 changes: 9 additions & 5 deletions db4-storage/src/pages/graph_prop_page/writer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
api::graph_props::GraphPropSegmentOps, error::StorageError,
segments::graph_prop::segment::MemGraphPropSegment,
segments::graph_prop::segment::MemGraphPropSegment, wal::LSN,
};
use parking_lot::RwLockWriteGuard;
use raphtory_api::core::entities::properties::prop::Prop;
Expand Down Expand Up @@ -35,16 +35,20 @@ impl<'a, GS: GraphPropSegmentOps> GraphPropWriter<'a, GS> {
self.graph_props.set_dirty(true);
}

pub fn check_metadata(&self, props: &[(usize, Prop)]) -> Result<(), StorageError> {
self.mem_segment.check_metadata(props)
}

pub fn update_metadata(&mut self, props: impl IntoIterator<Item = (usize, Prop)>) {
let add = self.mem_segment.update_metadata(props);

self.graph_props.increment_est_size(add);
self.graph_props.set_dirty(true);
}

pub fn check_metadata(&self, props: &[(usize, Prop)]) -> Result<(), StorageError> {
self.mem_segment.check_metadata(props)
}

pub fn set_lsn(&mut self, lsn: LSN) {
self.mem_segment.set_lsn(lsn);
}
}

impl<GS: GraphPropSegmentOps> Drop for GraphPropWriter<'_, GS> {
Expand Down
21 changes: 9 additions & 12 deletions db4-storage/src/pages/locked/graph_props.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
api::graph_props::GraphPropSegmentOps, segments::graph_prop::segment::MemGraphPropSegment,
wal::LSN,
};
use parking_lot::RwLockWriteGuard;
use raphtory_api::core::entities::properties::prop::Prop;
Expand Down Expand Up @@ -43,6 +44,10 @@ impl<'a, GS: GraphPropSegmentOps> LockedGraphPropPage<'a, GS> {
self.page.increment_est_size(add);
self.page.set_dirty(true);
}

pub fn set_lsn(&mut self, lsn: LSN) {
self.lock.set_lsn(lsn);
}
}

impl<GS: GraphPropSegmentOps> Drop for LockedGraphPropPage<'_, GS> {
Expand All @@ -54,23 +59,15 @@ impl<GS: GraphPropSegmentOps> Drop for LockedGraphPropPage<'_, GS> {
}

pub struct WriteLockedGraphPropPages<'a, GS: GraphPropSegmentOps> {
writer: Option<LockedGraphPropPage<'a, GS>>,
}

impl<GS: GraphPropSegmentOps> Default for WriteLockedGraphPropPages<'_, GS> {
fn default() -> Self {
Self { writer: None }
}
writer: LockedGraphPropPage<'a, GS>,
}

impl<'a, GS: GraphPropSegmentOps> WriteLockedGraphPropPages<'a, GS> {
pub fn new(writer: LockedGraphPropPage<'a, GS>) -> Self {
Self {
writer: Some(writer),
}
Self { writer }
}

pub fn writer(&mut self) -> Option<&mut LockedGraphPropPage<'a, GS>> {
self.writer.as_mut()
pub fn writer(&mut self) -> &mut LockedGraphPropPage<'a, GS> {
&mut self.writer
}
}
3 changes: 3 additions & 0 deletions db4-storage/src/pages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,15 @@ impl<
pub fn flush(&self) -> Result<(), StorageError> {
let node_types = self.nodes.prop_meta().get_all_node_types();
let config = self.ext.config().with_node_types(node_types);

if let Some(graph_dir) = self.graph_dir.as_ref() {
config.save_to_dir(graph_dir)?;
}

self.nodes.flush()?;
self.edges.flush()?;
self.graph_props.flush()?;

Ok(())
}
}
Expand Down
13 changes: 12 additions & 1 deletion db4-storage/src/pages/node_page/writer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
LocalPOS, api::nodes::NodeSegmentOps, error::StorageError, pages::layer_counter::GraphStats,
segments::node::segment::MemNodeSegment,
segments::node::segment::MemNodeSegment, wal::LSN,
};
use raphtory_api::core::entities::{
EID, GID, VID,
Expand Down Expand Up @@ -222,6 +222,10 @@ impl<'a, MP: DerefMut<Target = MemNodeSegment> + 'a, NS: NodeSegmentOps> NodeWri
pub fn has_node(&self, node: LocalPOS, layer_id: usize) -> bool {
self.mut_segment.has_node(node, layer_id) || self.page.has_node(node, layer_id)
}

pub fn set_lsn(&mut self, lsn: LSN) {
self.mut_segment.set_lsn(lsn);
}
}

pub fn node_info_as_props(
Expand Down Expand Up @@ -260,4 +264,11 @@ impl<'a, MP: DerefMut<Target = MemNodeSegment>, NS: NodeSegmentOps> NodeWriters<
pub fn get_mut_dst(&mut self) -> &mut NodeWriter<'a, MP, NS> {
self.dst.as_mut().unwrap_or(&mut self.src)
}

pub fn set_lsn(&mut self, lsn: LSN) {
self.src.set_lsn(lsn);
if let Some(dst) = &mut self.dst {
dst.set_lsn(lsn);
}
}
}
88 changes: 37 additions & 51 deletions db4-storage/src/pages/node_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ pub const N: usize = 32;
pub struct NodeStorageInner<NS, EXT> {
segments: boxcar::Vec<Arc<NS>>,
stats: Arc<GraphStats>,

/// Contains ids of segments that can accomodate new nodes.
free_segments: Box<[RwLock<usize>; N]>,

nodes_path: Option<PathBuf>,
node_meta: Arc<Meta>,
edge_meta: Arc<Meta>,
Expand Down Expand Up @@ -281,59 +284,47 @@ impl<NS: NodeSegmentOps<Extension = EXT>, EXT: PersistenceStrategy> NodeStorageI
}
}

/// Reserves space in a segment and returns a locked writer for that segment.
///
/// This method implements a lock-free reservation strategy with lazy segment allocation:
///
/// **What it does:**
/// 1. Uses the `row` parameter to determine a starting slot in the `free_segments` array (via modulo N)
/// 2. Attempts to acquire a write lock on the segment at that slot
/// 3. If successful, tries to atomically reserve `required_space` rows in that segment
/// 4. If the segment is full or locked, tries the next slot (round-robin fashion)
/// 5. When a segment becomes full, creates a new segment and updates the slot atomically
/// 6. Returns both the reserved position and a locked writer for immediate use
///
/// **Why it works this way:**
/// - **Lock-free fast path**: Uses `try_writer` to avoid blocking on contended segments
/// - **Load distribution**: The `row % N` ensures different threads naturally distribute across slots
/// - **Atomic reservation**: `reserve_segment_rows` uses atomic compare-and-swap to reserve space
/// without holding locks during the reservation attempt
/// - **Double-check locking**: When creating a new segment, re-checks if another thread already
/// created one (via `if *slot == page_id`) to avoid unnecessary allocations
/// - **Keeps writer locked**: Returns the writer still locked so the caller can immediately write
/// to the reserved space without risk of concurrent modification
/// - **Circular probing**: If a slot is contended or full, moves to the next slot rather than
/// blocking, maximizing throughput in multi-threaded scenarios
///
/// This design prioritizes:
/// - High concurrency: Multiple threads can reserve space simultaneously in different segments
/// - Low latency: Avoids blocking whenever possible by trying different segments
/// - Memory efficiency: Only creates new segments when actually needed
/// Select a segment using `row` as a hint and reserves `num_rows` in that segment.
/// Returns the reserved position and a locked writer for that segment.
pub fn reserve_and_lock_segment(
&self,
row: usize,
required_space: u32,
num_rows: u32,
) -> (
LocalPOS,
NodeWriter<'_, RwLockWriteGuard<'_, MemNodeSegment>, NS>,
) {
let mut slot_idx = row % N;
let mut page_id = *self.free_segments[slot_idx].read_recursive();
let mut segment_id = *self.free_segments[slot_idx].read_recursive();

// Iterate through `free_segments` until we can acquire a write lock on a free segment.
// With the write lock, try reserving `num_rows` in that segment.
// If unsuccessful, find another free segment and try again, creating a new one if needed.
loop {
match self.try_writer(page_id) {
match self.try_writer(segment_id) {
None => {
// The current segment is being written to, round-robin to the next slot.
slot_idx = (slot_idx + 1) % N;
page_id = *self.free_segments[slot_idx].read_recursive();
let slot = self.free_segments[slot_idx].read_recursive();
segment_id = *slot;
}
Some(writer) => {
match self.reserve_segment_rows(writer.page, required_space) {
match self.reserve_segment_rows(writer.page, num_rows) {
None => {
// segment is full, we need to create a new one
// The current segment is full, drop its lock and try to find
// another free segment.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not trying to find a free segment here, we are going to replace the segment at this slot with a new one (unless another thread managed to replace it before we get to it)

drop(writer);

let mut slot = self.free_segments[slot_idx].write();
if *slot == page_id {
// page_id is unchanged, no other thread created a new segment before we got the lock
page_id = self.push_new_segment();
*slot = page_id;

// Check our slot to see if some other thread has pushed a new free segment.
if *slot == segment_id {
// The segment is still the same, so we need to create a new one.
segment_id = self.push_new_segment();
*slot = segment_id;
} else {
// There is another free segment in the slot, retry using it.
segment_id = *slot;
}
}
Some(local_pos) => return (LocalPOS(local_pos), writer),
Expand All @@ -344,10 +335,13 @@ impl<NS: NodeSegmentOps<Extension = EXT>, EXT: PersistenceStrategy> NodeStorageI
}

/// Reserves a single row in the given segment and returns the position if successful.
/// Returns `None` if the segment is full.
pub fn reserve_segment_row(&self, segment: &NS) -> Option<u32> {
self.reserve_segment_rows(segment, 1)
}

/// Reserves `rows` in the given segment and returns the position if successful.
/// Returns `None` if the segment is full.
fn reserve_segment_rows(&self, segment: &NS, rows: u32) -> Option<u32> {
increment_and_clamp(segment.nodes_counter(), rows, self.max_segment_len())
}
Expand Down Expand Up @@ -637,25 +631,17 @@ impl<NS: NodeSegmentOps<Extension = EXT>, EXT: PersistenceStrategy> NodeStorageI
}
}

/// Atomically increments a counter and returns the previous value, but only if the result stays within bounds.
///
/// 1. Atomically reads the current counter value
/// 2. Computes `current + increment`
/// 3. If the result is ≤ `max_segment_len`, updates the counter and returns the *previous* value
/// 4. If the result would exceed the limit, leaves the counter unchanged and returns `None`
///
pub fn increment_and_clamp(
counter: &AtomicU32,
increment: u32,
max_segment_len: u32,
) -> Option<u32> {
/// Atomically increments `counter` and returns the previous value, but only if the result stays
/// within bounds.
/// If the result exceeds `limit`, leaves the counter unchanged and returns `None`.
pub fn increment_and_clamp(counter: &AtomicU32, increment: u32, limit: u32) -> Option<u32> {
counter
.fetch_update(
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
|current| {
let updated = current + increment;
if updated <= max_segment_len {
if updated <= limit {
Some(updated)
} else {
None
Expand Down
42 changes: 6 additions & 36 deletions db4-storage/src/pages/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ use crate::{
};
use parking_lot::RwLockWriteGuard;
use raphtory_api::core::{
entities::properties::{
meta::{NODE_ID_IDX, STATIC_GRAPH_LAYER_ID},
prop::Prop,
},
entities::properties::{meta::STATIC_GRAPH_LAYER_ID, prop::Prop},
storage::dict_mapper::MaybeNew,
};
use raphtory_core::{
entities::{EID, ELID, GidRef, VID},
entities::{EID, ELID, VID},
storage::timeindex::AsTime,
};

Expand Down Expand Up @@ -52,30 +49,6 @@ impl<
}
}

pub fn store_src_node_info(&mut self, vid: impl Into<VID>, node_id: Option<GidRef>) {
if let Some(id) = node_id {
let pos = self.graph.nodes().resolve_pos(vid.into()).1;

self.node_writers().get_mut_src().update_c_props(
pos,
STATIC_GRAPH_LAYER_ID,
[(NODE_ID_IDX, id.into())],
);
}
}

pub fn store_dst_node_info(&mut self, vid: impl Into<VID>, node_id: Option<GidRef>) {
if let Some(id) = node_id {
let pos = self.graph.nodes().resolve_pos(vid.into()).1;

self.node_writers().get_mut_dst().update_c_props(
pos,
STATIC_GRAPH_LAYER_ID,
[(NODE_ID_IDX, id.into())],
);
}
}

pub fn add_edge_into_layer<T: AsTime>(
&mut self,
t: T,
Expand Down Expand Up @@ -168,6 +141,7 @@ impl<
self.node_writers
.get_mut_src()
.add_outbound_edge(Some(t), src_pos, dst, edge_id);

self.node_writers
.get_mut_dst()
.add_inbound_edge(Some(t), dst_pos, src, edge_id);
Expand All @@ -176,6 +150,7 @@ impl<
self.node_writers
.get_mut_src()
.update_deletion_time(t, src_pos, e_id);

self.node_writers
.get_mut_dst()
.update_deletion_time(t, dst_pos, e_id);
Expand Down Expand Up @@ -226,12 +201,7 @@ impl<
}

pub fn set_lsn(&mut self, lsn: LSN) {
self.node_writers.src.mut_segment.set_lsn(lsn);

if let Some(dst) = &mut self.node_writers.dst {
dst.mut_segment.set_lsn(lsn);
}

self.edge_writer.writer.set_lsn(lsn);
self.node_writers.set_lsn(lsn);
self.edge_writer.set_lsn(lsn);
}
}
Loading
Loading