@@ -16,8 +16,7 @@ use raphtory_api::core::{
1616use storage:: {
1717 api:: { edges:: EdgeSegmentOps , graph_props:: GraphPropSegmentOps , nodes:: NodeSegmentOps } ,
1818 error:: StorageError ,
19- pages:: resolve_pos,
20- persist:: { config:: ConfigOps , strategy:: PersistenceStrategy } ,
19+ persist:: strategy:: PersistenceStrategy ,
2120 resolver:: GIDResolverOps ,
2221 wal:: { GraphReplay , TransactionID , LSN } ,
2322 ES , GS , NS ,
3332 fn replay_add_edge (
3433 & mut self ,
3534 lsn : LSN ,
36- transaction_id : TransactionID ,
35+ _transaction_id : TransactionID ,
3736 t : EventTime ,
3837 src_name : Option < GID > ,
3938 src_id : VID ,
4443 layer_id : usize ,
4544 props : Vec < ( String , usize , Prop ) > ,
4645 ) -> Result < ( ) , StorageError > {
47- let node_max_page_len = self . graph ( ) . extension ( ) . config ( ) . max_node_page_len ( ) ;
48- let edge_max_page_len = self . graph ( ) . extension ( ) . config ( ) . max_edge_page_len ( ) ;
49-
50- // 1. Insert prop ids into edge meta.
51- // No need to validate props again since they are already validated before
52- // being logged to the WAL.
53-
54- // 2. Insert node ids into resolver.
46+ // Insert node ids into resolver.
5547 if let Some ( src_name) = src_name. as_ref ( ) {
5648 self . graph ( )
5749 . logical_to_physical
@@ -64,16 +56,16 @@ where
6456 . set ( dst_name. as_ref ( ) , dst_id) ?;
6557 }
6658
67- // 4. Grab src writer and add edge data.
68- let ( src_segment_id, src_pos) = resolve_pos ( src_id, node_max_page_len) ;
69- let resize_vid = VID :: from ( src_id. index ( ) + 1 ) ;
70- self . resize_chunks_to_vid ( resize_vid) ; // Create enough segments.
59+ // Grab src writer and add edge data.
60+ let ( src_segment_id, src_pos) = self . graph ( ) . storage ( ) . nodes ( ) . resolve_pos ( src_id) ;
61+ self . resize_segments_to_vid ( src_id) ; // Create enough segments.
7162
7263 let segment = self
7364 . graph ( )
7465 . storage ( )
7566 . nodes ( )
7667 . get_or_create_segment ( src_segment_id) ;
68+
7769 let immut_lsn = segment. immut_lsn ( ) ;
7870
7971 // Replay this entry only if it doesn't exist in immut.
@@ -112,16 +104,16 @@ where
112104 drop ( src_writer) ;
113105 }
114106
115- // 5. Grab dst writer and add edge data.
116- let ( dst_segment_id, dst_pos) = resolve_pos ( dst_id, node_max_page_len) ;
117- let resize_vid = VID :: from ( dst_id. index ( ) + 1 ) ;
118- self . resize_chunks_to_vid ( resize_vid) ;
107+ // Grab dst writer and add edge data.
108+ let ( dst_segment_id, dst_pos) = self . graph ( ) . storage ( ) . nodes ( ) . resolve_pos ( dst_id) ;
109+ self . resize_segments_to_vid ( dst_id) ;
119110
120111 let segment = self
121112 . graph ( )
122113 . storage ( )
123114 . nodes ( )
124115 . get_or_create_segment ( dst_segment_id) ;
116+
125117 let immut_lsn = segment. immut_lsn ( ) ;
126118
127119 // Replay this entry only if it doesn't exist in immut.
@@ -157,24 +149,26 @@ where
157149 drop ( dst_writer) ;
158150 }
159151
160- // 6. Grab edge writer and add temporal props & metadata.
161- let ( edge_segment_id, edge_pos) = resolve_pos ( eid, edge_max_page_len) ;
162- let resize_eid = EID :: from ( eid. index ( ) + 1 ) ;
163- self . resize_chunks_to_eid ( resize_eid) ;
152+ // Grab edge writer and add temporal props & metadata.
153+ let ( edge_segment_id, edge_pos) = self . graph ( ) . storage ( ) . edges ( ) . resolve_pos ( eid) ;
154+ self . resize_segments_to_eid ( eid) ;
164155
165156 let segment = self
166157 . graph ( )
167158 . storage ( )
168159 . edges ( )
169160 . get_or_create_segment ( edge_segment_id) ;
161+
170162 let immut_lsn = segment. immut_lsn ( ) ;
171163
172164 // Replay this entry only if it doesn't exist in immut.
173165 if immut_lsn < lsn {
174166 let edge_meta = self . graph ( ) . edge_meta ( ) ;
175167
168+ // Insert prop ids into edge meta.
176169 for ( prop_name, prop_id, prop_value) in & props {
177170 let prop_mapper = edge_meta. temporal_prop_mapper ( ) ;
171+
178172 match prop_mapper. get_dtype ( * prop_id) {
179173 None => {
180174 prop_mapper. set_id_and_dtype (
@@ -194,7 +188,7 @@ where
194188 }
195189 }
196190
197- // 3. Insert layer id into the layer meta of both edge and node.
191+ // Insert layer id into the layer meta of both edge and node.
198192 let node_meta = self . graph ( ) . node_meta ( ) ;
199193
200194 edge_meta
@@ -233,4 +227,95 @@ where
233227
234228 Ok ( ( ) )
235229 }
230+
231+ fn replay_add_node (
232+ & mut self ,
233+ lsn : LSN ,
234+ _transaction_id : TransactionID ,
235+ t : EventTime ,
236+ node_name : Option < GID > ,
237+ node_id : VID ,
238+ node_type_and_id : Option < ( String , usize ) > ,
239+ props : Vec < ( String , usize , Prop ) > ,
240+ ) -> Result < ( ) , StorageError > {
241+ // Insert node id into resolver.
242+ if let Some ( ref name) = node_name {
243+ self . graph ( )
244+ . logical_to_physical
245+ . set ( name. as_ref ( ) , node_id) ?;
246+ }
247+
248+ // Resolve segment and check LSN.
249+ let ( segment_id, pos) = self . graph ( ) . storage ( ) . nodes ( ) . resolve_pos ( node_id) ;
250+ self . resize_segments_to_vid ( node_id) ;
251+
252+ let segment = self
253+ . graph ( )
254+ . storage ( )
255+ . nodes ( )
256+ . get_or_create_segment ( segment_id) ;
257+
258+ let immut_lsn = segment. immut_lsn ( ) ;
259+
260+ // Replay this entry only if it doesn't exist in immut.
261+ if immut_lsn < lsn {
262+ let node_meta = self . graph ( ) . node_meta ( ) ;
263+
264+ for ( prop_name, prop_id, prop_value) in & props {
265+ let prop_mapper = node_meta. temporal_prop_mapper ( ) ;
266+ match prop_mapper. get_dtype ( * prop_id) {
267+ None => {
268+ prop_mapper. set_id_and_dtype (
269+ prop_name. as_str ( ) ,
270+ * prop_id,
271+ prop_value. dtype ( ) ,
272+ ) ;
273+ }
274+ Some ( old_dtype) => {
275+ let dtype = prop_value. dtype ( ) ;
276+ let mut unified = false ;
277+ let new_dtype = unify_types ( & old_dtype, & dtype, & mut unified) ?;
278+ if unified {
279+ prop_mapper. set_dtype ( * prop_id, new_dtype) ;
280+ }
281+ }
282+ }
283+ }
284+
285+ // Set node type metadata early to prevent issues with borrowing node_writer.
286+ if let Some ( ( ref node_type, node_type_id) ) = node_type_and_id {
287+ node_meta
288+ . node_type_meta ( )
289+ . set_id ( node_type. as_str ( ) , node_type_id) ;
290+ }
291+
292+ let mut node_writer = self . nodes . get_mut ( segment_id) . unwrap ( ) . writer ( ) ;
293+
294+ if !node_writer. has_node ( pos, STATIC_GRAPH_LAYER_ID ) {
295+ node_writer. increment_seg_num_nodes ( ) ;
296+ }
297+
298+ if let Some ( name) = node_name {
299+ node_writer. store_node_id ( pos, STATIC_GRAPH_LAYER_ID , name) ;
300+ }
301+
302+ if let Some ( ( _, node_type_id) ) = node_type_and_id {
303+ node_writer. store_node_type ( pos, STATIC_GRAPH_LAYER_ID , node_type_id) ;
304+ }
305+
306+ // Add the node with its timestamp and props.
307+ node_writer. add_props (
308+ t,
309+ pos,
310+ STATIC_GRAPH_LAYER_ID ,
311+ props
312+ . into_iter ( )
313+ . map ( |( _, prop_id, prop_value) | ( prop_id, prop_value) ) ,
314+ ) ;
315+
316+ node_writer. mut_segment . set_lsn ( lsn) ;
317+ }
318+
319+ Ok ( ( ) )
320+ }
236321}
0 commit comments