From ea20998c62cf90e81f0cef5bebbc7b681f349354 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Wed, 20 Nov 2024 16:07:34 -0500 Subject: [PATCH 1/7] Enable separate get and store cost & estimated_statistic in ORM --- optd-persistent/src/bin/init.rs | 4 +- optd-persistent/src/cost_model/interface.rs | 15 +- optd-persistent/src/cost_model/orm.rs | 204 ++++++++++++------ optd-persistent/src/db/init.db | Bin 147456 -> 147456 bytes optd-persistent/src/entities/plan_cost.rs | 4 +- .../cost_model/m20241029_000001_plan_cost.rs | 12 +- 6 files changed, 161 insertions(+), 78 deletions(-) diff --git a/optd-persistent/src/bin/init.rs b/optd-persistent/src/bin/init.rs index 9cc07e2..69f3c69 100644 --- a/optd-persistent/src/bin/init.rs +++ b/optd-persistent/src/bin/init.rs @@ -355,8 +355,8 @@ async fn init_all_tables() -> Result<(), sea_orm::error::DbErr> { id: Set(1), physical_expression_id: Set(1), epoch_id: Set(1), - cost: Set(json!({"compute_cost":10, "io_cost":10})), - estimated_statistic: Set(10), + cost: Set(Some(json!({"compute_cost":10, "io_cost":10}))), + estimated_statistic: Set(Some(10)), is_valid: Set(true), }; plan_cost::Entity::insert(plan_cost) diff --git a/optd-persistent/src/cost_model/interface.rs b/optd-persistent/src/cost_model/interface.rs index ee767d7..f5fd200 100644 --- a/optd-persistent/src/cost_model/interface.rs +++ b/optd-persistent/src/cost_model/interface.rs @@ -91,8 +91,6 @@ pub struct Stat { pub struct Cost { pub compute_cost: i32, pub io_cost: i32, - // Raw estimated output row count of targeted expression. 
- pub estimated_statistic: i32, } #[derive(Clone, Debug)] @@ -118,8 +116,13 @@ pub trait CostModelStorageLayer { epoch_option: EpochOption, ) -> StorageResult>; - async fn store_cost(&self, expr_id: ExprId, cost: Cost, epoch_id: EpochId) - -> StorageResult<()>; + async fn store_cost( + &self, + expr_id: ExprId, + cost: Option, + estimated_statistic: Option, + epoch_id: EpochId, + ) -> StorageResult<()>; async fn store_expr_stats_mappings( &self, @@ -162,9 +165,9 @@ pub trait CostModelStorageLayer { &self, expr_id: ExprId, epoch_id: EpochId, - ) -> StorageResult>; + ) -> StorageResult<(Option, Option)>; - async fn get_cost(&self, expr_id: ExprId) -> StorageResult>; + async fn get_cost(&self, expr_id: ExprId) -> StorageResult<(Option, Option)>; async fn get_attribute( &self, diff --git a/optd-persistent/src/cost_model/orm.rs b/optd-persistent/src/cost_model/orm.rs index d5b7ad6..f452eb8 100644 --- a/optd-persistent/src/cost_model/orm.rs +++ b/optd-persistent/src/cost_model/orm.rs @@ -4,7 +4,7 @@ use crate::cost_model::interface::Cost; use crate::entities::{prelude::*, *}; use crate::{BackendError, BackendManager, CostModelStorageLayer, StorageResult}; use sea_orm::prelude::{Expr, Json}; -use sea_orm::sea_query::Query; +use sea_orm::sea_query::{ExprTrait, Query}; use sea_orm::{sqlx::types::chrono::Utc, EntityTrait}; use sea_orm::{ ActiveModelTrait, ColumnTrait, Condition, DbBackend, DbErr, DeleteResult, EntityOrSelect, @@ -208,7 +208,7 @@ impl CostModelStorageLayer for BackendManager { // 0. Check if the stat already exists. If exists, get stat_id, else insert into statistic table. 
let stat_id = match stat.table_id { Some(table_id) => { - // TODO(lanlou): only select needed fields + // TODO: only select needed fields let res = Statistic::find() .filter(statistic::Column::TableId.eq(table_id)) .inner_join(versioned_statistic::Entity) @@ -467,47 +467,68 @@ impl CostModelStorageLayer for BackendManager { } /// TODO: documentation + /// Each record in the `plan_cost` table can contain either the cost or the estimated statistic + /// or both, but never neither. + /// The name can be misleading, since it can also return the estimated statistic. async fn get_cost_analysis( &self, expr_id: ExprId, epoch_id: EpochId, - ) -> StorageResult> { + ) -> StorageResult<(Option, Option)> { let cost = PlanCost::find() .filter(plan_cost::Column::PhysicalExpressionId.eq(expr_id)) .filter(plan_cost::Column::EpochId.eq(epoch_id)) .one(&self.db) .await?; - assert!(cost.is_some(), "Cost not found in Cost table"); - assert!(cost.clone().unwrap().is_valid, "Cost is not valid"); - Ok(cost.map(|c| Cost { - compute_cost: c.cost.get("compute_cost").unwrap().as_i64().unwrap() as i32, - io_cost: c.cost.get("io_cost").unwrap().as_i64().unwrap() as i32, - estimated_statistic: c.estimated_statistic, - })) + // When this cost is invalid or not found, we should return None + if cost.is_none() || !cost.clone().unwrap().is_valid { + return Ok((None, None)); + } + + let real_cost = cost.as_ref().and_then(|c| c.cost.as_ref()).map(|c| Cost { + compute_cost: c.get("compute_cost").unwrap().as_i64().unwrap() as i32, + io_cost: c.get("io_cost").unwrap().as_i64().unwrap() as i32, + }); + + Ok((real_cost, cost.unwrap().estimated_statistic)) } - async fn get_cost(&self, expr_id: ExprId) -> StorageResult> { + /// TODO: documentation + /// It returns the cost and estimated statistic if applicable. + /// Each record in the `plan_cost` table can contain either the cost or the estimated statistic + /// or both, but never neither. 
+ /// The name can be misleading, since it can also return the estimated statistic. + async fn get_cost(&self, expr_id: ExprId) -> StorageResult<(Option, Option)> { let cost = PlanCost::find() .filter(plan_cost::Column::PhysicalExpressionId.eq(expr_id)) .order_by_desc(plan_cost::Column::EpochId) .one(&self.db) .await?; - assert!(cost.is_some(), "Cost not found in Cost table"); - assert!(cost.clone().unwrap().is_valid, "Cost is not valid"); - Ok(cost.map(|c| Cost { - compute_cost: c.cost.get("compute_cost").unwrap().as_i64().unwrap() as i32, - io_cost: c.cost.get("io_cost").unwrap().as_i64().unwrap() as i32, - estimated_statistic: c.estimated_statistic, - })) + // When this cost is invalid or not found, we should return None + if cost.is_none() || !cost.clone().unwrap().is_valid { + return Ok((None, None)); + } + + let real_cost = cost.as_ref().and_then(|c| c.cost.as_ref()).map(|c| Cost { + compute_cost: c.get("compute_cost").unwrap().as_i64().unwrap() as i32, + io_cost: c.get("io_cost").unwrap().as_i64().unwrap() as i32, + }); + + Ok((real_cost, cost.unwrap().estimated_statistic)) } + /// This method should handle the case when the cost is already stored. + /// The name maybe misleading, since it can also store the estimated statistic. /// TODO: documentation async fn store_cost( &self, physical_expression_id: ExprId, - cost: Cost, + cost: Option, + estimated_statistic: Option, epoch_id: EpochId, ) -> StorageResult<()> { + assert!(cost.is_some() || estimated_statistic.is_some()); + // TODO: should we do the following checks in the production environment? 
let expr_exists = PhysicalExpression::find_by_id(physical_expression_id) .one(&self.db) .await?; @@ -520,7 +541,6 @@ impl CostModelStorageLayer for BackendManager { .into(), )); } - // Check if epoch_id exists in Event table let epoch_exists = Event::find() .filter(event::Column::EpochId.eq(epoch_id)) @@ -533,17 +553,42 @@ impl CostModelStorageLayer for BackendManager { )); } - let new_cost = plan_cost::ActiveModel { - physical_expression_id: sea_orm::ActiveValue::Set(physical_expression_id), - epoch_id: sea_orm::ActiveValue::Set(epoch_id), - cost: sea_orm::ActiveValue::Set( - json!({"compute_cost": cost.compute_cost, "io_cost": cost.io_cost}), - ), - estimated_statistic: sea_orm::ActiveValue::Set(cost.estimated_statistic), - is_valid: sea_orm::ActiveValue::Set(true), - ..Default::default() - }; - let _ = PlanCost::insert(new_cost).exec(&self.db).await?; + let transaction = self.db.begin().await?; + + let valid_cost = PlanCost::find() + .filter(plan_cost::Column::PhysicalExpressionId.eq(physical_expression_id)) + .filter(plan_cost::Column::EpochId.eq(epoch_id)) + .filter(plan_cost::Column::IsValid.eq(true)) + .one(&transaction) + .await?; + + if valid_cost.is_some() { + let mut new_cost: plan_cost::ActiveModel = valid_cost.unwrap().into(); + if cost.is_some() { + new_cost.cost = sea_orm::ActiveValue::Set(Some(json!({ + "compute_cost": cost.clone().unwrap().compute_cost, + "io_cost": cost.clone().unwrap().io_cost + }))); + } + if estimated_statistic.is_some() { + new_cost.estimated_statistic = sea_orm::ActiveValue::Set(estimated_statistic); + } + let _ = PlanCost::update(new_cost).exec(&transaction).await?; + } else { + let new_cost = plan_cost::ActiveModel { + physical_expression_id: sea_orm::ActiveValue::Set(physical_expression_id), + epoch_id: sea_orm::ActiveValue::Set(epoch_id), + cost: sea_orm::ActiveValue::Set( + cost.map(|c| json!({"compute_cost": c.compute_cost, "io_cost": c.io_cost})), + ), + estimated_statistic: 
sea_orm::ActiveValue::Set(estimated_statistic), + is_valid: sea_orm::ActiveValue::Set(true), + ..Default::default() + }; + let _ = PlanCost::insert(new_cost).exec(&transaction).await?; + } + + transaction.commit().await?; Ok(()) } @@ -755,13 +800,11 @@ mod tests { backend_manager .store_cost( expr_id, - { - Cost { - compute_cost: 42, - io_cost: 42, - estimated_statistic: 42, - } - }, + Some(Cost { + compute_cost: 42, + io_cost: 42, + }), + Some(42), versioned_stat_res[0].epoch_id, ) .await @@ -826,7 +869,10 @@ mod tests { .await .unwrap(); assert_eq!(cost_res.len(), 1); - assert_eq!(cost_res[0].cost, json!({"compute_cost": 42, "io_cost": 42})); + assert_eq!( + cost_res[0].cost, + Some(json!({"compute_cost": 42, "io_cost": 42})) + ); assert_eq!(cost_res[0].epoch_id, epoch_id1); assert!(!cost_res[0].is_valid); @@ -960,10 +1006,15 @@ mod tests { let cost = Cost { compute_cost: 42, io_cost: 42, - estimated_statistic: 42, }; + let mut estimated_statistic = 42; backend_manager - .store_cost(physical_expression_id, cost.clone(), epoch_id) + .store_cost( + physical_expression_id, + Some(cost.clone()), + Some(estimated_statistic), + epoch_id, + ) .await .unwrap(); let costs = super::PlanCost::find() @@ -975,11 +1026,37 @@ mod tests { assert_eq!(costs[1].physical_expression_id, physical_expression_id); assert_eq!( costs[1].cost, - json!({"compute_cost": cost.compute_cost, "io_cost": cost.io_cost}) + Some(json!({"compute_cost": cost.compute_cost, "io_cost": cost.io_cost})) ); assert_eq!( - costs[1].estimated_statistic as i32, - cost.estimated_statistic + costs[1].estimated_statistic.unwrap() as i32, + estimated_statistic + ); + + estimated_statistic = 50; + backend_manager + .store_cost( + physical_expression_id, + None, + Some(estimated_statistic), + epoch_id, + ) + .await + .unwrap(); + let costs = super::PlanCost::find() + .all(&backend_manager.db) + .await + .unwrap(); + assert_eq!(costs.len(), 2); // We should not insert a new row + assert_eq!(costs[1].epoch_id, 
epoch_id); + assert_eq!(costs[1].physical_expression_id, physical_expression_id); + assert_eq!( + costs[1].cost, + Some(json!({"compute_cost": cost.compute_cost, "io_cost": cost.io_cost})) + ); + assert_eq!( + costs[1].estimated_statistic.unwrap() as i32, + estimated_statistic // The estimated_statistic should be update ); remove_db_file(DATABASE_FILE); @@ -999,10 +1076,9 @@ mod tests { let cost = Cost { compute_cost: 42, io_cost: 42, - estimated_statistic: 42, }; let _ = backend_manager - .store_cost(physical_expression_id, cost.clone(), epoch_id) + .store_cost(physical_expression_id, Some(cost.clone()), None, epoch_id) .await; let costs = super::PlanCost::find() .all(&backend_manager.db) @@ -1013,18 +1089,16 @@ mod tests { assert_eq!(costs[1].physical_expression_id, physical_expression_id); assert_eq!( costs[1].cost, - json!({"compute_cost": cost.compute_cost, "io_cost": cost.io_cost}) - ); - assert_eq!( - costs[1].estimated_statistic as i32, - cost.estimated_statistic + Some(json!({"compute_cost": cost.compute_cost, "io_cost": cost.io_cost})) ); + assert_eq!(costs[1].estimated_statistic, None); let res = backend_manager .get_cost(physical_expression_id) .await .unwrap(); - assert_eq!(res.unwrap(), cost); + assert_eq!(res.0.unwrap(), cost); + assert_eq!(res.1, None); remove_db_file(DATABASE_FILE); } @@ -1040,13 +1114,14 @@ mod tests { .await .unwrap(); let physical_expression_id = 1; - let cost = Cost { - compute_cost: 1420, - io_cost: 42, - estimated_statistic: 42, - }; + let estimated_statistic = 42; let _ = backend_manager - .store_cost(physical_expression_id, cost.clone(), epoch_id) + .store_cost( + physical_expression_id, + None, + Some(estimated_statistic), + epoch_id, + ) .await; let costs = super::PlanCost::find() .all(&backend_manager.db) @@ -1055,13 +1130,10 @@ mod tests { assert_eq!(costs.len(), 2); // The first row one is the initialized data assert_eq!(costs[1].epoch_id, epoch_id); assert_eq!(costs[1].physical_expression_id, physical_expression_id); 
+ assert_eq!(costs[1].cost, None); assert_eq!( - costs[1].cost, - json!({"compute_cost": cost.compute_cost, "io_cost": cost.io_cost}) - ); - assert_eq!( - costs[1].estimated_statistic as i32, - cost.estimated_statistic + costs[1].estimated_statistic.unwrap() as i32, + estimated_statistic ); println!("{:?}", costs); @@ -1073,13 +1145,13 @@ mod tests { // The cost in the dummy data is 10 assert_eq!( - res.unwrap(), + res.0.unwrap(), Cost { compute_cost: 10, io_cost: 10, - estimated_statistic: 10, } ); + assert_eq!(res.1.unwrap(), 10); remove_db_file(DATABASE_FILE); } diff --git a/optd-persistent/src/db/init.db b/optd-persistent/src/db/init.db index 5350952e8707eea71c0be08a166dab40e6706137..e524ff128f0bc5f45fab1a21f1e309102b013cf3 100644 GIT binary patch delta 657 zcmZo@;B08%oFFB{a)W_^K?R7Rfbq#h9U~@|8ygdDNi*B|YfiS6(O?JC+M>*%lO1KG zC*RiLVoL`KXoyTcC?msDP?TSgT2xXA5}LeUMx6slYY8z2vrHDym7MIa%f$(jQ4wTz zXPF$QE6V~_XDzGB3KUi3pByMBVpx!qm=~X%Ut9u`ljUQ!WXUx$FfuVTFtUs{00G1J z)Uwn(u(T5|OCux59n()UGb%t#dns!McBzt_7MS*zQvkU_o0B<|W%6u26^N?&a&lnV z19GY$Sz9*dN|wnE`qGmX^rgXWG-YMZ1c~T@y{E_ma@*v8a-0zDjq<8s?JMOqLE1GL znFCoS-_utD%WhJ5AunWP5MpF#WngM$Vyb6iWNc<>wE36)J9KdqV@tEmzvAEVqp3GF zGBMfwD_?#BdngYtn<|^?#EFNuK4yH)#ppEsJU64+UK_Yr!?oh8P)H8Jk;~8tYk@nVFlLZvJKe4qe>b$jE5(ulRTTXzI<) z49zzG%9o$O?#siD}^-8mQuCmKKI)(~rtCDx-@Vm>X^X zCC_*!9!soT$jS n-FqWrJR9T0?W?yleq?6k+b+n%=!)0*(}B)Evwi+P#>7Sd5rMbT diff --git a/optd-persistent/src/entities/plan_cost.rs b/optd-persistent/src/entities/plan_cost.rs index 1acf101..713f2da 100644 --- a/optd-persistent/src/entities/plan_cost.rs +++ b/optd-persistent/src/entities/plan_cost.rs @@ -9,8 +9,8 @@ pub struct Model { pub id: i32, pub physical_expression_id: i32, pub epoch_id: i32, - pub cost: Json, - pub estimated_statistic: i32, + pub cost: Option, + pub estimated_statistic: Option, pub is_valid: bool, } diff --git a/optd-persistent/src/migrator/cost_model/m20241029_000001_plan_cost.rs b/optd-persistent/src/migrator/cost_model/m20241029_000001_plan_cost.rs index 
d8f9bf9..84ee676 100644 --- a/optd-persistent/src/migrator/cost_model/m20241029_000001_plan_cost.rs +++ b/optd-persistent/src/migrator/cost_model/m20241029_000001_plan_cost.rs @@ -1,6 +1,14 @@ //! When a statistic is updated, then all the related costs should be invalidated. (IsValid is set to false) //! This design (using IsValid flag) is based on the assumption that update_stats will not be called very frequently. //! It favors the compute_cost performance over the update_stats performance. +//! +//! This file stores cost like compute_cost, io_cost, network_cost, etc. for each physical expression. It also +//! stores the estimated output row count (estimated statistic) of each physical expression. +//! Sometimes we only have one of them to store, so we make Cost and EstimatedStatistic optional. But +//! one record must have at least one of them. +//! +//! TODO: Ideally, we can separate them since sometimes we only have the estimated output row count to store, +//! (when calling `derive_statistic`) but we don't have the detailed cost. 
use crate::migrator::cost_model::event::Event; use crate::migrator::memo::physical_expression::PhysicalExpression; @@ -49,8 +57,8 @@ impl MigrationTrait for Migration { .on_delete(ForeignKeyAction::Cascade) .on_update(ForeignKeyAction::Cascade), ) - .col(json(PlanCost::Cost)) - .col(integer(PlanCost::EstimatedStatistic)) + .col(json_null(PlanCost::Cost)) + .col(integer_null(PlanCost::EstimatedStatistic)) .col(boolean(PlanCost::IsValid)) .to_owned(), ) From 904f275fbd8dbc6f2b18c4d8db63c717c4031b73 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Wed, 20 Nov 2024 16:48:05 -0500 Subject: [PATCH 2/7] Add get_cost & store_cost in the cost model storage layer --- optd-cost-model/src/cost_model.rs | 2 +- optd-cost-model/src/lib.rs | 41 ++++++++++++++- optd-cost-model/src/storage/mock.rs | 25 ++++++++- optd-cost-model/src/storage/mod.rs | 19 ++++++- optd-cost-model/src/storage/persistent.rs | 37 ++++++++++++- optd-persistent/src/bin/init.rs | 2 +- optd-persistent/src/cost_model/interface.rs | 10 ++-- optd-persistent/src/cost_model/orm.rs | 52 ++++++++----------- optd-persistent/src/entities/plan_cost.rs | 5 +- .../cost_model/m20241029_000001_plan_cost.rs | 2 +- schema/all_tables.dbml | 4 +- 11 files changed, 153 insertions(+), 46 deletions(-) diff --git a/optd-cost-model/src/cost_model.rs b/optd-cost-model/src/cost_model.rs index 9ae84bb..21765bc 100644 --- a/optd-cost-model/src/cost_model.rs +++ b/optd-cost-model/src/cost_model.rs @@ -15,7 +15,7 @@ use crate::{ }, memo_ext::MemoExt, stats::AttributeCombValueStats, - storage::CostModelStorageManager, + storage::{self, CostModelStorageManager}, ComputeCostContext, Cost, CostModel, CostModelResult, EstimatedStatistic, StatValue, }; diff --git a/optd-cost-model/src/lib.rs b/optd-cost-model/src/lib.rs index 68b56ac..f4de0d7 100644 --- a/optd-cost-model/src/lib.rs +++ b/optd-cost-model/src/lib.rs @@ -30,13 +30,52 @@ pub struct ComputeCostContext { } #[derive(Default, Clone, Debug, PartialOrd, PartialEq)] -pub struct Cost(pub Vec); 
+pub struct Cost { + pub compute_cost: f64, + pub io_cost: f64, +} + +impl From for optd_persistent::cost_model::interface::Cost { + fn from(c: Cost) -> optd_persistent::cost_model::interface::Cost { + Self { + compute_cost: c.compute_cost, + io_cost: c.io_cost, + } + } +} + +impl From for Cost { + fn from(c: optd_persistent::cost_model::interface::Cost) -> Cost { + Self { + compute_cost: c.compute_cost, + io_cost: c.io_cost, + } + } +} /// Estimated statistic calculated by the cost model. /// It is the estimated output row count of the targeted expression. #[derive(PartialEq, PartialOrd, Clone, Debug)] pub struct EstimatedStatistic(pub f64); +impl From for f32 { + fn from(e: EstimatedStatistic) -> f32 { + e.0 as f32 + } +} + +impl From for f64 { + fn from(e: EstimatedStatistic) -> f64 { + e.0 + } +} + +impl From for EstimatedStatistic { + fn from(f: f32) -> EstimatedStatistic { + Self(f as f64) + } +} + pub type CostModelResult = Result; #[derive(Debug)] diff --git a/optd-cost-model/src/storage/mock.rs b/optd-cost-model/src/storage/mock.rs index e2c9b1e..1025eba 100644 --- a/optd-cost-model/src/storage/mock.rs +++ b/optd-cost-model/src/storage/mock.rs @@ -3,7 +3,11 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; -use crate::{common::types::TableId, stats::AttributeCombValueStats, CostModelResult}; +use crate::{ + common::types::{EpochId, ExprId, TableId}, + stats::AttributeCombValueStats, + Cost, CostModelResult, EstimatedStatistic, +}; use super::CostModelStorageManager; @@ -63,4 +67,23 @@ impl CostModelStorageManager for CostModelStorageMockManagerImpl { let table_stats = self.per_table_stats_map.get(&table_id); Ok(table_stats.map(|stats| stats.row_cnt)) } + + /// TODO: finish this when implementing the cost get/store tests + async fn get_cost( + &self, + expr_id: ExprId, + ) -> CostModelResult<(Option, Option)> { + todo!() + } + + /// TODO: finish this when implementing the cost get/store tests + async fn store_cost( + &self, + expr_id: 
ExprId, + cost: Option, + estimated_statistic: Option, + epoch_id: EpochId, + ) -> CostModelResult<()> { + todo!() + } } diff --git a/optd-cost-model/src/storage/mod.rs b/optd-cost-model/src/storage/mod.rs index 311da44..33a277b 100644 --- a/optd-cost-model/src/storage/mod.rs +++ b/optd-cost-model/src/storage/mod.rs @@ -1,4 +1,8 @@ -use crate::{common::types::TableId, stats::AttributeCombValueStats, CostModelResult}; +use crate::{ + common::types::{EpochId, ExprId, TableId}, + stats::AttributeCombValueStats, + Cost, CostModelResult, EstimatedStatistic, +}; pub mod mock; pub mod persistent; @@ -12,4 +16,17 @@ pub trait CostModelStorageManager { ) -> CostModelResult>; async fn get_table_row_count(&self, table_id: TableId) -> CostModelResult>; + + async fn get_cost( + &self, + expr_id: ExprId, + ) -> CostModelResult<(Option, Option)>; + + async fn store_cost( + &self, + expr_id: ExprId, + cost: Option, + estimated_statistic: Option, + epoch_id: EpochId, + ) -> CostModelResult<()>; } diff --git a/optd-cost-model/src/storage/persistent.rs b/optd-cost-model/src/storage/persistent.rs index 2238507..d4268c2 100644 --- a/optd-cost-model/src/storage/persistent.rs +++ b/optd-cost-model/src/storage/persistent.rs @@ -4,9 +4,9 @@ use std::sync::Arc; use optd_persistent::{cost_model::interface::StatType, CostModelStorageLayer}; use crate::{ - common::types::TableId, + common::types::{EpochId, ExprId, TableId}, stats::{utilities::counter::Counter, AttributeCombValueStats, Distribution, MostCommonValues}, - CostModelResult, + Cost, CostModelResult, EstimatedStatistic, }; use super::CostModelStorageManager; @@ -125,5 +125,38 @@ impl CostModelStorageManager .transpose()?) } + /// TODO: The name is misleading, since we can also get the estimated statistic. We should + /// rename it. 
+ async fn get_cost( + &self, + expr_id: ExprId, + ) -> CostModelResult<(Option, Option)> { + let (cost, estimated_statistic) = self.backend_manager.get_cost(expr_id.into()).await?; + Ok(( + cost.map(|c| c.into()), + estimated_statistic.map(|x| x.into()), + )) + } + + /// TODO: The name is misleading, since we can also get the estimated statistic. We should + /// rename it. + async fn store_cost( + &self, + expr_id: ExprId, + cost: Option, + estimated_statistic: Option, + epoch_id: EpochId, + ) -> CostModelResult<()> { + self.backend_manager + .store_cost( + expr_id.into(), + cost.map(|c| c.into()), + estimated_statistic.map(|x| x.into()), + epoch_id.into(), + ) + .await?; + Ok(()) + } + // TODO: Support querying for a specific type of statistics. } diff --git a/optd-persistent/src/bin/init.rs b/optd-persistent/src/bin/init.rs index 69f3c69..a0b6b25 100644 --- a/optd-persistent/src/bin/init.rs +++ b/optd-persistent/src/bin/init.rs @@ -356,7 +356,7 @@ async fn init_all_tables() -> Result<(), sea_orm::error::DbErr> { physical_expression_id: Set(1), epoch_id: Set(1), cost: Set(Some(json!({"compute_cost":10, "io_cost":10}))), - estimated_statistic: Set(Some(10)), + estimated_statistic: Set(Some(10.0)), is_valid: Set(true), }; plan_cost::Entity::insert(plan_cost) diff --git a/optd-persistent/src/cost_model/interface.rs b/optd-persistent/src/cost_model/interface.rs index f5fd200..50b90b7 100644 --- a/optd-persistent/src/cost_model/interface.rs +++ b/optd-persistent/src/cost_model/interface.rs @@ -89,8 +89,8 @@ pub struct Stat { /// TODO: documentation #[derive(Clone, Debug, PartialEq)] pub struct Cost { - pub compute_cost: i32, - pub io_cost: i32, + pub compute_cost: f64, + pub io_cost: f64, } #[derive(Clone, Debug)] @@ -120,7 +120,7 @@ pub trait CostModelStorageLayer { &self, expr_id: ExprId, cost: Option, - estimated_statistic: Option, + estimated_statistic: Option, epoch_id: EpochId, ) -> StorageResult<()>; @@ -165,9 +165,9 @@ pub trait CostModelStorageLayer { &self, 
expr_id: ExprId, epoch_id: EpochId, - ) -> StorageResult<(Option, Option)>; + ) -> StorageResult<(Option, Option)>; - async fn get_cost(&self, expr_id: ExprId) -> StorageResult<(Option, Option)>; + async fn get_cost(&self, expr_id: ExprId) -> StorageResult<(Option, Option)>; async fn get_attribute( &self, diff --git a/optd-persistent/src/cost_model/orm.rs b/optd-persistent/src/cost_model/orm.rs index f452eb8..034c7cb 100644 --- a/optd-persistent/src/cost_model/orm.rs +++ b/optd-persistent/src/cost_model/orm.rs @@ -474,7 +474,7 @@ impl CostModelStorageLayer for BackendManager { &self, expr_id: ExprId, epoch_id: EpochId, - ) -> StorageResult<(Option, Option)> { + ) -> StorageResult<(Option, Option)> { let cost = PlanCost::find() .filter(plan_cost::Column::PhysicalExpressionId.eq(expr_id)) .filter(plan_cost::Column::EpochId.eq(epoch_id)) @@ -486,8 +486,8 @@ impl CostModelStorageLayer for BackendManager { } let real_cost = cost.as_ref().and_then(|c| c.cost.as_ref()).map(|c| Cost { - compute_cost: c.get("compute_cost").unwrap().as_i64().unwrap() as i32, - io_cost: c.get("io_cost").unwrap().as_i64().unwrap() as i32, + compute_cost: c.get("compute_cost").unwrap().as_f64().unwrap(), + io_cost: c.get("io_cost").unwrap().as_f64().unwrap(), }); Ok((real_cost, cost.unwrap().estimated_statistic)) @@ -498,7 +498,7 @@ impl CostModelStorageLayer for BackendManager { /// Each record in the `plan_cost` table can contain either the cost or the estimated statistic /// or both, but never neither. /// The name can be misleading, since it can also return the estimated statistic. 
- async fn get_cost(&self, expr_id: ExprId) -> StorageResult<(Option, Option)> { + async fn get_cost(&self, expr_id: ExprId) -> StorageResult<(Option, Option)> { let cost = PlanCost::find() .filter(plan_cost::Column::PhysicalExpressionId.eq(expr_id)) .order_by_desc(plan_cost::Column::EpochId) @@ -510,8 +510,8 @@ impl CostModelStorageLayer for BackendManager { } let real_cost = cost.as_ref().and_then(|c| c.cost.as_ref()).map(|c| Cost { - compute_cost: c.get("compute_cost").unwrap().as_i64().unwrap() as i32, - io_cost: c.get("io_cost").unwrap().as_i64().unwrap() as i32, + compute_cost: c.get("compute_cost").unwrap().as_f64().unwrap(), + io_cost: c.get("io_cost").unwrap().as_f64().unwrap(), }); Ok((real_cost, cost.unwrap().estimated_statistic)) @@ -524,7 +524,7 @@ impl CostModelStorageLayer for BackendManager { &self, physical_expression_id: ExprId, cost: Option, - estimated_statistic: Option, + estimated_statistic: Option, epoch_id: EpochId, ) -> StorageResult<()> { assert!(cost.is_some() || estimated_statistic.is_some()); @@ -801,10 +801,10 @@ mod tests { .store_cost( expr_id, Some(Cost { - compute_cost: 42, - io_cost: 42, + compute_cost: 42.0, + io_cost: 42.0, }), - Some(42), + Some(42.0), versioned_stat_res[0].epoch_id, ) .await @@ -1004,10 +1004,10 @@ mod tests { .unwrap(); let physical_expression_id = 1; let cost = Cost { - compute_cost: 42, - io_cost: 42, + compute_cost: 42.0, + io_cost: 42.0, }; - let mut estimated_statistic = 42; + let mut estimated_statistic = 42.0; backend_manager .store_cost( physical_expression_id, @@ -1028,12 +1028,9 @@ mod tests { costs[1].cost, Some(json!({"compute_cost": cost.compute_cost, "io_cost": cost.io_cost})) ); - assert_eq!( - costs[1].estimated_statistic.unwrap() as i32, - estimated_statistic - ); + assert_eq!(costs[1].estimated_statistic.unwrap(), estimated_statistic); - estimated_statistic = 50; + estimated_statistic = 50.0; backend_manager .store_cost( physical_expression_id, @@ -1055,7 +1052,7 @@ mod tests { 
Some(json!({"compute_cost": cost.compute_cost, "io_cost": cost.io_cost})) ); assert_eq!( - costs[1].estimated_statistic.unwrap() as i32, + costs[1].estimated_statistic.unwrap(), estimated_statistic // The estimated_statistic should be update ); @@ -1074,8 +1071,8 @@ mod tests { .unwrap(); let physical_expression_id = 1; let cost = Cost { - compute_cost: 42, - io_cost: 42, + compute_cost: 42.0, + io_cost: 42.0, }; let _ = backend_manager .store_cost(physical_expression_id, Some(cost.clone()), None, epoch_id) @@ -1114,7 +1111,7 @@ mod tests { .await .unwrap(); let physical_expression_id = 1; - let estimated_statistic = 42; + let estimated_statistic = 42.0; let _ = backend_manager .store_cost( physical_expression_id, @@ -1131,10 +1128,7 @@ mod tests { assert_eq!(costs[1].epoch_id, epoch_id); assert_eq!(costs[1].physical_expression_id, physical_expression_id); assert_eq!(costs[1].cost, None); - assert_eq!( - costs[1].estimated_statistic.unwrap() as i32, - estimated_statistic - ); + assert_eq!(costs[1].estimated_statistic.unwrap(), estimated_statistic); println!("{:?}", costs); // Retrieve physical_expression_id 1 and epoch_id 1 @@ -1147,11 +1141,11 @@ mod tests { assert_eq!( res.0.unwrap(), Cost { - compute_cost: 10, - io_cost: 10, + compute_cost: 10.0, + io_cost: 10.0, } ); - assert_eq!(res.1.unwrap(), 10); + assert_eq!(res.1.unwrap(), 10.0); remove_db_file(DATABASE_FILE); } diff --git a/optd-persistent/src/entities/plan_cost.rs b/optd-persistent/src/entities/plan_cost.rs index 713f2da..5d7e24b 100644 --- a/optd-persistent/src/entities/plan_cost.rs +++ b/optd-persistent/src/entities/plan_cost.rs @@ -2,7 +2,7 @@ use sea_orm::entity::prelude::*; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "plan_cost")] pub struct Model { #[sea_orm(primary_key)] @@ -10,7 +10,8 @@ pub struct Model { pub physical_expression_id: i32, pub epoch_id: i32, pub cost: Option, - pub estimated_statistic: 
Option, + #[sea_orm(column_type = "Float", nullable)] + pub estimated_statistic: Option, pub is_valid: bool, } diff --git a/optd-persistent/src/migrator/cost_model/m20241029_000001_plan_cost.rs b/optd-persistent/src/migrator/cost_model/m20241029_000001_plan_cost.rs index 84ee676..fdd2fef 100644 --- a/optd-persistent/src/migrator/cost_model/m20241029_000001_plan_cost.rs +++ b/optd-persistent/src/migrator/cost_model/m20241029_000001_plan_cost.rs @@ -58,7 +58,7 @@ impl MigrationTrait for Migration { .on_update(ForeignKeyAction::Cascade), ) .col(json_null(PlanCost::Cost)) - .col(integer_null(PlanCost::EstimatedStatistic)) + .col(float_null(PlanCost::EstimatedStatistic)) .col(boolean(PlanCost::IsValid)) .to_owned(), ) diff --git a/schema/all_tables.dbml b/schema/all_tables.dbml index 305075a..29a2136 100644 --- a/schema/all_tables.dbml +++ b/schema/all_tables.dbml @@ -60,9 +60,9 @@ Table plan_cost { physical_expression_id integer [ref: > physical_expression.id] epoch_id integer [ref: > event.epoch_id] // It is json type, including computation cost, I/O cost, etc. - cost json + cost json [null] // Raw estimated output row count of this expression - estimated_statistic integer + estimated_statistic float [null] // Whether the cost is valid or not. If the latest cost for an expr is invalid, then we need to recompute the cost. // We need to invalidate the cost when the related stats are updated. 
is_valid boolean From 23c444d9a654d40ab57b46755e1624d4c2297f01 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Wed, 20 Nov 2024 17:48:15 -0500 Subject: [PATCH 3/7] Finish compute_operation_cost --- optd-cost-model/src/cost_model.rs | 107 +++++++++++++++++++- optd-cost-model/src/lib.rs | 20 +++- optd-cost-model/src/storage/mock.rs | 2 +- optd-cost-model/src/storage/mod.rs | 2 +- optd-cost-model/src/storage/persistent.rs | 4 +- optd-persistent/src/cost_model/interface.rs | 2 +- optd-persistent/src/cost_model/orm.rs | 53 +++++++--- optd-persistent/src/db/init.db | Bin 147456 -> 147456 bytes 8 files changed, 164 insertions(+), 26 deletions(-) diff --git a/optd-cost-model/src/cost_model.rs b/optd-cost-model/src/cost_model.rs index 21765bc..38957f9 100644 --- a/optd-cost-model/src/cost_model.rs +++ b/optd-cost-model/src/cost_model.rs @@ -43,24 +43,111 @@ impl CostModelImpl { #[async_trait::async_trait] impl CostModel for CostModelImpl { + /// TODO: should we add epoch_id? async fn compute_operation_cost( &self, - node: &PhysicalNodeType, + node: PhysicalNodeType, predicates: &[ArcPredicateNode], + children_costs: &[Cost], children_stats: &[EstimatedStatistic], context: ComputeCostContext, ) -> CostModelResult { - todo!() + let res = self.storage_manager.get_cost(context.expr_id).await; + if let Ok((Some(cost), _)) = res { + return Ok(cost); + }; + let mut output_statistic = None; + if let Ok((_, Some(statistic))) = res { + output_statistic = Some(statistic); + }; + let output_cost = match node { + PhysicalNodeType::PhysicalScan => { + let output_statistic_data = output_statistic.unwrap_or( + self.derive_statistics( + node, + predicates, + children_stats, + context.clone(), + false, + ) + .await?, + ); + output_statistic = Some(output_statistic_data.clone()); + Cost { + compute_cost: 0.0, + io_cost: output_statistic_data.0, + } + } + PhysicalNodeType::PhysicalEmptyRelation => Cost { + compute_cost: 0.1, + io_cost: 0.0, + }, + PhysicalNodeType::PhysicalLimit => Cost { + 
compute_cost: children_costs[0].compute_cost, + io_cost: 0.0, + }, + PhysicalNodeType::PhysicalFilter => Cost { + // TODO: now this equation is specific to optd, and try to make this equation more general + compute_cost: children_costs[1].compute_cost * children_stats[0].0, + io_cost: 0.0, + }, + PhysicalNodeType::PhysicalNestedLoopJoin(join_typ) => { + let child_compute_cost = children_costs[2].compute_cost; + Cost { + compute_cost: children_stats[0].0 * children_stats[1].0 * child_compute_cost + + children_stats[0].0, + io_cost: 0.0, + } + } + // TODO: we should document that the first child is the left table, which is used to build + // the hash table. + PhysicalNodeType::PhysicalHashJoin(join_typ) => Cost { + compute_cost: children_stats[0].0 * 2.0 + children_stats[1].0, + io_cost: 0.0, + }, + PhysicalNodeType::PhysicalAgg => Cost { + compute_cost: children_stats[0].0 + * (children_costs[1].compute_cost + children_costs[2].compute_cost), + io_cost: 0.0, + }, + PhysicalNodeType::PhysicalProjection => Cost { + compute_cost: children_stats[0].0 * children_costs[1].compute_cost, + io_cost: 0.0, + }, + PhysicalNodeType::PhysicalSort => Cost { + compute_cost: children_stats[0].0 * children_stats[0].0.ln_1p().max(1.0), + io_cost: 0.0, + }, + }; + let res = self + .storage_manager + .store_cost( + context.expr_id, + Some(output_cost.clone()), + output_statistic, + None, + ) + .await; + if res.is_err() { + eprintln!("Failed to store output cost"); + } + Ok(output_cost) } + /// TODO: should we add epoch_id? 
async fn derive_statistics( &self, node: PhysicalNodeType, predicates: &[ArcPredicateNode], children_statistics: &[EstimatedStatistic], context: ComputeCostContext, + store_output_statistic: bool, ) -> CostModelResult { - match node { + let res = self.storage_manager.get_cost(context.expr_id).await; + if let Ok((_, Some(statistic))) = res { + return Ok(statistic); + } + let output_statistic = match node { PhysicalNodeType::PhysicalScan => { let table_id = TableId(predicates[0].data.as_ref().unwrap().as_u64()); let row_cnt = self @@ -114,7 +201,17 @@ impl CostModel for CostModel PhysicalNodeType::PhysicalSort | PhysicalNodeType::PhysicalProjection => { Ok(children_statistics[0].clone()) } - } + }?; + if store_output_statistic { + let res = self + .storage_manager + .store_cost(context.expr_id, None, Some(output_statistic.clone()), None) + .await; + if res.is_err() { + eprintln!("Failed to store output statistic"); + } + }; + Ok(output_statistic) } async fn update_statistics( @@ -167,3 +264,5 @@ impl CostModelImpl { .await } } + +// TODO: Add tests for `derive_statistic`` and `compute_operation_cost`. diff --git a/optd-cost-model/src/lib.rs b/optd-cost-model/src/lib.rs index f4de0d7..abf1555 100644 --- a/optd-cost-model/src/lib.rs +++ b/optd-cost-model/src/lib.rs @@ -118,8 +118,9 @@ pub trait CostModel: 'static + Send + Sync { /// TODO: documentation async fn compute_operation_cost( &self, - node: &PhysicalNodeType, + node: PhysicalNodeType, predicates: &[ArcPredicateNode], + children_costs: &[Cost], children_stats: &[EstimatedStatistic], context: ComputeCostContext, ) -> CostModelResult; @@ -127,14 +128,31 @@ pub trait CostModel: 'static + Send + Sync { /// TODO: documentation /// It is for cardinality estimation. The output should be the estimated /// statistic calculated by the cost model. 
+ /// If this method is called by `compute_operation_cost`, please set + /// `store_output_statistic` to `false`; if it is called by the optimizer, + /// please set `store_output_statistic` to `true`. In the former case the + /// estimated statistic and the cost are stored together by one ORM call. + /// + /// TODO: I am not sure whether to introduce `store_output_statistic`, since + /// it add complexity to the interface, considering currently only Scan needs + /// the output row count to calculate the costs. So updating the database twice + /// seems cheap. But in the future, maybe more cost computations rely on the output + /// row count. + /// + /// TODO: Consider make it a helper function, so we can store Cost in the /// ORM more easily. + /// + /// TODO: I would suggest renaming this method to `derive_row_count`, since + /// "statistic" is easily confused with the real statistics. + /// We would also need to rename "estimated statistic" to "row count" elsewhere, + /// whether in this crate or in optd-persistent. 
async fn derive_statistics( &self, node: PhysicalNodeType, predicates: &[ArcPredicateNode], children_stats: &[EstimatedStatistic], context: ComputeCostContext, + store_output_statistic: bool, ) -> CostModelResult; /// TODO: documentation diff --git a/optd-cost-model/src/storage/mock.rs b/optd-cost-model/src/storage/mock.rs index 1025eba..f20c417 100644 --- a/optd-cost-model/src/storage/mock.rs +++ b/optd-cost-model/src/storage/mock.rs @@ -82,7 +82,7 @@ impl CostModelStorageManager for CostModelStorageMockManagerImpl { expr_id: ExprId, cost: Option, estimated_statistic: Option, - epoch_id: EpochId, + epoch_id: Option, ) -> CostModelResult<()> { todo!() } diff --git a/optd-cost-model/src/storage/mod.rs b/optd-cost-model/src/storage/mod.rs index 33a277b..14cccd6 100644 --- a/optd-cost-model/src/storage/mod.rs +++ b/optd-cost-model/src/storage/mod.rs @@ -27,6 +27,6 @@ pub trait CostModelStorageManager { expr_id: ExprId, cost: Option, estimated_statistic: Option, - epoch_id: EpochId, + epoch_id: Option, ) -> CostModelResult<()>; } diff --git a/optd-cost-model/src/storage/persistent.rs b/optd-cost-model/src/storage/persistent.rs index d4268c2..9d044db 100644 --- a/optd-cost-model/src/storage/persistent.rs +++ b/optd-cost-model/src/storage/persistent.rs @@ -145,14 +145,14 @@ impl CostModelStorageManager expr_id: ExprId, cost: Option, estimated_statistic: Option, - epoch_id: EpochId, + epoch_id: Option, ) -> CostModelResult<()> { self.backend_manager .store_cost( expr_id.into(), cost.map(|c| c.into()), estimated_statistic.map(|x| x.into()), - epoch_id.into(), + epoch_id.map(|id| id.into()), ) .await?; Ok(()) diff --git a/optd-persistent/src/cost_model/interface.rs b/optd-persistent/src/cost_model/interface.rs index 50b90b7..e4e4ceb 100644 --- a/optd-persistent/src/cost_model/interface.rs +++ b/optd-persistent/src/cost_model/interface.rs @@ -121,7 +121,7 @@ pub trait CostModelStorageLayer { expr_id: ExprId, cost: Option, estimated_statistic: Option, - epoch_id: EpochId, + 
epoch_id: Option, ) -> StorageResult<()>; async fn store_expr_stats_mappings( diff --git a/optd-persistent/src/cost_model/orm.rs b/optd-persistent/src/cost_model/orm.rs index 034c7cb..f3e70bb 100644 --- a/optd-persistent/src/cost_model/orm.rs +++ b/optd-persistent/src/cost_model/orm.rs @@ -519,13 +519,14 @@ impl CostModelStorageLayer for BackendManager { /// This method should handle the case when the cost is already stored. /// The name maybe misleading, since it can also store the estimated statistic. + /// If epoch_id is none, we pick the latest epoch_id. /// TODO: documentation async fn store_cost( &self, physical_expression_id: ExprId, cost: Option, estimated_statistic: Option, - epoch_id: EpochId, + epoch_id: Option, ) -> StorageResult<()> { assert!(cost.is_some() || estimated_statistic.is_some()); // TODO: should we do the following checks in the production environment? @@ -542,17 +543,32 @@ impl CostModelStorageLayer for BackendManager { )); } // Check if epoch_id exists in Event table - let epoch_exists = Event::find() - .filter(event::Column::EpochId.eq(epoch_id)) - .one(&self.db) - .await - .unwrap(); - if epoch_exists.is_none() { - return Err(BackendError::CostModel( - format!("epoch id {} not found when storing cost", epoch_id).into(), - )); + if epoch_id.is_some() { + let epoch_exists = Event::find() + .filter(event::Column::EpochId.eq(epoch_id.unwrap())) + .one(&self.db) + .await + .unwrap(); + if epoch_exists.is_none() { + return Err(BackendError::CostModel( + format!("epoch id {} not found when storing cost", epoch_id.unwrap()).into(), + )); + } } + let epoch_id = match epoch_id { + Some(id) => id, + None => { + // When init, please make sure there is at least one epoch in the Event table. + let latest_epoch_id = Event::find() + .order_by_desc(event::Column::EpochId) + .one(&self.db) + .await? 
+ .unwrap(); + latest_epoch_id.epoch_id + } + }; + let transaction = self.db.begin().await?; let valid_cost = PlanCost::find() @@ -805,7 +821,7 @@ mod tests { io_cost: 42.0, }), Some(42.0), - versioned_stat_res[0].epoch_id, + Some(versioned_stat_res[0].epoch_id), ) .await .unwrap(); @@ -871,7 +887,7 @@ mod tests { assert_eq!(cost_res.len(), 1); assert_eq!( cost_res[0].cost, - Some(json!({"compute_cost": 42, "io_cost": 42})) + Some(json!({"compute_cost": 42.0, "io_cost": 42.0})) ); assert_eq!(cost_res[0].epoch_id, epoch_id1); assert!(!cost_res[0].is_valid); @@ -1013,7 +1029,7 @@ mod tests { physical_expression_id, Some(cost.clone()), Some(estimated_statistic), - epoch_id, + Some(epoch_id), ) .await .unwrap(); @@ -1036,7 +1052,7 @@ mod tests { physical_expression_id, None, Some(estimated_statistic), - epoch_id, + None, ) .await .unwrap(); @@ -1075,7 +1091,12 @@ mod tests { io_cost: 42.0, }; let _ = backend_manager - .store_cost(physical_expression_id, Some(cost.clone()), None, epoch_id) + .store_cost( + physical_expression_id, + Some(cost.clone()), + None, + Some(epoch_id), + ) .await; let costs = super::PlanCost::find() .all(&backend_manager.db) @@ -1117,7 +1138,7 @@ mod tests { physical_expression_id, None, Some(estimated_statistic), - epoch_id, + Some(epoch_id), ) .await; let costs = super::PlanCost::find() diff --git a/optd-persistent/src/db/init.db b/optd-persistent/src/db/init.db index e524ff128f0bc5f45fab1a21f1e309102b013cf3..0c3d71cbc9c86b448e4a0a1026ee9e6aec03b782 100644 GIT binary patch delta 569 zcmZo@;B08%++Z!k6ghjct&ApHu|B9+eOaS5Se^XMuw-LD8C@J zsH74kGWzf2jtX%a<**Dl`NAT^ra^&=u3m0Wy;E& z2@=r*dq|N5?4EyeoDj{8@@gQ>E9JF-nl%}j16d~D(^mq^ZBlqA&uwa8WoW5qU}9`+ zviYa|S9u<|fQ5zS=3nt&_~9ZZ#-`?*f919v{VKoCMqb~!eDEnsC8|+yuvzg5p zeVIP<&1V1u_SG9FYBO&4-pClw#yETX>g|jlnHgEO3-U0!Vt3zkp!=?DpTCbWu@L~0 Cl%Saa delta 570 zcmZo@;B08%++Z!ktsCZX=DUhKm9Z_qXNW~m$K#{ z=PAi)gD8JFMWEH%oXnvtlV|IxK$OgvlLyHikW&N7*|ITLvP^c+m!7PkFAa8hUsTOTvN=3;c3ex94r 
zZaY5@qbDP0tT?;4ygXw&=k!Eg#+&S!c_pdosYToE`4}Ap;bvKyn;TE>mS@z1i&&VL z7)(DZ&!`C(F*h+Z-2O|R@lrfo$jrjRaQlsVMlAz{l?>F9Hv1hQ%WHw>+ zX8OoCp8*WmS8klB$++EjBV#-pbBmE*DM*y70>O`TH0X8v(M3 Bm%#u4 From 8fb462c310624b157dc72bc484f1753a1ef420bd Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Wed, 20 Nov 2024 17:58:52 -0500 Subject: [PATCH 4/7] Refine store_cost in the ORM layer --- optd-persistent/src/cost_model/orm.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/optd-persistent/src/cost_model/orm.rs b/optd-persistent/src/cost_model/orm.rs index f3e70bb..40366d0 100644 --- a/optd-persistent/src/cost_model/orm.rs +++ b/optd-persistent/src/cost_model/orm.rs @@ -580,16 +580,27 @@ impl CostModelStorageLayer for BackendManager { if valid_cost.is_some() { let mut new_cost: plan_cost::ActiveModel = valid_cost.unwrap().into(); + let mut update = false; if cost.is_some() { - new_cost.cost = sea_orm::ActiveValue::Set(Some(json!({ + let input_cost = sea_orm::ActiveValue::Set(Some(json!({ "compute_cost": cost.clone().unwrap().compute_cost, "io_cost": cost.clone().unwrap().io_cost }))); + if new_cost.cost != input_cost { + update = true; + new_cost.cost = input_cost; + } } if estimated_statistic.is_some() { - new_cost.estimated_statistic = sea_orm::ActiveValue::Set(estimated_statistic); + let input_estimated_statistic = sea_orm::ActiveValue::Set(estimated_statistic); + if new_cost.estimated_statistic != input_estimated_statistic { + update = true; + new_cost.estimated_statistic = input_estimated_statistic; + } + } + if update { + let _ = PlanCost::update(new_cost).exec(&transaction).await?; } - let _ = PlanCost::update(new_cost).exec(&transaction).await?; } else { let new_cost = plan_cost::ActiveModel { physical_expression_id: sea_orm::ActiveValue::Set(physical_expression_id), From 4a73c3334dc51e6d3d214bf66f64a079a0d6c0a1 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Wed, 20 Nov 2024 18:08:28 -0500 Subject: [PATCH 5/7] Improve comments --- 
optd-cost-model/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/optd-cost-model/src/lib.rs b/optd-cost-model/src/lib.rs index abf1555..4b65038 100644 --- a/optd-cost-model/src/lib.rs +++ b/optd-cost-model/src/lib.rs @@ -137,7 +137,8 @@ pub trait CostModel: 'static + Send + Sync { /// it add complexity to the interface, considering currently only Scan needs /// the output row count to calculate the costs. So updating the database twice /// seems cheap. But in the future, maybe more cost computations rely on the output - /// row count. + /// row count. (Of course, it should be removed if we separate the cost and + /// estimated_statistic into 2 tables.) /// /// TODO: Consider make it a helper function, so we can store Cost in the /// ORM more easily. From 8fec3c20cf291976339ec3cfe5ced8063ca41979 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Wed, 20 Nov 2024 19:46:05 -0500 Subject: [PATCH 6/7] Apply comment suggestions --- optd-cost-model/src/storage/persistent.rs | 4 ++++ optd-persistent/src/cost_model/orm.rs | 9 +++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/optd-cost-model/src/storage/persistent.rs b/optd-cost-model/src/storage/persistent.rs index 9d044db..b3078a6 100644 --- a/optd-cost-model/src/storage/persistent.rs +++ b/optd-cost-model/src/storage/persistent.rs @@ -127,6 +127,8 @@ impl CostModelStorageManager /// TODO: The name is misleading, since we can also get the estimated statistic. We should /// rename it. + /// + /// TODO: Add retry logic here. async fn get_cost( &self, expr_id: ExprId, @@ -140,6 +142,8 @@ impl CostModelStorageManager /// TODO: The name is misleading, since we can also get the estimated statistic. We should /// rename it. + /// + /// TODO: Add retry logic here. 
async fn store_cost( &self, expr_id: ExprId, diff --git a/optd-persistent/src/cost_model/orm.rs b/optd-persistent/src/cost_model/orm.rs index 40366d0..748012a 100644 --- a/optd-persistent/src/cost_model/orm.rs +++ b/optd-persistent/src/cost_model/orm.rs @@ -480,8 +480,8 @@ impl CostModelStorageLayer for BackendManager { .filter(plan_cost::Column::EpochId.eq(epoch_id)) .one(&self.db) .await?; - // When this cost is invalid or not found, we should return None - if cost.is_none() || !cost.clone().unwrap().is_valid { + // When this cost is not found, we should return None + if cost.is_none() { return Ok((None, None)); } @@ -520,6 +520,11 @@ impl CostModelStorageLayer for BackendManager { /// This method should handle the case when the cost is already stored. /// The name maybe misleading, since it can also store the estimated statistic. /// If epoch_id is none, we pick the latest epoch_id. + /// + /// TODO: consider whether we need to pass the epoch_id here. When the epoch is + /// stale because someone else updates the stats while we're still computing cost, + /// what is the expected behavior? + /// /// TODO: documentation async fn store_cost( &self, From 4a6bd0128ec7cfb7fe0f8a424472831515eb355b Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Wed, 20 Nov 2024 19:56:48 -0500 Subject: [PATCH 7/7] Refine comment --- optd-persistent/src/cost_model/orm.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/optd-persistent/src/cost_model/orm.rs b/optd-persistent/src/cost_model/orm.rs index 748012a..9c068ae 100644 --- a/optd-persistent/src/cost_model/orm.rs +++ b/optd-persistent/src/cost_model/orm.rs @@ -654,6 +654,7 @@ impl CostModelStorageLayer for BackendManager { } } +// TODO: add integration tests #[cfg(test)] mod tests { use crate::cost_model::interface::{Cost, EpochOption, StatType};