@@ -51,11 +51,24 @@ impl CostModelStorageLayer for BackendManager {
5151 }
5252
5353 /// TODO: documentation
54- async fn update_stats_from_catalog (
55- & self ,
56- c : CatalogSource ,
57- epoch_id : Self :: EpochId ,
58- ) -> StorageResult < ( ) > {
54+ async fn update_stats_from_catalog ( & self , c : CatalogSource ) -> StorageResult < Self :: EpochId > {
55+ let transaction = self . db . begin ( ) . await ?;
56+ let source = match c {
57+ CatalogSource :: Mock => "Mock" ,
58+ CatalogSource :: Iceberg ( ) => "Iceberg" ,
59+ } ;
60+ let new_event = event:: ActiveModel {
61+ source_variant : sea_orm:: ActiveValue :: Set ( source. to_string ( ) ) ,
62+ timestamp : sea_orm:: ActiveValue :: Set ( Utc :: now ( ) ) ,
63+ data : sea_orm:: ActiveValue :: Set ( sea_orm:: JsonValue :: String (
64+ "Update stats from catalog" . to_string ( ) ,
65+ ) ) ,
66+ ..Default :: default ( )
67+ } ;
68+ let epoch_id = Event :: insert ( new_event)
69+ . exec ( & transaction)
70+ . await ?
71+ . last_insert_id ;
5972 match c {
6073 CatalogSource :: Mock => {
6174 let mock_catalog = MockCatalog :: new ( ) ;
@@ -66,7 +79,7 @@ impl CostModelStorageLayer for BackendManager {
6679 ..Default :: default ( )
6780 }
6881 } ) )
69- . exec ( & self . db )
82+ . exec ( & transaction )
7083 . await ?;
7184 NamespaceMetadata :: insert_many ( mock_catalog. namespaces . iter ( ) . map ( |namespace| {
7285 namespace_metadata:: ActiveModel {
@@ -76,7 +89,7 @@ impl CostModelStorageLayer for BackendManager {
7689 ..Default :: default ( )
7790 }
7891 } ) )
79- . exec ( & self . db )
92+ . exec ( & transaction )
8093 . await ?;
8194 TableMetadata :: insert_many ( mock_catalog. tables . iter ( ) . map ( |table| {
8295 table_metadata:: ActiveModel {
@@ -86,7 +99,7 @@ impl CostModelStorageLayer for BackendManager {
8699 ..Default :: default ( )
87100 }
88101 } ) )
89- . exec ( & self . db )
102+ . exec ( & transaction )
90103 . await ?;
91104 Attribute :: insert_many ( mock_catalog. attributes . iter ( ) . map ( |attr| {
92105 attribute:: ActiveModel {
@@ -101,7 +114,7 @@ impl CostModelStorageLayer for BackendManager {
101114 ..Default :: default ( )
102115 }
103116 } ) )
104- . exec ( & self . db )
117+ . exec ( & transaction )
105118 . await ?;
106119 Statistic :: insert_many ( mock_catalog. statistics . iter ( ) . map ( |stat| {
107120 statistic:: ActiveModel {
@@ -116,7 +129,29 @@ impl CostModelStorageLayer for BackendManager {
116129 ..Default :: default ( )
117130 }
118131 } ) )
119- . exec ( & self . db )
132+ . exec ( & transaction)
133+ . await ?;
134+ VersionedStatistic :: insert_many ( mock_catalog. statistics . iter ( ) . map ( |stat| {
135+ versioned_statistic:: ActiveModel {
136+ epoch_id : sea_orm:: ActiveValue :: Set ( epoch_id) ,
137+ statistic_id : sea_orm:: ActiveValue :: Set ( stat. id ) ,
138+ statistic_value : sea_orm:: ActiveValue :: Set ( stat. stat_value . clone ( ) ) ,
139+ ..Default :: default ( )
140+ }
141+ } ) )
142+ . exec ( & transaction)
143+ . await ?;
144+ StatisticToAttributeJunction :: insert_many ( mock_catalog. statistics . iter ( ) . flat_map (
145+ |stat| {
146+ stat. attr_ids . iter ( ) . map ( move |attr_id| {
147+ statistic_to_attribute_junction:: ActiveModel {
148+ statistic_id : sea_orm:: ActiveValue :: Set ( stat. id ) ,
149+ attribute_id : sea_orm:: ActiveValue :: Set ( * attr_id) ,
150+ }
151+ } )
152+ } ,
153+ ) )
154+ . exec ( & transaction)
120155 . await ?;
121156 IndexMetadata :: insert_many (
122157 mock_catalog
@@ -140,12 +175,14 @@ impl CostModelStorageLayer for BackendManager {
140175 ..Default :: default ( )
141176 } ) ,
142177 )
143- . exec ( & self . db )
178+ . exec ( & transaction )
144179 . await ?;
145- Ok ( ( ) )
180+ // TODO: initialize constraints
146181 }
147182 CatalogSource :: Iceberg ( ) => todo ! ( ) ,
148183 }
184+ transaction. commit ( ) . await ?;
185+ Ok ( epoch_id)
149186 }
150187
151188 /// TODO: improve the documentation
@@ -551,15 +588,28 @@ mod tests {
551588 let mut binding = super :: BackendManager :: new ( Some ( & database_url) ) . await ;
552589 let backend_manager = binding. as_mut ( ) . unwrap ( ) ;
553590 let res = backend_manager
554- . update_stats_from_catalog ( super :: CatalogSource :: Mock , 1 )
591+ . update_stats_from_catalog ( super :: CatalogSource :: Mock )
555592 . await ;
556593 println ! ( "{:?}" , res) ;
557594 assert ! ( res. is_ok( ) ) ;
595+ let epoch_id = res. unwrap ( ) ;
596+ assert_eq ! ( epoch_id, 1 ) ;
558597
559598 let lookup_res = Statistic :: find ( ) . all ( & backend_manager. db ) . await . unwrap ( ) ;
560599 println ! ( "{:?}" , lookup_res) ;
561600 assert_eq ! ( lookup_res. len( ) , 3 ) ;
562601
602+ let stat_res = backend_manager
603+ . get_stats_for_table ( 1 , StatType :: TableRowCount as i32 , Some ( epoch_id) )
604+ . await ;
605+ assert ! ( stat_res. is_ok( ) ) ;
606+ assert_eq ! ( stat_res. unwrap( ) . unwrap( ) , json!( 300 ) ) ;
607+ let stat_res = backend_manager
608+ . get_stats_for_attr ( [ 2 ] . to_vec ( ) , StatType :: NotNullCount as i32 , None )
609+ . await ;
610+ assert ! ( stat_res. is_ok( ) ) ;
611+ assert_eq ! ( stat_res. unwrap( ) . unwrap( ) , json!( 200 ) ) ;
612+
563613 remove_db_file ( DATABASE_FILE ) ;
564614 }
565615
@@ -575,7 +625,7 @@ mod tests {
575625 . await
576626 . unwrap ( ) ;
577627 let stat = Stat {
578- stat_type : StatType :: Count as i32 ,
628+ stat_type : StatType :: NotNullCount as i32 ,
579629 stat_value : json ! ( 100 ) ,
580630 attr_ids : vec ! [ 1 ] ,
581631 table_id : None ,
@@ -594,7 +644,7 @@ mod tests {
594644 println ! ( "{:?}" , stat_res) ;
595645 assert_eq ! ( stat_res[ 0 ] . number_of_attributes, 1 ) ;
596646 assert_eq ! ( stat_res[ 0 ] . description, "1" . to_string( ) ) ;
597- assert_eq ! ( stat_res[ 0 ] . variant_tag, StatType :: Count as i32 ) ;
647+ assert_eq ! ( stat_res[ 0 ] . variant_tag, StatType :: NotNullCount as i32 ) ;
598648 let stat_attr_res = StatisticToAttributeJunction :: find ( )
599649 . filter ( statistic_to_attribute_junction:: Column :: StatisticId . eq ( stat_res[ 0 ] . id ) )
600650 . all ( & backend_manager. db )
@@ -647,7 +697,7 @@ mod tests {
647697 . await
648698 . unwrap ( ) ;
649699 let stat2 = Stat {
650- stat_type : StatType :: Count as i32 ,
700+ stat_type : StatType :: NotNullCount as i32 ,
651701 stat_value : json ! ( 200 ) ,
652702 attr_ids : vec ! [ 1 ] ,
653703 table_id : None ,
@@ -701,7 +751,7 @@ mod tests {
701751 // 3. Update existed stat with the same value
702752 let epoch_num = Event :: find ( ) . all ( & backend_manager. db ) . await . unwrap ( ) . len ( ) ;
703753 let stat3 = Stat {
704- stat_type : StatType :: Count as i32 ,
754+ stat_type : StatType :: NotNullCount as i32 ,
705755 stat_value : json ! ( 200 ) ,
706756 attr_ids : vec ! [ 1 ] ,
707757 table_id : None ,
@@ -742,21 +792,21 @@ mod tests {
742792
743793 let statistics: Vec < Stat > = vec ! [
744794 Stat {
745- stat_type: StatType :: Count as i32 ,
795+ stat_type: StatType :: TableRowCount as i32 ,
746796 stat_value: json!( 0 ) ,
747797 attr_ids: vec![ ] ,
748798 table_id: Some ( 1 ) ,
749799 name: "row_count" . to_string( ) ,
750800 } ,
751801 Stat {
752- stat_type: StatType :: Count as i32 ,
802+ stat_type: StatType :: TableRowCount as i32 ,
753803 stat_value: json!( 20 ) ,
754804 attr_ids: vec![ ] ,
755805 table_id: Some ( 1 ) ,
756806 name: "row_count" . to_string( ) ,
757807 } ,
758808 Stat {
759- stat_type: StatType :: Count as i32 ,
809+ stat_type: StatType :: TableRowCount as i32 ,
760810 stat_value: json!( 100 ) ,
761811 attr_ids: vec![ ] ,
762812 table_id: Some ( table_inserted_res. last_insert_id) ,
@@ -920,7 +970,7 @@ mod tests {
920970 let backend_manager = binding. as_mut ( ) . unwrap ( ) ;
921971 let epoch_id = 1 ;
922972 let table_id = 1 ;
923- let stat_type = StatType :: Count as i32 ;
973+ let stat_type = StatType :: TableRowCount as i32 ;
924974
925975 // Get initial stats
926976 let res = backend_manager
@@ -937,7 +987,7 @@ mod tests {
937987 . await
938988 . unwrap ( ) ;
939989 let stat = Stat {
940- stat_type : StatType :: Count as i32 ,
990+ stat_type : StatType :: TableRowCount as i32 ,
941991 stat_value : json ! ( 100 ) ,
942992 attr_ids : vec ! [ ] ,
943993 table_id : Some ( table_id) ,
0 commit comments