@@ -51,11 +51,24 @@ impl CostModelStorageLayer for BackendManager {
5151 }
5252
5353 /// TODO: documentation
54- async fn update_stats_from_catalog (
55- & self ,
56- c : CatalogSource ,
57- epoch_id : Self :: EpochId ,
58- ) -> StorageResult < ( ) > {
54+ async fn update_stats_from_catalog ( & self , c : CatalogSource ) -> StorageResult < Self :: EpochId > {
55+ let transaction = self . db . begin ( ) . await ?;
56+ let source = match c {
57+ CatalogSource :: Mock => "Mock" ,
58+ CatalogSource :: Iceberg ( ) => "Iceberg" ,
59+ } ;
60+ let new_event = event:: ActiveModel {
61+ source_variant : sea_orm:: ActiveValue :: Set ( source. to_string ( ) ) ,
62+ timestamp : sea_orm:: ActiveValue :: Set ( Utc :: now ( ) ) ,
63+ data : sea_orm:: ActiveValue :: Set ( sea_orm:: JsonValue :: String (
64+ "Update stats from catalog" . to_string ( ) ,
65+ ) ) ,
66+ ..Default :: default ( )
67+ } ;
68+ let epoch_id = Event :: insert ( new_event)
69+ . exec ( & transaction)
70+ . await ?
71+ . last_insert_id ;
5972 match c {
6073 CatalogSource :: Mock => {
6174 let mock_catalog = MockCatalog :: new ( ) ;
@@ -66,7 +79,7 @@ impl CostModelStorageLayer for BackendManager {
6679 ..Default :: default ( )
6780 }
6881 } ) )
69- . exec ( & self . db )
82+ . exec ( & transaction )
7083 . await ?;
7184 NamespaceMetadata :: insert_many ( mock_catalog. namespaces . iter ( ) . map ( |namespace| {
7285 namespace_metadata:: ActiveModel {
@@ -76,7 +89,7 @@ impl CostModelStorageLayer for BackendManager {
7689 ..Default :: default ( )
7790 }
7891 } ) )
79- . exec ( & self . db )
92+ . exec ( & transaction )
8093 . await ?;
8194 TableMetadata :: insert_many ( mock_catalog. tables . iter ( ) . map ( |table| {
8295 table_metadata:: ActiveModel {
@@ -86,7 +99,7 @@ impl CostModelStorageLayer for BackendManager {
8699 ..Default :: default ( )
87100 }
88101 } ) )
89- . exec ( & self . db )
102+ . exec ( & transaction )
90103 . await ?;
91104 Attribute :: insert_many ( mock_catalog. attributes . iter ( ) . map ( |attr| {
92105 attribute:: ActiveModel {
@@ -101,7 +114,7 @@ impl CostModelStorageLayer for BackendManager {
101114 ..Default :: default ( )
102115 }
103116 } ) )
104- . exec ( & self . db )
117+ . exec ( & transaction )
105118 . await ?;
106119 Statistic :: insert_many ( mock_catalog. statistics . iter ( ) . map ( |stat| {
107120 statistic:: ActiveModel {
@@ -116,7 +129,29 @@ impl CostModelStorageLayer for BackendManager {
116129 ..Default :: default ( )
117130 }
118131 } ) )
119- . exec ( & self . db )
132+ . exec ( & transaction)
133+ . await ?;
134+ VersionedStatistic :: insert_many ( mock_catalog. statistics . iter ( ) . map ( |stat| {
135+ versioned_statistic:: ActiveModel {
136+ epoch_id : sea_orm:: ActiveValue :: Set ( epoch_id) ,
137+ statistic_id : sea_orm:: ActiveValue :: Set ( stat. id ) ,
138+ statistic_value : sea_orm:: ActiveValue :: Set ( stat. stat_value . clone ( ) ) ,
139+ ..Default :: default ( )
140+ }
141+ } ) )
142+ . exec ( & transaction)
143+ . await ?;
144+ StatisticToAttributeJunction :: insert_many ( mock_catalog. statistics . iter ( ) . flat_map (
145+ |stat| {
146+ stat. attr_ids . iter ( ) . map ( move |attr_id| {
147+ statistic_to_attribute_junction:: ActiveModel {
148+ statistic_id : sea_orm:: ActiveValue :: Set ( stat. id ) ,
149+ attribute_id : sea_orm:: ActiveValue :: Set ( * attr_id) ,
150+ }
151+ } )
152+ } ,
153+ ) )
154+ . exec ( & transaction)
120155 . await ?;
121156 IndexMetadata :: insert_many (
122157 mock_catalog
@@ -140,12 +175,13 @@ impl CostModelStorageLayer for BackendManager {
140175 ..Default :: default ( )
141176 } ) ,
142177 )
143- . exec ( & self . db )
178+ . exec ( & transaction )
144179 . await ?;
145- Ok ( ( ) )
146180 }
147181 CatalogSource :: Iceberg ( ) => todo ! ( ) ,
148182 }
183+ transaction. commit ( ) . await ?;
184+ Ok ( epoch_id)
149185 }
150186
151187 /// TODO: improve the documentation
@@ -551,15 +587,28 @@ mod tests {
551587 let mut binding = super :: BackendManager :: new ( Some ( & database_url) ) . await ;
552588 let backend_manager = binding. as_mut ( ) . unwrap ( ) ;
553589 let res = backend_manager
554- . update_stats_from_catalog ( super :: CatalogSource :: Mock , 1 )
590+ . update_stats_from_catalog ( super :: CatalogSource :: Mock )
555591 . await ;
556592 println ! ( "{:?}" , res) ;
557593 assert ! ( res. is_ok( ) ) ;
594+ let epoch_id = res. unwrap ( ) ;
595+ assert_eq ! ( epoch_id, 1 ) ;
558596
559597 let lookup_res = Statistic :: find ( ) . all ( & backend_manager. db ) . await . unwrap ( ) ;
560598 println ! ( "{:?}" , lookup_res) ;
561599 assert_eq ! ( lookup_res. len( ) , 3 ) ;
562600
601+ let stat_res = backend_manager
602+ . get_stats_for_table ( 1 , StatType :: Count as i32 , Some ( epoch_id) )
603+ . await ;
604+ assert ! ( stat_res. is_ok( ) ) ;
605+ assert_eq ! ( stat_res. unwrap( ) . unwrap( ) , json!( 300 ) ) ;
606+ let stat_res = backend_manager
607+ . get_stats_for_attr ( [ 2 ] . to_vec ( ) , StatType :: Count as i32 , None )
608+ . await ;
609+ assert ! ( stat_res. is_ok( ) ) ;
610+ assert_eq ! ( stat_res. unwrap( ) . unwrap( ) , json!( 200 ) ) ;
611+
563612 remove_db_file ( DATABASE_FILE ) ;
564613 }
565614
0 commit comments