@@ -57,11 +57,25 @@ impl CostModelStorageLayer for BackendManager {
5757 Ok ( insert_res. last_insert_id )
5858 }
5959
60- async fn update_stats_from_catalog (
61- & self ,
62- c : CatalogSource ,
63- epoch_id : Self :: EpochId ,
64- ) -> StorageResult < ( ) > {
60+ /// TODO: documentation
61+ async fn update_stats_from_catalog ( & self , c : CatalogSource ) -> StorageResult < Self :: EpochId > {
62+ let transaction = self . db . begin ( ) . await ?;
63+ let source = match c {
64+ CatalogSource :: Mock => "Mock" ,
65+ CatalogSource :: Iceberg ( ) => "Iceberg" ,
66+ } ;
67+ let new_event = event:: ActiveModel {
68+ source_variant : sea_orm:: ActiveValue :: Set ( source. to_string ( ) ) ,
69+ timestamp : sea_orm:: ActiveValue :: Set ( Utc :: now ( ) ) ,
70+ data : sea_orm:: ActiveValue :: Set ( sea_orm:: JsonValue :: String (
71+ "Update stats from catalog" . to_string ( ) ,
72+ ) ) ,
73+ ..Default :: default ( )
74+ } ;
75+ let epoch_id = Event :: insert ( new_event)
76+ . exec ( & transaction)
77+ . await ?
78+ . last_insert_id ;
6579 match c {
6680 CatalogSource :: Mock => {
6781 let mock_catalog = MockCatalog :: new ( ) ;
@@ -72,7 +86,7 @@ impl CostModelStorageLayer for BackendManager {
7286 ..Default :: default ( )
7387 }
7488 } ) )
75- . exec ( & self . db )
89+ . exec ( & transaction )
7690 . await ?;
7791 NamespaceMetadata :: insert_many ( mock_catalog. namespaces . iter ( ) . map ( |namespace| {
7892 namespace_metadata:: ActiveModel {
@@ -82,7 +96,7 @@ impl CostModelStorageLayer for BackendManager {
8296 ..Default :: default ( )
8397 }
8498 } ) )
85- . exec ( & self . db )
99+ . exec ( & transaction )
86100 . await ?;
87101 TableMetadata :: insert_many ( mock_catalog. tables . iter ( ) . map ( |table| {
88102 table_metadata:: ActiveModel {
@@ -92,7 +106,7 @@ impl CostModelStorageLayer for BackendManager {
92106 ..Default :: default ( )
93107 }
94108 } ) )
95- . exec ( & self . db )
109+ . exec ( & transaction )
96110 . await ?;
97111 Attribute :: insert_many ( mock_catalog. attributes . iter ( ) . map ( |attr| {
98112 attribute:: ActiveModel {
@@ -107,7 +121,7 @@ impl CostModelStorageLayer for BackendManager {
107121 ..Default :: default ( )
108122 }
109123 } ) )
110- . exec ( & self . db )
124+ . exec ( & transaction )
111125 . await ?;
112126 Statistic :: insert_many ( mock_catalog. statistics . iter ( ) . map ( |stat| {
113127 statistic:: ActiveModel {
@@ -122,7 +136,29 @@ impl CostModelStorageLayer for BackendManager {
122136 ..Default :: default ( )
123137 }
124138 } ) )
125- . exec ( & self . db )
139+ . exec ( & transaction)
140+ . await ?;
141+ VersionedStatistic :: insert_many ( mock_catalog. statistics . iter ( ) . map ( |stat| {
142+ versioned_statistic:: ActiveModel {
143+ epoch_id : sea_orm:: ActiveValue :: Set ( epoch_id) ,
144+ statistic_id : sea_orm:: ActiveValue :: Set ( stat. id ) ,
145+ statistic_value : sea_orm:: ActiveValue :: Set ( stat. stat_value . clone ( ) ) ,
146+ ..Default :: default ( )
147+ }
148+ } ) )
149+ . exec ( & transaction)
150+ . await ?;
151+ StatisticToAttributeJunction :: insert_many ( mock_catalog. statistics . iter ( ) . flat_map (
152+ |stat| {
153+ stat. attr_ids . iter ( ) . map ( move |attr_id| {
154+ statistic_to_attribute_junction:: ActiveModel {
155+ statistic_id : sea_orm:: ActiveValue :: Set ( stat. id ) ,
156+ attribute_id : sea_orm:: ActiveValue :: Set ( * attr_id) ,
157+ }
158+ } )
159+ } ,
160+ ) )
161+ . exec ( & transaction)
126162 . await ?;
127163 IndexMetadata :: insert_many (
128164 mock_catalog
@@ -146,12 +182,13 @@ impl CostModelStorageLayer for BackendManager {
146182 ..Default :: default ( )
147183 } ) ,
148184 )
149- . exec ( & self . db )
185+ . exec ( & transaction )
150186 . await ?;
151- Ok ( ( ) )
152187 }
153188 CatalogSource :: Iceberg ( ) => todo ! ( ) ,
154189 }
190+ transaction. commit ( ) . await ?;
191+ Ok ( epoch_id)
155192 }
156193
157194 /* Update the statistics in the database.
@@ -562,15 +599,28 @@ mod tests {
562599 let mut binding = super :: BackendManager :: new ( Some ( & database_url) ) . await ;
563600 let backend_manager = binding. as_mut ( ) . unwrap ( ) ;
564601 let res = backend_manager
565- . update_stats_from_catalog ( super :: CatalogSource :: Mock , 1 )
602+ . update_stats_from_catalog ( super :: CatalogSource :: Mock )
566603 . await ;
567604 println ! ( "{:?}" , res) ;
568605 assert ! ( res. is_ok( ) ) ;
606+ let epoch_id = res. unwrap ( ) ;
607+ assert_eq ! ( epoch_id, 1 ) ;
569608
570609 let lookup_res = Statistic :: find ( ) . all ( & backend_manager. db ) . await . unwrap ( ) ;
571610 println ! ( "{:?}" , lookup_res) ;
572611 assert_eq ! ( lookup_res. len( ) , 3 ) ;
573612
613+ let stat_res = backend_manager
614+ . get_stats_for_table ( 1 , StatType :: Count as i32 , Some ( epoch_id) )
615+ . await ;
616+ assert ! ( stat_res. is_ok( ) ) ;
617+ assert_eq ! ( stat_res. unwrap( ) . unwrap( ) , json!( 300 ) ) ;
618+ let stat_res = backend_manager
619+ . get_stats_for_attr ( [ 2 ] . to_vec ( ) , StatType :: Count as i32 , None )
620+ . await ;
621+ assert ! ( stat_res. is_ok( ) ) ;
622+ assert_eq ! ( stat_res. unwrap( ) . unwrap( ) , json!( 200 ) ) ;
623+
574624 remove_db_file ( DATABASE_FILE ) ;
575625 }
576626
0 commit comments