@@ -57,11 +57,25 @@ impl CostModelStorageLayer for BackendManager {
5757 Ok ( insert_res. last_insert_id )
5858 }
5959
60- async fn update_stats_from_catalog (
61- & self ,
62- c : CatalogSource ,
63- epoch_id : Self :: EpochId ,
64- ) -> StorageResult < ( ) > {
60+ /// TODO: documentation
61+ async fn update_stats_from_catalog ( & self , c : CatalogSource ) -> StorageResult < Self :: EpochId > {
62+ let transaction = self . db . begin ( ) . await ?;
63+ let source = match c {
64+ CatalogSource :: Mock => "Mock" ,
65+ CatalogSource :: Iceberg ( ) => "Iceberg" ,
66+ } ;
67+ let new_event = event:: ActiveModel {
68+ source_variant : sea_orm:: ActiveValue :: Set ( source. to_string ( ) ) ,
69+ timestamp : sea_orm:: ActiveValue :: Set ( Utc :: now ( ) ) ,
70+ data : sea_orm:: ActiveValue :: Set ( sea_orm:: JsonValue :: String (
71+ "Update stats from catalog" . to_string ( ) ,
72+ ) ) ,
73+ ..Default :: default ( )
74+ } ;
75+ let epoch_id = Event :: insert ( new_event)
76+ . exec ( & transaction)
77+ . await ?
78+ . last_insert_id ;
6579 match c {
6680 CatalogSource :: Mock => {
6781 let mock_catalog = MockCatalog :: new ( ) ;
@@ -72,7 +86,7 @@ impl CostModelStorageLayer for BackendManager {
7286 ..Default :: default ( )
7387 }
7488 } ) )
75- . exec ( & self . db )
89+ . exec ( & transaction )
7690 . await ?;
7791 NamespaceMetadata :: insert_many ( mock_catalog. namespaces . iter ( ) . map ( |namespace| {
7892 namespace_metadata:: ActiveModel {
@@ -82,7 +96,7 @@ impl CostModelStorageLayer for BackendManager {
8296 ..Default :: default ( )
8397 }
8498 } ) )
85- . exec ( & self . db )
99+ . exec ( & transaction )
86100 . await ?;
87101 TableMetadata :: insert_many ( mock_catalog. tables . iter ( ) . map ( |table| {
88102 table_metadata:: ActiveModel {
@@ -92,7 +106,7 @@ impl CostModelStorageLayer for BackendManager {
92106 ..Default :: default ( )
93107 }
94108 } ) )
95- . exec ( & self . db )
109+ . exec ( & transaction )
96110 . await ?;
97111 Attribute :: insert_many ( mock_catalog. attributes . iter ( ) . map ( |attr| {
98112 attribute:: ActiveModel {
@@ -107,7 +121,7 @@ impl CostModelStorageLayer for BackendManager {
107121 ..Default :: default ( )
108122 }
109123 } ) )
110- . exec ( & self . db )
124+ . exec ( & transaction )
111125 . await ?;
112126 Statistic :: insert_many ( mock_catalog. statistics . iter ( ) . map ( |stat| {
113127 statistic:: ActiveModel {
@@ -122,7 +136,29 @@ impl CostModelStorageLayer for BackendManager {
122136 ..Default :: default ( )
123137 }
124138 } ) )
125- . exec ( & self . db )
139+ . exec ( & transaction)
140+ . await ?;
141+ VersionedStatistic :: insert_many ( mock_catalog. statistics . iter ( ) . map ( |stat| {
142+ versioned_statistic:: ActiveModel {
143+ epoch_id : sea_orm:: ActiveValue :: Set ( epoch_id) ,
144+ statistic_id : sea_orm:: ActiveValue :: Set ( stat. id ) ,
145+ statistic_value : sea_orm:: ActiveValue :: Set ( stat. stat_value . clone ( ) ) ,
146+ ..Default :: default ( )
147+ }
148+ } ) )
149+ . exec ( & transaction)
150+ . await ?;
151+ StatisticToAttributeJunction :: insert_many ( mock_catalog. statistics . iter ( ) . flat_map (
152+ |stat| {
153+ stat. attr_ids . iter ( ) . map ( move |attr_id| {
154+ statistic_to_attribute_junction:: ActiveModel {
155+ statistic_id : sea_orm:: ActiveValue :: Set ( stat. id ) ,
156+ attribute_id : sea_orm:: ActiveValue :: Set ( * attr_id) ,
157+ }
158+ } )
159+ } ,
160+ ) )
161+ . exec ( & transaction)
126162 . await ?;
127163 IndexMetadata :: insert_many (
128164 mock_catalog
@@ -146,12 +182,13 @@ impl CostModelStorageLayer for BackendManager {
146182 ..Default :: default ( )
147183 } ) ,
148184 )
149- . exec ( & self . db )
185+ . exec ( & transaction )
150186 . await ?;
151- Ok ( ( ) )
152187 }
153188 CatalogSource :: Iceberg ( ) => todo ! ( ) ,
154189 }
190+ transaction. commit ( ) . await ?;
191+ Ok ( epoch_id)
155192 }
156193
157194 /* Update the statistics in the database.
@@ -481,6 +518,8 @@ impl CostModelStorageLayer for BackendManager {
481518
482519#[ cfg( test) ]
483520mod tests {
521+ use std:: os:: macos:: raw:: stat;
522+
484523 use crate :: cost_model:: interface:: { EpochOption , StatType } ;
485524 use crate :: { cost_model:: interface:: Stat , migrate, CostModelStorageLayer } ;
486525 use crate :: { get_sqlite_url, TEST_DATABASE_FILE } ;
@@ -562,15 +601,28 @@ mod tests {
562601 let mut binding = super :: BackendManager :: new ( Some ( & database_url) ) . await ;
563602 let backend_manager = binding. as_mut ( ) . unwrap ( ) ;
564603 let res = backend_manager
565- . update_stats_from_catalog ( super :: CatalogSource :: Mock , 1 )
604+ . update_stats_from_catalog ( super :: CatalogSource :: Mock )
566605 . await ;
567606 println ! ( "{:?}" , res) ;
568607 assert ! ( res. is_ok( ) ) ;
608+ let epoch_id = res. unwrap ( ) ;
609+ assert_eq ! ( epoch_id, 1 ) ;
569610
570611 let lookup_res = Statistic :: find ( ) . all ( & backend_manager. db ) . await . unwrap ( ) ;
571612 println ! ( "{:?}" , lookup_res) ;
572613 assert_eq ! ( lookup_res. len( ) , 3 ) ;
573614
615+ let stat_res = backend_manager
616+ . get_stats_for_table ( 1 , StatType :: Count as i32 , Some ( epoch_id) )
617+ . await ;
618+ assert ! ( stat_res. is_ok( ) ) ;
619+ assert_eq ! ( stat_res. unwrap( ) . unwrap( ) , json!( 300 ) ) ;
620+ let stat_res = backend_manager
621+ . get_stats_for_attr ( [ 2 ] . to_vec ( ) , StatType :: Count as i32 , None )
622+ . await ;
623+ assert ! ( stat_res. is_ok( ) ) ;
624+ assert_eq ! ( stat_res. unwrap( ) . unwrap( ) , json!( 200 ) ) ;
625+
574626 remove_db_file ( DATABASE_FILE ) ;
575627 }
576628
0 commit comments