22//! Allows for fast replay by making use of one-time lock acquisition for
33//! all the segments in the graph.
44
5- use crate :: WriteLockedGraph ;
5+ use crate :: { TemporalGraph , WriteLockedGraph } ;
6+ use raphtory_api:: core:: entities:: properties:: meta:: Meta ;
67use raphtory_api:: core:: {
78 entities:: {
89 properties:: { meta:: STATIC_GRAPH_LAYER_ID , prop:: Prop } ,
@@ -175,16 +176,7 @@ where
175176 let edge_meta = self . graph ( ) . edge_meta ( ) ;
176177
177178 // Insert prop ids into edge meta.
178- for ( prop_name, prop_id, prop_value) in & props {
179- let prop_mapper = edge_meta. temporal_prop_mapper ( ) ;
180- let mut write_locked_mapper = prop_mapper. write_locked ( ) ;
181-
182- write_locked_mapper. set_or_unify_id_and_dtype (
183- prop_name. as_ref ( ) ,
184- * prop_id,
185- prop_value. dtype ( ) ,
186- ) ?;
187- }
179+ unify_types ( edge_meta, & props, true ) ?;
188180
189181 // Insert layer id into the layer meta of both edge and node.
190182 let node_meta = self . graph ( ) . node_meta ( ) ;
@@ -254,16 +246,7 @@ where
254246 if immut_lsn < lsn {
255247 let edge_meta = self . graph ( ) . edge_meta ( ) ;
256248
257- for ( prop_name, prop_id, prop_value) in & props {
258- let prop_mapper = edge_meta. metadata_mapper ( ) ;
259- let mut write_locked_mapper = prop_mapper. write_locked ( ) ;
260-
261- write_locked_mapper. set_or_unify_id_and_dtype (
262- prop_name. as_ref ( ) ,
263- * prop_id,
264- prop_value. dtype ( ) ,
265- ) ?;
266- }
249+ unify_types ( edge_meta, & props, false ) ;
267250
268251 let edge_writer = self . edges . get_mut ( edge_segment_id) . ok_or_else ( || {
269252 StorageError :: GenericFailure ( format ! (
@@ -279,7 +262,7 @@ where
279262 ) )
280263 } ) ?;
281264
282- let props = props. iter ( ) . map ( |( _, id, p) | ( * id, p. clone ( ) ) ) ;
265+ let props = props. into_iter ( ) . map ( |( _, id, p) | ( id, p) ) ;
283266
284267 // No need to check metadata since the operation was logged after validation.
285268 edge_writer. update_c_props ( edge_pos, src, dst, layer_id, props) ;
@@ -422,16 +405,7 @@ where
422405 if immut_lsn < lsn {
423406 let node_meta = self . graph ( ) . node_meta ( ) ;
424407
425- for ( prop_name, prop_id, prop_value) in & props {
426- let prop_mapper = node_meta. temporal_prop_mapper ( ) ;
427- let mut write_locked_mapper = prop_mapper. write_locked ( ) ;
428-
429- write_locked_mapper. set_or_unify_id_and_dtype (
430- prop_name. as_ref ( ) ,
431- * prop_id,
432- prop_value. dtype ( ) ,
433- ) ?;
434- }
408+ unify_types ( node_meta, & props, true ) ?;
435409
436410 // Set node type metadata early to prevent issues with borrowing node_writer.
437411 if let Some ( ( ref node_type, node_type_id) ) = node_type_and_id {
@@ -497,16 +471,7 @@ where
497471 if immut_lsn < lsn {
498472 let node_meta = self . graph ( ) . node_meta ( ) ;
499473
500- for ( prop_name, prop_id, prop_value) in & props {
501- let prop_mapper = node_meta. metadata_mapper ( ) ;
502- let mut write_locked_mapper = prop_mapper. write_locked ( ) ;
503-
504- write_locked_mapper. set_or_unify_id_and_dtype (
505- prop_name. as_ref ( ) ,
506- * prop_id,
507- prop_value. dtype ( ) ,
508- ) ?;
509- }
474+ unify_types ( & node_meta, & props, false ) ?;
510475
511476 let node_writer = self . nodes . get_mut ( segment_id) . ok_or_else ( || {
512477 StorageError :: GenericFailure ( format ! (
@@ -515,7 +480,7 @@ where
515480 } ) ?;
516481
517482 let mut node_writer = node_writer. writer ( ) ;
518- let props = props. iter ( ) . map ( |( _, id, p) | ( * id, p. clone ( ) ) ) ;
483+ let props = props. into_iter ( ) . map ( |( _, id, p) | ( id, p) ) ;
519484
520485 // No need to check metadata since the operation was logged after validation.
521486 node_writer. update_c_props ( pos, STATIC_GRAPH_LAYER_ID , props) ;
@@ -578,19 +543,10 @@ where
578543 if immut_lsn < lsn {
579544 let graph_props_meta = self . graph ( ) . graph_props_meta ( ) ;
580545
581- for ( prop_name, prop_id, prop_value) in & props {
582- let prop_mapper = graph_props_meta. temporal_prop_mapper ( ) ;
583- let mut write_locked_mapper = prop_mapper. write_locked ( ) ;
584-
585- write_locked_mapper. set_or_unify_id_and_dtype (
586- prop_name. as_ref ( ) ,
587- * prop_id,
588- prop_value. dtype ( ) ,
589- ) ?;
590- }
546+ unify_types ( graph_props_meta, & props, true ) ?;
591547
592548 let writer = self . graph_props . writer ( ) ;
593- let props = props. iter ( ) . map ( |( _, id, p) | ( * id, p. clone ( ) ) ) ;
549+ let props = props. into_iter ( ) . map ( |( _, id, p) | ( id, p) ) ;
594550
595551 writer. add_properties ( t, props) ;
596552 writer. set_lsn ( lsn) ;
@@ -611,19 +567,10 @@ where
611567 if immut_lsn < lsn {
612568 let graph_props_meta = self . graph ( ) . graph_props_meta ( ) ;
613569
614- for ( prop_name, prop_id, prop_value) in & props {
615- let prop_mapper = graph_props_meta. metadata_mapper ( ) ;
616- let mut write_locked_mapper = prop_mapper. write_locked ( ) ;
617-
618- write_locked_mapper. set_or_unify_id_and_dtype (
619- prop_name. as_ref ( ) ,
620- * prop_id,
621- prop_value. dtype ( ) ,
622- ) ?;
623- }
570+ unify_types ( graph_props_meta, & props, false ) ?;
624571
625572 let writer = self . graph_props . writer ( ) ;
626- let props = props. iter ( ) . map ( |( _, id, p) | ( * id, p. clone ( ) ) ) ;
573+ let props = props. into_iter ( ) . map ( |( _, id, p) | ( id, p) ) ;
627574
628575 writer. update_metadata ( props) ;
629576 writer. set_lsn ( lsn) ;
@@ -632,3 +579,16 @@ where
632579 Ok ( ( ) )
633580 }
634581}
582+
583+ fn unify_types ( meta : & Meta , props : & [ ( String , usize , Prop ) ] , temporal : bool ) -> Result < ( ) , StorageError > {
584+ let prop_mapper = if !temporal { meta. metadata_mapper ( ) } else { meta. temporal_prop_mapper ( ) } ;
585+ let mut write_locked_mapper = prop_mapper. write_locked ( ) ;
586+ for ( prop_name, prop_id, prop_value) in props {
587+ write_locked_mapper. set_or_unify_id_and_dtype (
588+ prop_name. as_ref ( ) ,
589+ * prop_id,
590+ prop_value. dtype ( ) ,
591+ ) ?;
592+ }
593+ Ok ( ( ) )
594+ }
0 commit comments