From 0a5b988a6cc60345676152e369a5620da99f73b5 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sun, 3 Nov 2024 11:40:06 -0500 Subject: [PATCH 01/14] First version of ORM trait --- optd-persistent/src/entities/prelude.rs | 1 + optd-persistent/src/lib.rs | 3 + optd-persistent/src/memo_table.rs | 123 ++++++++++++++++++++++++ optd-persistent/src/orm_manager.rs | 93 ++++++++++++++++++ 4 files changed, 220 insertions(+) create mode 100644 optd-persistent/src/memo_table.rs create mode 100644 optd-persistent/src/orm_manager.rs diff --git a/optd-persistent/src/entities/prelude.rs b/optd-persistent/src/entities/prelude.rs index ca8ca48..5ee9d0e 100644 --- a/optd-persistent/src/entities/prelude.rs +++ b/optd-persistent/src/entities/prelude.rs @@ -1,4 +1,5 @@ //! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 +#![allow(dead_code, unused_imports, unused_variables)] pub use super::attribute_stat::Entity as AttributeStat; pub use super::attribute_stats_junction::Entity as AttributeStatsJunction; diff --git a/optd-persistent/src/lib.rs b/optd-persistent/src/lib.rs index 74538b5..d97ac8a 100644 --- a/optd-persistent/src/lib.rs +++ b/optd-persistent/src/lib.rs @@ -2,6 +2,9 @@ use sea_orm::*; use sea_orm_migration::prelude::*; mod migrator; +mod orm_manager; +mod memo_table; +mod entities; use migrator::Migrator; pub const DATABASE_URL: &str = "sqlite:./sqlite.db?mode=rwc"; diff --git a/optd-persistent/src/memo_table.rs b/optd-persistent/src/memo_table.rs new file mode 100644 index 0000000..94e19c0 --- /dev/null +++ b/optd-persistent/src/memo_table.rs @@ -0,0 +1,123 @@ +#![allow(dead_code, unused_imports)] + +use std::sync::Arc; +use crate::entities::cascades_group; +use crate::entities::event::Model as event_model; +use crate::entities::physical_expression; +use crate::entities::logical_expression; +use sea_orm::*; +use sea_orm_migration::prelude::*; +use serde_json::json; + +pub type GroupId = i32; +pub type ExprId = i32; +pub type EpochId = i32; + +pub enum CatalogSource { + Iceberg(), +} + +pub enum Expression { + LogicalExpression(logical_expression::Model), + PhysicalExpression(physical_expression::Model) +} + +// TODO +// A dummy WinnerInfo struct +// pub struct WinnerInfo { +// pub expr_id: ExprId, +// pub total_weighted_cost: f64, +// pub operation_weighted_cost: f64, +// pub total_cost: Cost, +// pub operation_cost: Cost, +// pub statistics: Arc, +// } +// The optd WinnerInfo struct makes everything too coupled. +pub struct WinnerInfo { +} + +pub trait MemoTable { + // TODO: Change EpochId to event::Model::epoch_id + async fn create_new_epoch(&self) -> EpochId; + async fn update_stats_from_catalog(&self, c:CatalogSource, epoch_id:EpochId) -> Result<(), ()>; + // i32 in `stats:i32` is a placeholder for the stats type + async fn update_stats(&self, stats:i32, epoch_id:EpochId) -> Result<(), ()>; + async fn store_cost(&self, expr_id:ExprId, cost:i32, epoch_id:EpochId) -> Result<(), ()>; + // table_id, attr_id OR expr_id and return a vector? + async fn get_stats_analysis(&self, table_id:i32, attr_id:Option, epoch_id:EpochId) -> Option; + async fn get_stats(&self, table_id:i32, attr_id:Option) -> Option; + async fn get_cost_analysis(&self, expr_id:ExprId, epoch_id:EpochId) -> Option; + async fn get_cost(&self, expr_id:ExprId) -> Option; + + async fn get_group_winner_from_group_id(&self, group_id:i32) -> Option; + + /// Add an expression to the memo table. If the expression already exists, it will return the existing group id and + /// expr id. Otherwise, a new group and expr will be created. + async fn add_new_expr(&mut self, expr:Expression) -> (GroupId, ExprId); + + /// Add a new expression to an existing group. If the expression is a group, it will merge the two groups. Otherwise, + /// it will add the expression to the group. Returns the expr id if the expression is not a group. + async fn add_expr_to_group(&mut self, expr: Expression, group_id: GroupId) -> Option; + + /// Get the group id of an expression. + /// The group id is volatile, depending on whether the groups are merged. + async fn get_group_id(&self, expr_id: ExprId) -> GroupId; + + /// Get the memoized representation of a node. + async fn get_expr_memoed(&self, expr_id: ExprId) -> Expression; + + /// Get all groups IDs in the memo table. + async fn get_all_group_ids(&self) -> Vec; + + /// Get a group by ID + async fn get_group(&self, group_id: GroupId) -> cascades_group::ActiveModel; + + /// Update the group winner. + async fn update_group_winner(&mut self, group_id: GroupId, latest_winner:Option); + + // The below functions can be overwritten by the memo table implementation if there + // are more efficient way to retrieve the information. + + /// Get all expressions in the group. + async fn get_all_exprs_in_group(&self, group_id: GroupId) -> Vec; + + /// Get winner info for a group id + async fn get_group_info(&self, group_id: GroupId) -> &Option; + + // TODO: + /// Get the best group binding based on the cost + // fn get_best_group_binding( + // &self, + // group_id: GroupId, + // mut post_process: impl FnMut(Arc, GroupId, &WinnerInfo), + // ) -> Result; + // { + // // let info: &GroupInfo = this.get_group_info(group_id); + // // if let Winner::Full(info @ WinnerInfo { expr_id, .. }) = &info.winner { + // // let expr = this.get_expr_memoed(*expr_id); + // // let mut children = Vec::with_capacity(expr.children.len()); + // // for child in &expr.children { + // // children.push( + // // get_best_group_binding_inner(this, *child, post_process) + // // .with_context(|| format!("when processing expr {}", expr_id))?, + // // ); + // // } + // // let node = Arc::new(RelNode { + // // typ: expr.typ.clone(), + // // children, + // // data: expr.data.clone(), + // // }); + // // post_process(node.clone(), group_id, info); + // // return Ok(node); + // // } + // // bail!("no best group binding for group {}", group_id) + // }; + + + /// Get all bindings of a predicate group. Will panic if the group contains more than one bindings. + async fn get_predicate_binding(&self, group_id: GroupId) -> Option; + + /// Get all bindings of a predicate group. Returns None if the group contains zero or more than one bindings. + async fn try_get_predicate_binding(&self, group_id: GroupId) -> Option; + +} diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs new file mode 100644 index 0000000..a3fcf48 --- /dev/null +++ b/optd-persistent/src/orm_manager.rs @@ -0,0 +1,93 @@ +#![allow(dead_code, unused_imports, unused_variables)] + +use sea_orm::DatabaseConnection; +use crate::memo_table::{self, EpochId, MemoTable}; +use crate::entities::physical_expression; + +pub struct ORMManager { + db_conn: DatabaseConnection, + // TODO: Change EpochId to event::Model::epoch_id + latest_epoch_id: EpochId, +} + +impl MemoTable for ORMManager { + async fn create_new_epoch(&self) -> memo_table::EpochId { + todo!() + } + + async fn update_stats_from_catalog(&self, c:memo_table::CatalogSource, epoch_id:memo_table::EpochId) -> Result<(), ()> { + todo!() + } + + async fn update_stats(&self, stats:i32, epoch_id:memo_table::EpochId) -> Result<(), ()> { + todo!() + } + + async fn store_cost(&self, expr_id:memo_table::ExprId, cost:i32, epoch_id:memo_table::EpochId) -> Result<(), ()> { + todo!() + } + + async fn get_stats_analysis(&self, table_id:i32, attr_id:Option, epoch_id:memo_table::EpochId) -> Option { + todo!() + } + + async fn get_stats(&self, table_id:i32, attr_id:Option) -> Option { + todo!() + } + + async fn get_cost_analysis(&self, expr_id:memo_table::ExprId, epoch_id:memo_table::EpochId) -> Option { + todo!() + } + + async fn get_cost(&self, expr_id:memo_table::ExprId) -> Option { + todo!() + } + + async fn get_group_winner_from_group_id(&self, group_id:i32) -> Option { + todo!() + } + + async fn add_new_expr(&mut self, expr:memo_table::Expression) -> (memo_table::GroupId, memo_table::ExprId) { + todo!() + } + + async fn add_expr_to_group(&mut self, expr: memo_table::Expression, group_id: memo_table::GroupId) -> Option { + todo!() + } + + async fn get_group_id(&self, expr_id: memo_table::ExprId) -> memo_table::GroupId { + todo!() + } + + async fn get_expr_memoed(&self, expr_id: memo_table::ExprId) -> memo_table::Expression { + todo!() + } + + async fn get_all_group_ids(&self) -> Vec { + todo!() + } + + async fn get_group(&self, group_id: memo_table::GroupId) -> crate::entities::cascades_group::ActiveModel { + todo!() + } + + async fn update_group_winner(&mut self, group_id: memo_table::GroupId, latest_winner:Option) { + todo!() + } + + async fn get_all_exprs_in_group(&self, group_id: memo_table::GroupId) -> Vec { + todo!() + } + + async fn get_group_info(&self, group_id: memo_table::GroupId) -> &Option { + todo!() + } + + async fn get_predicate_binding(&self, group_id: memo_table::GroupId) -> Option { + todo!() + } + + async fn try_get_predicate_binding(&self, group_id: memo_table::GroupId) -> Option { + todo!() + } +} \ No newline at end of file From 29f22c5f33f64a75fb6851c66d0bdf5e29c634bc Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Sun, 3 Nov 2024 20:32:06 -0500 Subject: [PATCH 02/14] Rename MemoTable to StorageLayer --- optd-persistent/src/lib.rs | 2 +- optd-persistent/src/orm_manager.rs | 40 +++++++++---------- .../src/{memo_table.rs => storage_layer.rs} | 2 +- 3 files changed, 22 insertions(+), 22 deletions(-) rename optd-persistent/src/{memo_table.rs => storage_layer.rs} (99%) diff --git a/optd-persistent/src/lib.rs b/optd-persistent/src/lib.rs index d97ac8a..fb8a678 100644 --- a/optd-persistent/src/lib.rs +++ b/optd-persistent/src/lib.rs @@ -3,7 +3,7 @@ use sea_orm_migration::prelude::*; mod migrator; mod orm_manager; -mod memo_table; +mod storage_layer; mod entities; use migrator::Migrator; diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index a3fcf48..1b1e56d 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -1,7 +1,7 @@ #![allow(dead_code, unused_imports, unused_variables)] use sea_orm::DatabaseConnection; -use crate::memo_table::{self, EpochId, MemoTable}; +use crate::storage_layer::{self, EpochId, StorageLayer}; use crate::entities::physical_expression; pub struct ORMManager { @@ -10,24 +10,24 @@ pub struct ORMManager { latest_epoch_id: EpochId, } -impl MemoTable for ORMManager { - async fn create_new_epoch(&self) -> memo_table::EpochId { +impl StorageLayer for ORMManager { + async fn create_new_epoch(&self) -> storage_layer::EpochId { todo!() } - async fn update_stats_from_catalog(&self, c:memo_table::CatalogSource, epoch_id:memo_table::EpochId) -> Result<(), ()> { + async fn update_stats_from_catalog(&self, c:storage_layer::CatalogSource, epoch_id:storage_layer::EpochId) -> Result<(), ()> { todo!() } - async fn update_stats(&self, stats:i32, epoch_id:memo_table::EpochId) -> Result<(), ()> { + async fn update_stats(&self, stats:i32, epoch_id:storage_layer::EpochId) -> Result<(), ()> { todo!() } - async fn store_cost(&self, expr_id:memo_table::ExprId, cost:i32, epoch_id:memo_table::EpochId) -> Result<(), ()> { + async fn store_cost(&self, expr_id:storage_layer::ExprId, cost:i32, epoch_id:storage_layer::EpochId) -> Result<(), ()> { todo!() } - async fn get_stats_analysis(&self, table_id:i32, attr_id:Option, epoch_id:memo_table::EpochId) -> Option { + async fn get_stats_analysis(&self, table_id:i32, attr_id:Option, epoch_id:storage_layer::EpochId) -> Option { todo!() } @@ -35,11 +35,11 @@ impl MemoTable for ORMManager { todo!() } - async fn get_cost_analysis(&self, expr_id:memo_table::ExprId, epoch_id:memo_table::EpochId) -> Option { + async fn get_cost_analysis(&self, expr_id:storage_layer::ExprId, epoch_id:storage_layer::EpochId) -> Option { todo!() } - async fn get_cost(&self, expr_id:memo_table::ExprId) -> Option { + async fn get_cost(&self, expr_id:storage_layer::ExprId) -> Option { todo!() } @@ -47,47 +47,47 @@ impl MemoTable for ORMManager { todo!() } - async fn add_new_expr(&mut self, expr:memo_table::Expression) -> (memo_table::GroupId, memo_table::ExprId) { + async fn add_new_expr(&mut self, expr:storage_layer::Expression) -> (storage_layer::GroupId, storage_layer::ExprId) { todo!() } - async fn add_expr_to_group(&mut self, expr: memo_table::Expression, group_id: memo_table::GroupId) -> Option { + async fn add_expr_to_group(&mut self, expr: storage_layer::Expression, group_id: storage_layer::GroupId) -> Option { todo!() } - async fn get_group_id(&self, expr_id: memo_table::ExprId) -> memo_table::GroupId { + async fn get_group_id(&self, expr_id: storage_layer::ExprId) -> storage_layer::GroupId { todo!() } - async fn get_expr_memoed(&self, expr_id: memo_table::ExprId) -> memo_table::Expression { + async fn get_expr_memoed(&self, expr_id: storage_layer::ExprId) -> storage_layer::Expression { todo!() } - async fn get_all_group_ids(&self) -> Vec { + async fn get_all_group_ids(&self) -> Vec { todo!() } - async fn get_group(&self, group_id: memo_table::GroupId) -> crate::entities::cascades_group::ActiveModel { + async fn get_group(&self, group_id: storage_layer::GroupId) -> crate::entities::cascades_group::ActiveModel { todo!() } - async fn update_group_winner(&mut self, group_id: memo_table::GroupId, latest_winner:Option) { + async fn update_group_winner(&mut self, group_id: storage_layer::GroupId, latest_winner:Option) { todo!() } - async fn get_all_exprs_in_group(&self, group_id: memo_table::GroupId) -> Vec { + async fn get_all_exprs_in_group(&self, group_id: storage_layer::GroupId) -> Vec { todo!() } - async fn get_group_info(&self, group_id: memo_table::GroupId) -> &Option { + async fn get_group_info(&self, group_id: storage_layer::GroupId) -> &Option { todo!() } - async fn get_predicate_binding(&self, group_id: memo_table::GroupId) -> Option { + async fn get_predicate_binding(&self, group_id: storage_layer::GroupId) -> Option { todo!() } - async fn try_get_predicate_binding(&self, group_id: memo_table::GroupId) -> Option { + async fn try_get_predicate_binding(&self, group_id: storage_layer::GroupId) -> Option { todo!() } } \ No newline at end of file diff --git a/optd-persistent/src/memo_table.rs b/optd-persistent/src/storage_layer.rs similarity index 99% rename from optd-persistent/src/memo_table.rs rename to optd-persistent/src/storage_layer.rs index 94e19c0..910312f 100644 --- a/optd-persistent/src/memo_table.rs +++ b/optd-persistent/src/storage_layer.rs @@ -36,7 +36,7 @@ pub enum Expression { pub struct WinnerInfo { } -pub trait MemoTable { +pub trait StorageLayer { // TODO: Change EpochId to event::Model::epoch_id async fn create_new_epoch(&self) -> EpochId; async fn update_stats_from_catalog(&self, c:CatalogSource, epoch_id:EpochId) -> Result<(), ()>; From c86ab14c0d138281517bafd295f70681b2d6fb5d Mon Sep 17 00:00:00 2001 From: unw9527 <1041593558@qq.com> Date: Mon, 4 Nov 2024 14:59:46 -0500 Subject: [PATCH 03/14] fix: rust fmt --- optd-persistent/src/lib.rs | 2 +- optd-persistent/src/orm_manager.rs | 85 +++++++++++++++++++++------- optd-persistent/src/storage_layer.rs | 51 ++++++++++------- 3 files changed, 97 insertions(+), 41 deletions(-) diff --git a/optd-persistent/src/lib.rs b/optd-persistent/src/lib.rs index fb8a678..08df138 100644 --- a/optd-persistent/src/lib.rs +++ b/optd-persistent/src/lib.rs @@ -1,10 +1,10 @@ use sea_orm::*; use sea_orm_migration::prelude::*; +mod entities; mod migrator; mod orm_manager; mod storage_layer; -mod entities; use migrator::Migrator; pub const DATABASE_URL: &str = "sqlite:./sqlite.db?mode=rwc"; diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index 1b1e56d..498891f 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -1,8 +1,8 @@ #![allow(dead_code, unused_imports, unused_variables)] -use sea_orm::DatabaseConnection; -use crate::storage_layer::{self, EpochId, StorageLayer}; use crate::entities::physical_expression; +use crate::storage_layer::{self, EpochId, StorageLayer}; +use sea_orm::DatabaseConnection; pub struct ORMManager { db_conn: DatabaseConnection, @@ -15,43 +15,71 @@ impl StorageLayer for ORMManager { todo!() } - async fn update_stats_from_catalog(&self, c:storage_layer::CatalogSource, epoch_id:storage_layer::EpochId) -> Result<(), ()> { + async fn update_stats_from_catalog( + &self, + c: storage_layer::CatalogSource, + epoch_id: storage_layer::EpochId, + ) -> Result<(), ()> { todo!() } - async fn update_stats(&self, stats:i32, epoch_id:storage_layer::EpochId) -> Result<(), ()> { + async fn update_stats(&self, stats: i32, epoch_id: storage_layer::EpochId) -> Result<(), ()> { todo!() } - async fn store_cost(&self, expr_id:storage_layer::ExprId, cost:i32, epoch_id:storage_layer::EpochId) -> Result<(), ()> { + async fn store_cost( + &self, + expr_id: storage_layer::ExprId, + cost: i32, + epoch_id: storage_layer::EpochId, + ) -> Result<(), ()> { todo!() } - async fn get_stats_analysis(&self, table_id:i32, attr_id:Option, epoch_id:storage_layer::EpochId) -> Option { + async fn get_stats_analysis( + &self, + table_id: i32, + attr_id: Option, + epoch_id: storage_layer::EpochId, + ) -> Option { todo!() } - async fn get_stats(&self, table_id:i32, attr_id:Option) -> Option { + async fn get_stats(&self, table_id: i32, attr_id: Option) -> Option { todo!() } - async fn get_cost_analysis(&self, expr_id:storage_layer::ExprId, epoch_id:storage_layer::EpochId) -> Option { + async fn get_cost_analysis( + &self, + expr_id: storage_layer::ExprId, + epoch_id: storage_layer::EpochId, + ) -> Option { todo!() } - async fn get_cost(&self, expr_id:storage_layer::ExprId) -> Option { + async fn get_cost(&self, expr_id: storage_layer::ExprId) -> Option { todo!() } - async fn get_group_winner_from_group_id(&self, group_id:i32) -> Option { + async fn get_group_winner_from_group_id( + &self, + group_id: i32, + ) -> Option { todo!() } - async fn add_new_expr(&mut self, expr:storage_layer::Expression) -> (storage_layer::GroupId, storage_layer::ExprId) { + async fn add_new_expr( + &mut self, + expr: storage_layer::Expression, + ) -> (storage_layer::GroupId, storage_layer::ExprId) { todo!() } - async fn add_expr_to_group(&mut self, expr: storage_layer::Expression, group_id: storage_layer::GroupId) -> Option { + async fn add_expr_to_group( + &mut self, + expr: storage_layer::Expression, + group_id: storage_layer::GroupId, + ) -> Option { todo!() } @@ -67,27 +95,46 @@ impl StorageLayer for ORMManager { todo!() } - async fn get_group(&self, group_id: storage_layer::GroupId) -> crate::entities::cascades_group::ActiveModel { + async fn get_group( + &self, + group_id: storage_layer::GroupId, + ) -> crate::entities::cascades_group::ActiveModel { todo!() } - async fn update_group_winner(&mut self, group_id: storage_layer::GroupId, latest_winner:Option) { + async fn update_group_winner( + &mut self, + group_id: storage_layer::GroupId, + latest_winner: Option, + ) { todo!() } - async fn get_all_exprs_in_group(&self, group_id: storage_layer::GroupId) -> Vec { + async fn get_all_exprs_in_group( + &self, + group_id: storage_layer::GroupId, + ) -> Vec { todo!() } - async fn get_group_info(&self, group_id: storage_layer::GroupId) -> &Option { + async fn get_group_info( + &self, + group_id: storage_layer::GroupId, + ) -> &Option { todo!() } - async fn get_predicate_binding(&self, group_id: storage_layer::GroupId) -> Option { + async fn get_predicate_binding( + &self, + group_id: storage_layer::GroupId, + ) -> Option { todo!() } - async fn try_get_predicate_binding(&self, group_id: storage_layer::GroupId) -> Option { + async fn try_get_predicate_binding( + &self, + group_id: storage_layer::GroupId, + ) -> Option { todo!() } -} \ No newline at end of file +} diff --git a/optd-persistent/src/storage_layer.rs b/optd-persistent/src/storage_layer.rs index 910312f..e4578ca 100644 --- a/optd-persistent/src/storage_layer.rs +++ b/optd-persistent/src/storage_layer.rs @@ -1,13 +1,13 @@ #![allow(dead_code, unused_imports)] -use std::sync::Arc; use crate::entities::cascades_group; use crate::entities::event::Model as event_model; -use crate::entities::physical_expression; use crate::entities::logical_expression; +use crate::entities::physical_expression; use sea_orm::*; use sea_orm_migration::prelude::*; use serde_json::json; +use std::sync::Arc; pub type GroupId = i32; pub type ExprId = i32; @@ -19,7 +19,7 @@ pub enum CatalogSource { pub enum Expression { LogicalExpression(logical_expression::Model), - PhysicalExpression(physical_expression::Model) + PhysicalExpression(physical_expression::Model), } // TODO @@ -33,27 +33,38 @@ pub enum Expression { // pub statistics: Arc, // } // The optd WinnerInfo struct makes everything too coupled. -pub struct WinnerInfo { -} +pub struct WinnerInfo {} pub trait StorageLayer { // TODO: Change EpochId to event::Model::epoch_id async fn create_new_epoch(&self) -> EpochId; - async fn update_stats_from_catalog(&self, c:CatalogSource, epoch_id:EpochId) -> Result<(), ()>; + async fn update_stats_from_catalog( + &self, + c: CatalogSource, + epoch_id: EpochId, + ) -> Result<(), ()>; // i32 in `stats:i32` is a placeholder for the stats type - async fn update_stats(&self, stats:i32, epoch_id:EpochId) -> Result<(), ()>; - async fn store_cost(&self, expr_id:ExprId, cost:i32, epoch_id:EpochId) -> Result<(), ()>; + async fn update_stats(&self, stats: i32, epoch_id: EpochId) -> Result<(), ()>; + async fn store_cost(&self, expr_id: ExprId, cost: i32, epoch_id: EpochId) -> Result<(), ()>; // table_id, attr_id OR expr_id and return a vector? - async fn get_stats_analysis(&self, table_id:i32, attr_id:Option, epoch_id:EpochId) -> Option; - async fn get_stats(&self, table_id:i32, attr_id:Option) -> Option; - async fn get_cost_analysis(&self, expr_id:ExprId, epoch_id:EpochId) -> Option; - async fn get_cost(&self, expr_id:ExprId) -> Option; - - async fn get_group_winner_from_group_id(&self, group_id:i32) -> Option; + async fn get_stats_analysis( + &self, + table_id: i32, + attr_id: Option, + epoch_id: EpochId, + ) -> Option; + async fn get_stats(&self, table_id: i32, attr_id: Option) -> Option; + async fn get_cost_analysis(&self, expr_id: ExprId, epoch_id: EpochId) -> Option; + async fn get_cost(&self, expr_id: ExprId) -> Option; + + async fn get_group_winner_from_group_id( + &self, + group_id: i32, + ) -> Option; /// Add an expression to the memo table. If the expression already exists, it will return the existing group id and /// expr id. Otherwise, a new group and expr will be created. - async fn add_new_expr(&mut self, expr:Expression) -> (GroupId, ExprId); + async fn add_new_expr(&mut self, expr: Expression) -> (GroupId, ExprId); /// Add a new expression to an existing group. If the expression is a group, it will merge the two groups. Otherwise, /// it will add the expression to the group. Returns the expr id if the expression is not a group. @@ -73,13 +84,13 @@ pub trait StorageLayer { async fn get_group(&self, group_id: GroupId) -> cascades_group::ActiveModel; /// Update the group winner. - async fn update_group_winner(&mut self, group_id: GroupId, latest_winner:Option); + async fn update_group_winner(&mut self, group_id: GroupId, latest_winner: Option); // The below functions can be overwritten by the memo table implementation if there // are more efficient way to retrieve the information. /// Get all expressions in the group. - async fn get_all_exprs_in_group(&self, group_id: GroupId) -> Vec; + async fn get_all_exprs_in_group(&self, group_id: GroupId) -> Vec; /// Get winner info for a group id async fn get_group_info(&self, group_id: GroupId) -> &Option; @@ -111,13 +122,11 @@ pub trait StorageLayer { // // return Ok(node); // // } // // bail!("no best group binding for group {}", group_id) - // }; - + // }; /// Get all bindings of a predicate group. Will panic if the group contains more than one bindings. - async fn get_predicate_binding(&self, group_id: GroupId) -> Option; + async fn get_predicate_binding(&self, group_id: GroupId) -> Option; /// Get all bindings of a predicate group. Returns None if the group contains zero or more than one bindings. async fn try_get_predicate_binding(&self, group_id: GroupId) -> Option; - } From 57983d13eb4d04a8e7608c8ff4b4223365d4aaf0 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Mon, 4 Nov 2024 16:09:00 -0500 Subject: [PATCH 04/14] Implement create_new_epoch --- optd-persistent/src/bin/migrate_test.rs | 16 ++++++ optd-persistent/src/lib.rs | 1 + optd-persistent/src/orm_manager.rs | 65 +++++++++++++++++++++++-- optd-persistent/src/storage_layer.rs | 2 +- 4 files changed, 79 insertions(+), 5 deletions(-) create mode 100644 optd-persistent/src/bin/migrate_test.rs diff --git a/optd-persistent/src/bin/migrate_test.rs b/optd-persistent/src/bin/migrate_test.rs new file mode 100644 index 0000000..a7aae41 --- /dev/null +++ b/optd-persistent/src/bin/migrate_test.rs @@ -0,0 +1,16 @@ +use optd_persistent::{migrate, TEST_DATABASE_URL}; +use sea_orm::*; +use sea_orm_migration::prelude::*; + +#[tokio::main] +async fn main() { + let _ = std::fs::remove_file(TEST_DATABASE_URL); + + let db = Database::connect(TEST_DATABASE_URL) + .await + .expect("Unable to connect to the database"); + + migrate(&db) + .await + .expect("Something went wrong during migration"); +} diff --git a/optd-persistent/src/lib.rs b/optd-persistent/src/lib.rs index 08df138..4b9d1f6 100644 --- a/optd-persistent/src/lib.rs +++ b/optd-persistent/src/lib.rs @@ -8,6 +8,7 @@ mod storage_layer; use migrator::Migrator; pub const DATABASE_URL: &str = "sqlite:./sqlite.db?mode=rwc"; +pub const TEST_DATABASE_URL: &str = "sqlite:./test.db?mode=rwc"; pub async fn migrate(db: &DatabaseConnection) -> Result<(), DbErr> { let schema_manager = SchemaManager::new(db); diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index 498891f..88892f0 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -1,8 +1,13 @@ #![allow(dead_code, unused_imports, unused_variables)] -use crate::entities::physical_expression; +use sea_orm::*; +use sea_orm_migration::prelude::*; +use sqlx::types::chrono::Utc; +use crate::entities::event::Entity as Event; +use crate::entities::{event, physical_expression}; use crate::storage_layer::{self, EpochId, StorageLayer}; -use sea_orm::DatabaseConnection; +use crate::DATABASE_URL; +use sea_orm::{Database, DatabaseConnection}; pub struct ORMManager { db_conn: DatabaseConnection, @@ -10,9 +15,30 @@ pub struct ORMManager { latest_epoch_id: EpochId, } +impl ORMManager { + pub async fn new(database_url: Option<&str>) -> Self { + let latest_epoch_id = -1; + let db_conn = Database::connect(database_url.unwrap_or(DATABASE_URL)).await.unwrap(); + Self { db_conn, latest_epoch_id } + } +} + impl StorageLayer for ORMManager { - async fn create_new_epoch(&self) -> storage_layer::EpochId { - todo!() + async fn create_new_epoch(&mut self, source:String, data:String) -> Result { + let new_event = event::ActiveModel { + source_variant: sea_orm::ActiveValue::Set(source), + create_timestamp: sea_orm::ActiveValue::Set(Utc::now()), + data: sea_orm::ActiveValue::Set(sea_orm::JsonValue::String(data)), + ..Default::default() + }; + let res = Event::insert(new_event).exec(&self.db_conn).await; + match res { + Ok(insert_res) => { + self.latest_epoch_id = insert_res.last_insert_id; + Ok(self.latest_epoch_id) + }, + Err(_) => Err(()), + } } async fn update_stats_from_catalog( @@ -138,3 +164,34 @@ impl StorageLayer for ORMManager { todo!() } } + +// NOTE: Please run `cargo run --bin migrate_test` before you want to run this test. +#[cfg(test)] +mod tests { + use sea_orm::{EntityTrait, ModelTrait}; + use serde_json::de; + + use crate::entities::event::Entity as Event; + use crate::storage_layer::StorageLayer; + use crate::TEST_DATABASE_URL; + + async fn delete_all_events(orm_manager: &mut super::ORMManager) { + let events = super::Event::find().all(&orm_manager.db_conn).await.unwrap(); + for event in events { + event.delete(&orm_manager.db_conn).await.unwrap(); + } + } + + #[tokio::test] + async fn test_create_new_epoch() { + let mut orm_manager = super::ORMManager::new(Some(TEST_DATABASE_URL)).await; + delete_all_events(&mut orm_manager).await; + let res = orm_manager.create_new_epoch("source".to_string(), "data".to_string()).await; + println!("{:?}", res); + assert!(res.is_ok()); + assert_eq!(super::Event::find().all(&orm_manager.db_conn).await.unwrap().len(), 1); + println!("{:?}", super::Event::find().all(&orm_manager.db_conn).await.unwrap()[0]); + assert_eq!(super::Event::find().all(&orm_manager.db_conn).await.unwrap()[0].epoch_id, res.unwrap()); + delete_all_events(&mut orm_manager).await; + } +} \ No newline at end of file diff --git a/optd-persistent/src/storage_layer.rs b/optd-persistent/src/storage_layer.rs index e4578ca..fd7f28d 100644 --- a/optd-persistent/src/storage_layer.rs +++ b/optd-persistent/src/storage_layer.rs @@ -37,7 +37,7 @@ pub struct WinnerInfo {} pub trait StorageLayer { // TODO: Change EpochId to event::Model::epoch_id - async fn create_new_epoch(&self) -> EpochId; + async fn create_new_epoch(&mut self, source:String, data:String) -> Result; async fn update_stats_from_catalog( &self, c: CatalogSource, From b4599f916d64212a6d43b2ba52d6d65b3cc6442f Mon Sep 17 00:00:00 2001 From: unw9527 <1041593558@qq.com> Date: Mon, 4 Nov 2024 17:13:48 -0500 Subject: [PATCH 05/14] add: store cost --- optd-persistent/src/bin/migrate_test.rs | 10 ++- optd-persistent/src/orm_manager.rs | 104 ++++++++++++++++++++---- optd-persistent/src/storage_layer.rs | 4 +- 3 files changed, 97 insertions(+), 21 deletions(-) diff --git a/optd-persistent/src/bin/migrate_test.rs b/optd-persistent/src/bin/migrate_test.rs index a7aae41..4f2b5af 100644 --- a/optd-persistent/src/bin/migrate_test.rs +++ b/optd-persistent/src/bin/migrate_test.rs @@ -2,8 +2,7 @@ use optd_persistent::{migrate, TEST_DATABASE_URL}; use sea_orm::*; use sea_orm_migration::prelude::*; -#[tokio::main] -async fn main() { +async fn run_migration() -> Result<(), Box> { let _ = std::fs::remove_file(TEST_DATABASE_URL); let db = Database::connect(TEST_DATABASE_URL) @@ -13,4 +12,11 @@ async fn main() { migrate(&db) .await .expect("Something went wrong during migration"); + + Ok(()) +} + +#[tokio::main] +async fn main() { + run_migration().await.expect("Migration failed"); } diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index 88892f0..708ddc4 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -1,13 +1,11 @@ #![allow(dead_code, unused_imports, unused_variables)] -use sea_orm::*; -use sea_orm_migration::prelude::*; -use sqlx::types::chrono::Utc; use crate::entities::event::Entity as Event; -use crate::entities::{event, physical_expression}; +use crate::entities::{prelude::*, *}; use crate::storage_layer::{self, EpochId, StorageLayer}; use crate::DATABASE_URL; -use sea_orm::{Database, DatabaseConnection}; +use sea_orm::*; +use sqlx::types::chrono::Utc; pub struct ORMManager { db_conn: DatabaseConnection, @@ -18,13 +16,22 @@ pub struct ORMManager { impl ORMManager { pub async fn new(database_url: Option<&str>) -> Self { let latest_epoch_id = -1; - let db_conn = Database::connect(database_url.unwrap_or(DATABASE_URL)).await.unwrap(); - Self { db_conn, latest_epoch_id } + let db_conn = Database::connect(database_url.unwrap_or(DATABASE_URL)) + .await + .unwrap(); + Self { + db_conn, + latest_epoch_id, + } } } impl StorageLayer for ORMManager { - async fn create_new_epoch(&mut self, source:String, data:String) -> Result { + async fn create_new_epoch( + &mut self, + source: String, + data: String, + ) -> Result { let new_event = event::ActiveModel { source_variant: sea_orm::ActiveValue::Set(source), create_timestamp: sea_orm::ActiveValue::Set(Utc::now()), @@ -36,7 +43,7 @@ impl StorageLayer for ORMManager { Ok(insert_res) => { self.latest_epoch_id = insert_res.last_insert_id; Ok(self.latest_epoch_id) - }, + } Err(_) => Err(()), } } @@ -58,8 +65,19 @@ impl StorageLayer for ORMManager { expr_id: storage_layer::ExprId, cost: i32, epoch_id: storage_layer::EpochId, - ) -> Result<(), ()> { - todo!() + ) -> Result<(), DbErr> { + let new_cost = cost::ActiveModel { + expr_id: ActiveValue::Set(expr_id), + epoch_id: ActiveValue::Set(epoch_id), + cost: ActiveValue::Set(cost), + valid: ActiveValue::Set(true), + ..Default::default() + }; + let res = Cost::insert(new_cost).exec(&self.db_conn).await; + match res { + Ok(_) => Ok(()), + Err(_) => Err(DbErr::RecordNotInserted), + } } async fn get_stats_analysis( @@ -176,22 +194,74 @@ mod tests { use crate::TEST_DATABASE_URL; async fn delete_all_events(orm_manager: &mut super::ORMManager) { - let events = super::Event::find().all(&orm_manager.db_conn).await.unwrap(); + let events = super::Event::find() + .all(&orm_manager.db_conn) + .await + .unwrap(); for event in events { event.delete(&orm_manager.db_conn).await.unwrap(); } } + async fn delete_all_costs(orm_manager: &mut super::ORMManager) { + let costs = super::Cost::find().all(&orm_manager.db_conn).await.unwrap(); + for cost in costs { + cost.delete(&orm_manager.db_conn).await.unwrap(); + } + } + #[tokio::test] async fn test_create_new_epoch() { let mut orm_manager = super::ORMManager::new(Some(TEST_DATABASE_URL)).await; delete_all_events(&mut orm_manager).await; - let res = orm_manager.create_new_epoch("source".to_string(), "data".to_string()).await; + let res = orm_manager + .create_new_epoch("source".to_string(), "data".to_string()) + .await; println!("{:?}", res); assert!(res.is_ok()); - assert_eq!(super::Event::find().all(&orm_manager.db_conn).await.unwrap().len(), 1); - println!("{:?}", super::Event::find().all(&orm_manager.db_conn).await.unwrap()[0]); - assert_eq!(super::Event::find().all(&orm_manager.db_conn).await.unwrap()[0].epoch_id, res.unwrap()); + assert_eq!( + super::Event::find() + .all(&orm_manager.db_conn) + .await + .unwrap() + .len(), + 1 + ); + println!( + "{:?}", + super::Event::find() + .all(&orm_manager.db_conn) + .await + .unwrap()[0] + ); + assert_eq!( + super::Event::find() + .all(&orm_manager.db_conn) + .await + .unwrap()[0] + .epoch_id, + res.unwrap() + ); delete_all_events(&mut orm_manager).await; } -} \ No newline at end of file + + #[tokio::test] + async fn test_store_cost() { + let mut orm_manager = super::ORMManager::new(Some(TEST_DATABASE_URL)).await; + delete_all_costs(&mut orm_manager).await; + let epoch_id = orm_manager + .create_new_epoch("source".to_string(), "data".to_string()) + .await + .unwrap(); + let expr_id = 1; + let cost = 42; + let res = orm_manager.store_cost(expr_id, cost, epoch_id).await; + assert!(res.is_ok()); + let costs = super::Cost::find().all(&orm_manager.db_conn).await.unwrap(); + assert_eq!(costs.len(), 1); + assert_eq!(costs[0].epoch_id, epoch_id); + assert_eq!(costs[0].expr_id, expr_id); + assert_eq!(costs[0].cost, cost); + delete_all_events(&mut orm_manager).await; + } +} diff --git a/optd-persistent/src/storage_layer.rs b/optd-persistent/src/storage_layer.rs index fd7f28d..1d35eda 100644 --- a/optd-persistent/src/storage_layer.rs +++ b/optd-persistent/src/storage_layer.rs @@ -37,7 +37,7 @@ pub struct WinnerInfo {} pub trait StorageLayer { // TODO: Change EpochId to event::Model::epoch_id - async fn create_new_epoch(&mut self, source:String, data:String) -> Result; + async fn create_new_epoch(&mut self, source: String, data: String) -> Result; async fn update_stats_from_catalog( &self, c: CatalogSource, @@ -45,7 +45,7 @@ pub trait StorageLayer { ) -> Result<(), ()>; // i32 in `stats:i32` is a placeholder for the stats type async fn update_stats(&self, stats: i32, epoch_id: EpochId) -> Result<(), ()>; - async fn store_cost(&self, expr_id: ExprId, cost: i32, epoch_id: EpochId) -> Result<(), ()>; + async fn store_cost(&self, expr_id: ExprId, cost: i32, epoch_id: EpochId) -> Result<(), DbErr>; // table_id, attr_id OR expr_id and return a vector? async fn get_stats_analysis( &self, From 72c48282aa664f71d6332e1a59a4d37f242eaa14 Mon Sep 17 00:00:00 2001 From: unw9527 <1041593558@qq.com> Date: Mon, 4 Nov 2024 22:33:04 -0500 Subject: [PATCH 06/14] restore migrate test --- optd-persistent/src/bin/migrate_test.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/optd-persistent/src/bin/migrate_test.rs b/optd-persistent/src/bin/migrate_test.rs index 4f2b5af..a7aae41 100644 --- a/optd-persistent/src/bin/migrate_test.rs +++ b/optd-persistent/src/bin/migrate_test.rs @@ -2,7 +2,8 @@ use optd_persistent::{migrate, TEST_DATABASE_URL}; use sea_orm::*; use sea_orm_migration::prelude::*; -async fn run_migration() -> Result<(), Box> { +#[tokio::main] +async fn main() { let _ = std::fs::remove_file(TEST_DATABASE_URL); let db = Database::connect(TEST_DATABASE_URL) @@ -12,11 +13,4 @@ async fn run_migration() -> Result<(), Box> { migrate(&db) .await .expect("Something went wrong during migration"); - - Ok(()) -} - -#[tokio::main] -async fn main() { - run_migration().await.expect("Migration failed"); } From f57a6bb3454a07ca44b284637e2518f0e3bb3266 Mon Sep 17 00:00:00 2001 From: unw9527 <1041593558@qq.com> Date: Tue, 5 Nov 2024 11:01:44 -0500 Subject: [PATCH 07/14] wip: crud operations related to the cost table --- optd-persistent/src/bin/migrate_test.rs | 7 ++ optd-persistent/src/lib.rs | 2 + optd-persistent/src/orm_manager.rs | 116 ++++++++++++++++-------- 3 files changed, 88 insertions(+), 37 deletions(-) diff --git a/optd-persistent/src/bin/migrate_test.rs b/optd-persistent/src/bin/migrate_test.rs index a7aae41..f4c9001 100644 --- a/optd-persistent/src/bin/migrate_test.rs +++ b/optd-persistent/src/bin/migrate_test.rs @@ -13,4 +13,11 @@ async fn main() { migrate(&db) .await .expect("Something went wrong during migration"); + + db.execute(sea_orm::Statement::from_string( + sea_orm::DatabaseBackend::Sqlite, + "PRAGMA foreign_keys = ON;".to_owned(), + )) + .await + .expect("Unable to enable foreign keys"); } diff --git a/optd-persistent/src/lib.rs b/optd-persistent/src/lib.rs index 4b9d1f6..460b1c2 100644 --- a/optd-persistent/src/lib.rs +++ b/optd-persistent/src/lib.rs @@ -22,6 +22,8 @@ pub async fn migrate(db: &DatabaseConnection) -> Result<(), DbErr> { assert!(schema_manager.has_table("physical_expression").await?); assert!(schema_manager.has_table("physical_property").await?); assert!(schema_manager.has_table("physical_group_junction").await?); + assert!(schema_manager.has_table("event").await?); + assert!(schema_manager.has_table("cost").await?); Ok(()) } diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index 708ddc4..bc07274 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -1,7 +1,7 @@ #![allow(dead_code, unused_imports, unused_variables)] -use crate::entities::event::Entity as Event; use crate::entities::{prelude::*, *}; +use crate::orm_manager::{Cost, Event}; use crate::storage_layer::{self, EpochId, StorageLayer}; use crate::DATABASE_URL; use sea_orm::*; @@ -60,12 +60,43 @@ impl StorageLayer for ORMManager { todo!() } + async fn get_stats_analysis( + &self, + table_id: i32, + attr_id: Option, + epoch_id: storage_layer::EpochId, + ) -> Option { + todo!() + } + + async fn get_stats(&self, table_id: i32, attr_id: Option) -> Option { + todo!() + } + async fn store_cost( &self, expr_id: storage_layer::ExprId, cost: i32, epoch_id: storage_layer::EpochId, ) -> Result<(), DbErr> { + // TODO: update PhysicalExpression and Event tables + // Check if expr_id exists in PhysicalExpression table + let expr_exists = PhysicalExpression::find_by_id(expr_id) + .one(&self.db_conn) + .await?; + if expr_exists.is_none() { + return Err(DbErr::RecordNotFound( + "ExprId not found in PhysicalExpression table".to_string(), + )); + } + + // Check if epoch_id exists in Event table + let epoch_exists = Event::find() + .filter(event::Column::EpochId.eq(epoch_id)) + .one(&self.db_conn) + .await + .unwrap(); + let new_cost = cost::ActiveModel { expr_id: ActiveValue::Set(expr_id), epoch_id: ActiveValue::Set(epoch_id), @@ -76,33 +107,37 @@ impl StorageLayer for ORMManager { let res = Cost::insert(new_cost).exec(&self.db_conn).await; match res { Ok(_) => Ok(()), - Err(_) => Err(DbErr::RecordNotInserted), + Err(e) => Err(DbErr::Custom(e.to_string())), } } - async fn get_stats_analysis( - &self, - table_id: i32, - attr_id: Option, - epoch_id: storage_layer::EpochId, - ) -> Option { - todo!() - } - - async fn get_stats(&self, table_id: i32, attr_id: Option) -> Option { - todo!() - } - async fn get_cost_analysis( &self, expr_id: storage_layer::ExprId, epoch_id: storage_layer::EpochId, ) -> Option { - todo!() + let cost = Cost::find() + .filter(cost::Column::ExprId.eq(expr_id)) + .filter(cost::Column::EpochId.eq(epoch_id)) + .one(&self.db_conn) + .await + .unwrap(); + assert!(cost.is_some(), "Cost not found in Cost table"); + assert!(cost.clone().unwrap().valid, "Cost is not valid"); + cost.map(|c| c.cost) } + /// Get the latest cost for an expression async fn get_cost(&self, expr_id: storage_layer::ExprId) -> Option { - todo!() + let cost = Cost::find() + .filter(cost::Column::ExprId.eq(expr_id)) + .order_by_desc(cost::Column::EpochId) + .one(&self.db_conn) + .await + .unwrap(); + assert!(cost.is_some(), "Cost not found in Cost table"); + assert!(cost.clone().unwrap().valid, "Cost is not valid"); + cost.map(|c| c.cost) } async fn get_group_winner_from_group_id( @@ -183,37 +218,39 @@ impl StorageLayer for ORMManager { } } -// NOTE: Please run `cargo run --bin migrate_test` before you want to run this test. #[cfg(test)] mod tests { - use sea_orm::{EntityTrait, ModelTrait}; + use crate::migrate; + use sea_orm::{ConnectionTrait, Database, EntityTrait, ModelTrait}; use serde_json::de; use crate::entities::event::Entity as Event; use crate::storage_layer::StorageLayer; use crate::TEST_DATABASE_URL; - async fn delete_all_events(orm_manager: &mut super::ORMManager) { - let events = super::Event::find() - .all(&orm_manager.db_conn) + async fn run_migration() { + let _ = std::fs::remove_file(TEST_DATABASE_URL); + + let db = Database::connect(TEST_DATABASE_URL) .await - .unwrap(); - for event in events { - event.delete(&orm_manager.db_conn).await.unwrap(); - } - } + .expect("Unable to connect to the database"); - async fn delete_all_costs(orm_manager: &mut super::ORMManager) { - let costs = super::Cost::find().all(&orm_manager.db_conn).await.unwrap(); - for cost in costs { - cost.delete(&orm_manager.db_conn).await.unwrap(); - } + migrate(&db) + .await + .expect("Something went wrong during migration"); + + db.execute(sea_orm::Statement::from_string( + sea_orm::DatabaseBackend::Sqlite, + "PRAGMA foreign_keys = ON;".to_owned(), + )) + .await + .expect("Unable to enable foreign keys"); } #[tokio::test] async fn test_create_new_epoch() { + run_migration().await; let mut orm_manager = super::ORMManager::new(Some(TEST_DATABASE_URL)).await; - delete_all_events(&mut orm_manager).await; let res = orm_manager .create_new_epoch("source".to_string(), "data".to_string()) .await; @@ -242,13 +279,13 @@ mod tests { .epoch_id, res.unwrap() ); - delete_all_events(&mut orm_manager).await; } #[tokio::test] + #[ignore] // Need to update all tables async fn test_store_cost() { + run_migration().await; let mut orm_manager = super::ORMManager::new(Some(TEST_DATABASE_URL)).await; - delete_all_costs(&mut orm_manager).await; let epoch_id = orm_manager .create_new_epoch("source".to_string(), "data".to_string()) .await @@ -256,12 +293,17 @@ mod tests { let expr_id = 1; let cost = 42; let res = orm_manager.store_cost(expr_id, cost, epoch_id).await; - assert!(res.is_ok()); + match res { + Ok(_) => assert!(true), + Err(e) => { + println!("Error: {:?}", e); + assert!(false); + } + } let costs = super::Cost::find().all(&orm_manager.db_conn).await.unwrap(); assert_eq!(costs.len(), 1); assert_eq!(costs[0].epoch_id, epoch_id); assert_eq!(costs[0].expr_id, expr_id); assert_eq!(costs[0].cost, cost); - delete_all_events(&mut orm_manager).await; } } From f94cc7a7cb6e9571bde651e2d048533700348878 Mon Sep 17 00:00:00 2001 From: Yuanxin Cao Date: Tue, 5 Nov 2024 12:50:48 -0800 Subject: [PATCH 08/14] introduce StorageResult and make methods return StorageResult --- optd-persistent/src/orm_manager.rs | 52 +++++++++++++++---------- optd-persistent/src/storage_layer.rs | 57 ++++++++++++++++++---------- 2 files changed, 68 insertions(+), 41 deletions(-) diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index 498891f..f77b9d0 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -1,7 +1,7 @@ #![allow(dead_code, unused_imports, unused_variables)] use crate::entities::physical_expression; -use crate::storage_layer::{self, EpochId, StorageLayer}; +use crate::storage_layer::{self, EpochId, StorageLayer, StorageResult}; use sea_orm::DatabaseConnection; pub struct ORMManager { @@ -11,7 +11,7 @@ pub struct ORMManager { } impl StorageLayer for ORMManager { - async fn create_new_epoch(&self) -> storage_layer::EpochId { + async fn create_new_epoch(&self) -> StorageResult { todo!() } @@ -19,11 +19,15 @@ impl StorageLayer for ORMManager { &self, c: storage_layer::CatalogSource, epoch_id: storage_layer::EpochId, - ) -> Result<(), ()> { + ) -> StorageResult<()> { todo!() } - async fn update_stats(&self, stats: i32, epoch_id: storage_layer::EpochId) -> Result<(), ()> { + async fn update_stats( + &self, + stats: i32, + epoch_id: storage_layer::EpochId, + ) -> StorageResult<()> { todo!() } @@ -32,7 +36,7 @@ impl StorageLayer for ORMManager { expr_id: storage_layer::ExprId, cost: i32, epoch_id: storage_layer::EpochId, - ) -> Result<(), ()> { + ) -> StorageResult<()> { todo!() } @@ -41,11 +45,11 @@ impl StorageLayer for ORMManager { table_id: i32, attr_id: Option, epoch_id: storage_layer::EpochId, - ) -> Option { + ) -> StorageResult> { todo!() } - async fn get_stats(&self, table_id: i32, attr_id: Option) -> Option { + async fn get_stats(&self, table_id: i32, attr_id: Option) -> StorageResult> { todo!() } @@ -53,25 +57,25 @@ impl StorageLayer for ORMManager { &self, expr_id: storage_layer::ExprId, epoch_id: storage_layer::EpochId, - ) -> Option { + ) -> StorageResult> { todo!() } - async fn get_cost(&self, expr_id: storage_layer::ExprId) -> Option { + async fn get_cost(&self, expr_id: storage_layer::ExprId) -> StorageResult> { todo!() } async fn get_group_winner_from_group_id( &self, group_id: i32, - ) -> Option { + ) -> StorageResult> { todo!() } async fn add_new_expr( &mut self, expr: storage_layer::Expression, - ) -> (storage_layer::GroupId, storage_layer::ExprId) { + ) -> StorageResult<(storage_layer::GroupId, storage_layer::ExprId)> { todo!() } @@ -79,26 +83,32 @@ impl StorageLayer for ORMManager { &mut self, expr: storage_layer::Expression, group_id: storage_layer::GroupId, - ) -> Option { + ) -> StorageResult> { todo!() } - async fn get_group_id(&self, expr_id: storage_layer::ExprId) -> storage_layer::GroupId { + async fn get_group_id( + &self, + expr_id: storage_layer::ExprId, + ) -> StorageResult { todo!() } - async fn get_expr_memoed(&self, expr_id: storage_layer::ExprId) -> storage_layer::Expression { + async fn get_expr_memoed( + &self, + expr_id: storage_layer::ExprId, + ) -> StorageResult { todo!() } - async fn get_all_group_ids(&self) -> Vec { + async fn get_all_group_ids(&self) -> StorageResult> { todo!() } async fn get_group( &self, group_id: storage_layer::GroupId, - ) -> crate::entities::cascades_group::ActiveModel { + ) -> StorageResult { todo!() } @@ -106,35 +116,35 @@ impl StorageLayer for ORMManager { &mut self, group_id: storage_layer::GroupId, latest_winner: Option, - ) { + ) -> StorageResult<()> { todo!() } async fn get_all_exprs_in_group( &self, group_id: storage_layer::GroupId, - ) -> Vec { + ) -> StorageResult> { todo!() } async fn get_group_info( &self, group_id: storage_layer::GroupId, - ) -> &Option { + ) -> StorageResult<&Option> { todo!() } async fn get_predicate_binding( &self, group_id: storage_layer::GroupId, - ) -> Option { + ) -> StorageResult> { todo!() } async fn try_get_predicate_binding( &self, group_id: storage_layer::GroupId, - ) -> Option { + ) -> StorageResult> { todo!() } } diff --git a/optd-persistent/src/storage_layer.rs b/optd-persistent/src/storage_layer.rs index e4578ca..f16f52d 100644 --- a/optd-persistent/src/storage_layer.rs +++ b/optd-persistent/src/storage_layer.rs @@ -13,6 +13,8 @@ pub type GroupId = i32; pub type ExprId = i32; pub type EpochId = i32; +pub type StorageResult = Result; + pub enum CatalogSource { Iceberg(), } @@ -37,63 +39,75 @@ pub struct WinnerInfo {} pub trait StorageLayer { // TODO: Change EpochId to event::Model::epoch_id - async fn create_new_epoch(&self) -> EpochId; + async fn create_new_epoch(&self) -> StorageResult; async fn update_stats_from_catalog( &self, c: CatalogSource, epoch_id: EpochId, - ) -> Result<(), ()>; + ) -> StorageResult<()>; // i32 in `stats:i32` is a placeholder for the stats type - async fn update_stats(&self, stats: i32, epoch_id: EpochId) -> Result<(), ()>; - async fn store_cost(&self, expr_id: ExprId, cost: i32, epoch_id: EpochId) -> Result<(), ()>; + async fn update_stats(&self, stats: i32, epoch_id: EpochId) -> StorageResult<()>; + async fn store_cost(&self, expr_id: ExprId, cost: i32, epoch_id: EpochId) -> StorageResult<()>; // table_id, attr_id OR expr_id and return a vector? async fn get_stats_analysis( &self, table_id: i32, attr_id: Option, epoch_id: EpochId, - ) -> Option; - async fn get_stats(&self, table_id: i32, attr_id: Option) -> Option; - async fn get_cost_analysis(&self, expr_id: ExprId, epoch_id: EpochId) -> Option; - async fn get_cost(&self, expr_id: ExprId) -> Option; + ) -> StorageResult>; + async fn get_stats(&self, table_id: i32, attr_id: Option) -> StorageResult>; + async fn get_cost_analysis( + &self, + expr_id: ExprId, + epoch_id: EpochId, + ) -> StorageResult>; + async fn get_cost(&self, expr_id: ExprId) -> StorageResult>; async fn get_group_winner_from_group_id( &self, group_id: i32, - ) -> Option; + ) -> StorageResult>; /// Add an expression to the memo table. If the expression already exists, it will return the existing group id and /// expr id. Otherwise, a new group and expr will be created. - async fn add_new_expr(&mut self, expr: Expression) -> (GroupId, ExprId); + async fn add_new_expr(&mut self, expr: Expression) -> StorageResult<(GroupId, ExprId)>; /// Add a new expression to an existing group. If the expression is a group, it will merge the two groups. Otherwise, /// it will add the expression to the group. Returns the expr id if the expression is not a group. - async fn add_expr_to_group(&mut self, expr: Expression, group_id: GroupId) -> Option; + async fn add_expr_to_group( + &mut self, + expr: Expression, + group_id: GroupId, + ) -> StorageResult>; /// Get the group id of an expression. /// The group id is volatile, depending on whether the groups are merged. - async fn get_group_id(&self, expr_id: ExprId) -> GroupId; + async fn get_group_id(&self, expr_id: ExprId) -> StorageResult; /// Get the memoized representation of a node. - async fn get_expr_memoed(&self, expr_id: ExprId) -> Expression; + async fn get_expr_memoed(&self, expr_id: ExprId) -> StorageResult; /// Get all groups IDs in the memo table. - async fn get_all_group_ids(&self) -> Vec; + async fn get_all_group_ids(&self) -> StorageResult>; /// Get a group by ID - async fn get_group(&self, group_id: GroupId) -> cascades_group::ActiveModel; + async fn get_group(&self, group_id: GroupId) -> StorageResult; /// Update the group winner. - async fn update_group_winner(&mut self, group_id: GroupId, latest_winner: Option); + async fn update_group_winner( + &mut self, + group_id: GroupId, + latest_winner: Option, + ) -> StorageResult<()>; // The below functions can be overwritten by the memo table implementation if there // are more efficient way to retrieve the information. /// Get all expressions in the group. - async fn get_all_exprs_in_group(&self, group_id: GroupId) -> Vec; + async fn get_all_exprs_in_group(&self, group_id: GroupId) -> StorageResult>; /// Get winner info for a group id - async fn get_group_info(&self, group_id: GroupId) -> &Option; + async fn get_group_info(&self, group_id: GroupId) -> StorageResult<&Option>; // TODO: /// Get the best group binding based on the cost @@ -125,8 +139,11 @@ pub trait StorageLayer { // }; /// Get all bindings of a predicate group. Will panic if the group contains more than one bindings. - async fn get_predicate_binding(&self, group_id: GroupId) -> Option; + async fn get_predicate_binding(&self, group_id: GroupId) -> StorageResult>; /// Get all bindings of a predicate group. Returns None if the group contains zero or more than one bindings. - async fn try_get_predicate_binding(&self, group_id: GroupId) -> Option; + async fn try_get_predicate_binding( + &self, + group_id: GroupId, + ) -> StorageResult>; } From 2894d399f3a16d82e3972a1eeb0d8113740c3073 Mon Sep 17 00:00:00 2001 From: Yuanxin Cao Date: Tue, 5 Nov 2024 13:15:39 -0800 Subject: [PATCH 09/14] sync w impl --- optd-persistent/src/orm_manager.rs | 2 +- optd-persistent/src/storage_layer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index f77b9d0..1883dce 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -11,7 +11,7 @@ pub struct ORMManager { } impl StorageLayer for ORMManager { - async fn create_new_epoch(&self) -> StorageResult { + async fn create_new_epoch(&mut self, source: String, data: String) -> StorageResult { todo!() } diff --git a/optd-persistent/src/storage_layer.rs b/optd-persistent/src/storage_layer.rs index f16f52d..4cafa61 100644 --- a/optd-persistent/src/storage_layer.rs +++ b/optd-persistent/src/storage_layer.rs @@ -39,7 +39,7 @@ pub struct WinnerInfo {} pub trait StorageLayer { // TODO: Change EpochId to event::Model::epoch_id - async fn create_new_epoch(&self) -> StorageResult; + async fn create_new_epoch(&mut self, source: String, data: String) -> StorageResult; async fn update_stats_from_catalog( &self, c: CatalogSource, From 090be14ce7b59b7cc0b7d32f30754defdffa5aff Mon Sep 17 00:00:00 2001 From: Yuanxin Cao Date: Tue, 5 Nov 2024 13:36:54 -0800 Subject: [PATCH 10/14] refine get stats interface --- optd-persistent/src/orm_manager.rs | 22 +++++++++++++---- optd-persistent/src/storage_layer.rs | 36 ++++++++++++++++++++++++---- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index 1883dce..14fa0dd 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -40,16 +40,30 @@ impl StorageLayer for ORMManager { todo!() } - async fn get_stats_analysis( + async fn get_stats_for_table( &self, table_id: i32, - attr_id: Option, - epoch_id: storage_layer::EpochId, + stat_type: i32, + epoch_id: Option, + ) -> StorageResult> { + todo!() + } + + async fn get_stats_for_attr( + &self, + attr_id: i32, + stat_type: i32, + epoch_id: Option, ) -> StorageResult> { todo!() } - async fn get_stats(&self, table_id: i32, attr_id: Option) -> StorageResult> { + async fn get_stats_for_attrs( + &self, + attr_ids: Vec, + stat_type: i32, + epoch_id: Option, + ) -> StorageResult> { todo!() } diff --git a/optd-persistent/src/storage_layer.rs b/optd-persistent/src/storage_layer.rs index 4cafa61..50c61dc 100644 --- a/optd-persistent/src/storage_layer.rs +++ b/optd-persistent/src/storage_layer.rs @@ -40,22 +40,48 @@ pub struct WinnerInfo {} pub trait StorageLayer { // TODO: Change EpochId to event::Model::epoch_id async fn create_new_epoch(&mut self, source: String, data: String) -> StorageResult; + async fn update_stats_from_catalog( &self, c: CatalogSource, epoch_id: EpochId, ) -> StorageResult<()>; + // i32 in `stats:i32` is a placeholder for the stats type async fn update_stats(&self, stats: i32, epoch_id: EpochId) -> StorageResult<()>; + async fn store_cost(&self, expr_id: ExprId, cost: i32, epoch_id: EpochId) -> StorageResult<()>; - // table_id, attr_id OR expr_id and return a vector? - async fn get_stats_analysis( + + /// Get the statistics for a given table. + /// + /// If `epoch_id` is None, it will return the latest statistics. + async fn get_stats_for_table( &self, table_id: i32, - attr_id: Option, - epoch_id: EpochId, + stat_type: i32, + epoch_id: Option, + ) -> StorageResult>; + + /// Get the statistics for a given attribute. + /// + /// If `epoch_id` is None, it will return the latest statistics. + async fn get_stats_for_attr( + &self, + attr_id: i32, + stat_type: i32, + epoch_id: Option, + ) -> StorageResult>; + + /// Get the joint statistics for a list of attributes. + /// + /// If `epoch_id` is None, it will return the latest statistics. + async fn get_stats_for_attrs( + &self, + attr_ids: Vec, + stat_type: i32, + epoch_id: Option, ) -> StorageResult>; - async fn get_stats(&self, table_id: i32, attr_id: Option) -> StorageResult>; + async fn get_cost_analysis( &self, expr_id: ExprId, From 513800ce7a2ac097905f93eb2b40398428e3f4df Mon Sep 17 00:00:00 2001 From: Yuanxin Cao Date: Tue, 5 Nov 2024 21:11:52 -0800 Subject: [PATCH 11/14] make get_stats return float --- optd-persistent/src/orm_manager.rs | 6 +++--- optd-persistent/src/storage_layer.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index 14fa0dd..eb05a2b 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -45,7 +45,7 @@ impl StorageLayer for ORMManager { table_id: i32, stat_type: i32, epoch_id: Option, - ) -> StorageResult> { + ) -> StorageResult> { todo!() } @@ -54,7 +54,7 @@ impl StorageLayer for ORMManager { attr_id: i32, stat_type: i32, epoch_id: Option, - ) -> StorageResult> { + ) -> StorageResult> { todo!() } @@ -63,7 +63,7 @@ impl StorageLayer for ORMManager { attr_ids: Vec, stat_type: i32, epoch_id: Option, - ) -> StorageResult> { + ) -> StorageResult> { todo!() } diff --git a/optd-persistent/src/storage_layer.rs b/optd-persistent/src/storage_layer.rs index 50c61dc..20d5023 100644 --- a/optd-persistent/src/storage_layer.rs +++ b/optd-persistent/src/storage_layer.rs @@ -60,7 +60,7 @@ pub trait StorageLayer { table_id: i32, stat_type: i32, epoch_id: Option, - ) -> StorageResult>; + ) -> StorageResult>; /// Get the statistics for a given attribute. /// @@ -70,7 +70,7 @@ pub trait StorageLayer { attr_id: i32, stat_type: i32, epoch_id: Option, - ) -> StorageResult>; + ) -> StorageResult>; /// Get the joint statistics for a list of attributes. /// @@ -80,7 +80,7 @@ pub trait StorageLayer { attr_ids: Vec, stat_type: i32, epoch_id: Option, - ) -> StorageResult>; + ) -> StorageResult>; async fn get_cost_analysis( &self, From c6ad051db699c447eb674a8983e6ece2ab4d5656 Mon Sep 17 00:00:00 2001 From: Yuanxin Cao Date: Tue, 5 Nov 2024 22:32:07 -0800 Subject: [PATCH 12/14] remove impls --- optd-persistent/src/orm_manager.rs | 153 +-------------------------- optd-persistent/src/storage_layer.rs | 1 + 2 files changed, 5 insertions(+), 149 deletions(-) diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index b8eed86..462eee6 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -32,17 +32,7 @@ impl StorageLayer for ORMManager { source: String, data: String, ) -> StorageResult { - let new_event = event::ActiveModel { - source_variant: sea_orm::ActiveValue::Set(source), - timestamp: sea_orm::ActiveValue::Set(Utc::now()), - data: sea_orm::ActiveValue::Set(sea_orm::JsonValue::String(data)), - ..Default::default() - }; - let res = Event::insert(new_event).exec(&self.db_conn).await; - res.and_then(|insert_res| { - self.latest_epoch_id = insert_res.last_insert_id; - Ok(self.latest_epoch_id) - }) + todo!() } async fn update_stats_from_catalog( @@ -67,35 +57,7 @@ impl StorageLayer for ORMManager { cost: i32, epoch_id: storage_layer::EpochId, ) -> StorageResult<()> { - // TODO: update PhysicalExpression and Event tables - // Check if expr_id exists in PhysicalExpression table - let expr_exists = PhysicalExpression::find_by_id(expr_id) - .one(&self.db_conn) - .await?; - if expr_exists.is_none() { - return Err(DbErr::RecordNotFound( - "ExprId not found in PhysicalExpression table".to_string(), - )); - } - - // Check if epoch_id exists in Event table - let epoch_exists = Event::find() - .filter(event::Column::EpochId.eq(epoch_id)) - .one(&self.db_conn) - .await - .unwrap(); - - let new_cost = plan_cost::ActiveModel { - physical_expression_id: ActiveValue::Set(expr_id), - epoch_id: ActiveValue::Set(epoch_id), - cost: ActiveValue::Set(cost), - is_valid: ActiveValue::Set(true), - ..Default::default() - }; - PlanCost::insert(new_cost) - .exec(&self.db_conn) - .await - .map(|_| ()) + todo!() } async fn get_stats_for_table( @@ -130,26 +92,12 @@ impl StorageLayer for ORMManager { expr_id: storage_layer::ExprId, epoch_id: storage_layer::EpochId, ) -> StorageResult> { - let cost = PlanCost::find() - .filter(plan_cost::Column::PhysicalExpressionId.eq(expr_id)) - .filter(plan_cost::Column::EpochId.eq(epoch_id)) - .one(&self.db_conn) - .await?; - assert!(cost.is_some(), "Cost not found in Cost table"); - assert!(cost.clone().unwrap().is_valid, "Cost is not valid"); - Ok(cost.map(|c| c.cost)) + todo!() } /// Get the latest cost for an expression async fn get_cost(&self, expr_id: storage_layer::ExprId) -> StorageResult> { - let cost = PlanCost::find() - .filter(plan_cost::Column::PhysicalExpressionId.eq(expr_id)) - .order_by_desc(plan_cost::Column::EpochId) - .one(&self.db_conn) - .await?; - assert!(cost.is_some(), "Cost not found in Cost table"); - assert!(cost.clone().unwrap().is_valid, "Cost is not valid"); - Ok(cost.map(|c| c.cost)) + todo!() } async fn get_group_winner_from_group_id( @@ -235,96 +183,3 @@ impl StorageLayer for ORMManager { todo!() } } - -#[cfg(test)] -mod tests { - use crate::migrate; - use sea_orm::{ConnectionTrait, Database, EntityTrait, ModelTrait}; - use serde_json::de; - - use crate::entities::event::Entity as Event; - use crate::storage_layer::StorageLayer; - use crate::TEST_DATABASE_URL; - - async fn run_migration() { - let _ = std::fs::remove_file(TEST_DATABASE_URL); - - let db = Database::connect(TEST_DATABASE_URL) - .await - .expect("Unable to connect to the database"); - - migrate(&db) - .await - .expect("Something went wrong during migration"); - - db.execute(sea_orm::Statement::from_string( - sea_orm::DatabaseBackend::Sqlite, - "PRAGMA foreign_keys = ON;".to_owned(), - )) - .await - .expect("Unable to enable foreign keys"); - } - - #[tokio::test] - async fn test_create_new_epoch() { - run_migration().await; - let mut orm_manager = super::ORMManager::new(Some(TEST_DATABASE_URL)).await; - let res = orm_manager - .create_new_epoch("source".to_string(), "data".to_string()) - .await; - println!("{:?}", res); - assert!(res.is_ok()); - assert_eq!( - super::Event::find() - .all(&orm_manager.db_conn) - .await - .unwrap() - .len(), - 1 - ); - println!( - "{:?}", - super::Event::find() - .all(&orm_manager.db_conn) - .await - .unwrap()[0] - ); - assert_eq!( - super::Event::find() - .all(&orm_manager.db_conn) - .await - .unwrap()[0] - .epoch_id, - res.unwrap() - ); - } - - #[tokio::test] - #[ignore] // Need to update all tables - async fn test_store_cost() { - run_migration().await; - let mut orm_manager = super::ORMManager::new(Some(TEST_DATABASE_URL)).await; - let epoch_id = orm_manager - .create_new_epoch("source".to_string(), "data".to_string()) - .await - .unwrap(); - let expr_id = 1; - let cost = 42; - let res = orm_manager.store_cost(expr_id, cost, epoch_id).await; - match res { - Ok(_) => assert!(true), - Err(e) => { - println!("Error: {:?}", e); - assert!(false); - } - } - let costs = super::PlanCost::find() - .all(&orm_manager.db_conn) - .await - .unwrap(); - assert_eq!(costs.len(), 1); - assert_eq!(costs[0].epoch_id, epoch_id); - assert_eq!(costs[0].physical_expression_id, expr_id); - assert_eq!(costs[0].cost, cost); - } -} diff --git a/optd-persistent/src/storage_layer.rs b/optd-persistent/src/storage_layer.rs index 20d5023..93590f6 100644 --- a/optd-persistent/src/storage_layer.rs +++ b/optd-persistent/src/storage_layer.rs @@ -87,6 +87,7 @@ pub trait StorageLayer { expr_id: ExprId, epoch_id: EpochId, ) -> StorageResult>; + async fn get_cost(&self, expr_id: ExprId) -> StorageResult>; async fn get_group_winner_from_group_id( From 863c9f72a13505f87d53ee36a84990e740c12559 Mon Sep 17 00:00:00 2001 From: Yuanxin Cao Date: Tue, 5 Nov 2024 22:36:17 -0800 Subject: [PATCH 13/14] implement get_stats methods --- optd-persistent/src/orm_manager.rs | 221 ++++++++++++++++++++++++++++- 1 file changed, 214 insertions(+), 7 deletions(-) diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index 462eee6..b68dc10 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -5,6 +5,7 @@ use crate::orm_manager::{Event, PlanCost}; use crate::storage_layer::{self, EpochId, StorageLayer, StorageResult}; use crate::DATABASE_URL; use sea_orm::*; +use sea_query::Expr; use sqlx::types::chrono::Utc; pub struct ORMManager { @@ -32,7 +33,17 @@ impl StorageLayer for ORMManager { source: String, data: String, ) -> StorageResult { - todo!() + let new_event = event::ActiveModel { + source_variant: sea_orm::ActiveValue::Set(source), + timestamp: sea_orm::ActiveValue::Set(Utc::now()), + data: sea_orm::ActiveValue::Set(sea_orm::JsonValue::String(data)), + ..Default::default() + }; + let res = Event::insert(new_event).exec(&self.db_conn).await; + res.and_then(|insert_res| { + self.latest_epoch_id = insert_res.last_insert_id; + Ok(self.latest_epoch_id) + }) } async fn update_stats_from_catalog( @@ -57,7 +68,35 @@ impl StorageLayer for ORMManager { cost: i32, epoch_id: storage_layer::EpochId, ) -> StorageResult<()> { - todo!() + // TODO: update PhysicalExpression and Event tables + // Check if expr_id exists in PhysicalExpression table + let expr_exists = PhysicalExpression::find_by_id(expr_id) + .one(&self.db_conn) + .await?; + if expr_exists.is_none() { + return Err(DbErr::RecordNotFound( + "ExprId not found in PhysicalExpression table".to_string(), + )); + } + + // Check if epoch_id exists in Event table + let epoch_exists = Event::find() + .filter(event::Column::EpochId.eq(epoch_id)) + .one(&self.db_conn) + .await + .unwrap(); + + let new_cost = plan_cost::ActiveModel { + physical_expression_id: ActiveValue::Set(expr_id), + epoch_id: ActiveValue::Set(epoch_id), + cost: ActiveValue::Set(cost), + is_valid: ActiveValue::Set(true), + ..Default::default() + }; + PlanCost::insert(new_cost) + .exec(&self.db_conn) + .await + .map(|_| ()) } async fn get_stats_for_table( @@ -66,7 +105,23 @@ impl StorageLayer for ORMManager { stat_type: i32, epoch_id: Option, ) -> StorageResult> { - todo!() + match epoch_id { + Some(epoch_id) => Statistic::find() + .filter(statistic::Column::TableId.eq(table_id)) + .filter(statistic::Column::StatisticType.eq(stat_type)) + .filter(statistic::Column::EpochId.eq(epoch_id)) + .one(&self.db_conn) + .await + .map(|stat| stat.map(|s| s.statistic_value)), + + None => Statistic::find() + .filter(statistic::Column::TableId.eq(table_id)) + .filter(statistic::Column::StatisticType.eq(stat_type)) + .order_by_desc(statistic::Column::EpochId) + .one(&self.db_conn) + .await + .map(|stat| stat.map(|s| s.statistic_value)), + } } async fn get_stats_for_attr( @@ -75,7 +130,27 @@ impl StorageLayer for ORMManager { stat_type: i32, epoch_id: Option, ) -> StorageResult> { - todo!() + match epoch_id { + Some(epoch_id) => Statistic::find() + .filter(statistic::Column::NumberOfAttributes.eq(1)) + .filter(statistic::Column::StatisticType.eq(stat_type)) + .filter(statistic::Column::EpochId.eq(epoch_id)) + .inner_join(statistic_to_attribute_junction::Entity) + .filter(statistic_to_attribute_junction::Column::AttributeId.eq(attr_id)) + .one(&self.db_conn) + .await + .map(|stat| stat.map(|s| s.statistic_value)), + + None => Statistic::find() + .filter(statistic::Column::NumberOfAttributes.eq(1)) + .filter(statistic::Column::StatisticType.eq(stat_type)) + .inner_join(statistic_to_attribute_junction::Entity) + .filter(statistic_to_attribute_junction::Column::AttributeId.eq(attr_id)) + .order_by_desc(statistic::Column::EpochId) + .one(&self.db_conn) + .await + .map(|stat| stat.map(|s| s.statistic_value)), + } } async fn get_stats_for_attrs( @@ -84,7 +159,32 @@ impl StorageLayer for ORMManager { stat_type: i32, epoch_id: Option, ) -> StorageResult> { - todo!() + let attr_count = attr_ids.len() as i32; + match epoch_id { + Some(epoch_id) => Statistic::find() + .filter(statistic::Column::NumberOfAttributes.eq(attr_count)) + .filter(statistic::Column::StatisticType.eq(stat_type)) + .filter(statistic::Column::EpochId.eq(epoch_id)) + .inner_join(statistic_to_attribute_junction::Entity) + .filter(statistic_to_attribute_junction::Column::AttributeId.is_in(attr_ids)) + .group_by(statistic::Column::Id) + .having(Expr::col(statistic::Column::Name).count().eq(attr_count)) + .one(&self.db_conn) + .await + .map(|stat| stat.map(|s| s.statistic_value)), + + None => Statistic::find() + .filter(statistic::Column::NumberOfAttributes.eq(attr_count)) + .filter(statistic::Column::StatisticType.eq(stat_type)) + .inner_join(statistic_to_attribute_junction::Entity) + .filter(statistic_to_attribute_junction::Column::AttributeId.is_in(attr_ids)) + .group_by(statistic::Column::Id) + .having(Expr::col(statistic::Column::Name).count().eq(attr_count)) + .order_by_desc(statistic::Column::EpochId) + .one(&self.db_conn) + .await + .map(|stat| stat.map(|s| s.statistic_value)), + } } async fn get_cost_analysis( @@ -92,12 +192,26 @@ impl StorageLayer for ORMManager { expr_id: storage_layer::ExprId, epoch_id: storage_layer::EpochId, ) -> StorageResult> { - todo!() + let cost = PlanCost::find() + .filter(plan_cost::Column::PhysicalExpressionId.eq(expr_id)) + .filter(plan_cost::Column::EpochId.eq(epoch_id)) + .one(&self.db_conn) + .await?; + assert!(cost.is_some(), "Cost not found in Cost table"); + assert!(cost.clone().unwrap().is_valid, "Cost is not valid"); + Ok(cost.map(|c| c.cost)) } /// Get the latest cost for an expression async fn get_cost(&self, expr_id: storage_layer::ExprId) -> StorageResult> { - todo!() + let cost = PlanCost::find() + .filter(plan_cost::Column::PhysicalExpressionId.eq(expr_id)) + .order_by_desc(plan_cost::Column::EpochId) + .one(&self.db_conn) + .await?; + assert!(cost.is_some(), "Cost not found in Cost table"); + assert!(cost.clone().unwrap().is_valid, "Cost is not valid"); + Ok(cost.map(|c| c.cost)) } async fn get_group_winner_from_group_id( @@ -183,3 +297,96 @@ impl StorageLayer for ORMManager { todo!() } } + +#[cfg(test)] +mod tests { + use crate::migrate; + use sea_orm::{ConnectionTrait, Database, EntityTrait, ModelTrait}; + use serde_json::de; + + use crate::entities::event::Entity as Event; + use crate::storage_layer::StorageLayer; + use crate::TEST_DATABASE_URL; + + async fn run_migration() { + let _ = std::fs::remove_file(TEST_DATABASE_URL); + + let db = Database::connect(TEST_DATABASE_URL) + .await + .expect("Unable to connect to the database"); + + migrate(&db) + .await + .expect("Something went wrong during migration"); + + db.execute(sea_orm::Statement::from_string( + sea_orm::DatabaseBackend::Sqlite, + "PRAGMA foreign_keys = ON;".to_owned(), + )) + .await + .expect("Unable to enable foreign keys"); + } + + #[tokio::test] + async fn test_create_new_epoch() { + run_migration().await; + let mut orm_manager = super::ORMManager::new(Some(TEST_DATABASE_URL)).await; + let res = orm_manager + .create_new_epoch("source".to_string(), "data".to_string()) + .await; + println!("{:?}", res); + assert!(res.is_ok()); + assert_eq!( + super::Event::find() + .all(&orm_manager.db_conn) + .await + .unwrap() + .len(), + 1 + ); + println!( + "{:?}", + super::Event::find() + .all(&orm_manager.db_conn) + .await + .unwrap()[0] + ); + assert_eq!( + super::Event::find() + .all(&orm_manager.db_conn) + .await + .unwrap()[0] + .epoch_id, + res.unwrap() + ); + } + + #[tokio::test] + #[ignore] // Need to update all tables + async fn test_store_cost() { + run_migration().await; + let mut orm_manager = super::ORMManager::new(Some(TEST_DATABASE_URL)).await; + let epoch_id = orm_manager + .create_new_epoch("source".to_string(), "data".to_string()) + .await + .unwrap(); + let expr_id = 1; + let cost = 42; + let res = orm_manager.store_cost(expr_id, cost, epoch_id).await; + match res { + Ok(_) => assert!(true), + Err(e) => { + println!("Error: {:?}", e); + assert!(false); + } + } + let costs = super::PlanCost::find() + .all(&orm_manager.db_conn) + .await + .unwrap(); + assert_eq!(costs.len(), 1); + assert_eq!(costs[0].epoch_id, epoch_id); + assert_eq!(costs[0].physical_expression_id, expr_id); + assert_eq!(costs[0].cost, cost); + } +} From 308e82c3013f41ab32490f16e787abc33022541f Mon Sep 17 00:00:00 2001 From: unw9527 <1041593558@qq.com> Date: Wed, 6 Nov 2024 10:07:10 -0500 Subject: [PATCH 14/14] add: store cost validity check --- optd-persistent/src/orm_manager.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs index b68dc10..5a9f921 100644 --- a/optd-persistent/src/orm_manager.rs +++ b/optd-persistent/src/orm_manager.rs @@ -85,6 +85,11 @@ impl StorageLayer for ORMManager { .one(&self.db_conn) .await .unwrap(); + if epoch_exists.is_none() { + return Err(DbErr::RecordNotFound( + "EpochId not found in Event table".to_string(), + )); + } let new_cost = plan_cost::ActiveModel { physical_expression_id: ActiveValue::Set(expr_id),