Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion optd-persistent/src/bin/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async fn init_all_tables() -> Result<(), sea_orm::error::DbErr> {
table_id: Set(Some(1)),
creation_time: Set(Utc::now()),
number_of_attributes: Set(0),
variant_tag: Set(StatType::Count as i32),
variant_tag: Set(StatType::TableRowCount as i32),
description: Set("".to_owned()),
};
let table_versioned_statistic = versioned_statistic::ActiveModel {
Expand Down
21 changes: 13 additions & 8 deletions optd-persistent/src/cost_model/catalog/mock_catalog.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use sea_orm::prelude::Json;
use serde_json::json;

use crate::cost_model::interface::{AttrType, IndexType, StatType};

/// TODO: documentation
pub struct MockDatabaseMetadata {
pub id: i32,
pub name: String,
Expand Down Expand Up @@ -30,8 +34,7 @@ pub struct MockAttribute {
pub struct MockStatistic {
pub id: i32,
pub stat_type: i32,
// TODO(lanlou): what should I use for the value type?
pub stat_value: String,
pub stat_value: Json,
pub attr_ids: Vec<i32>,
pub table_id: Option<i32>,
pub name: String,
Expand Down Expand Up @@ -71,7 +74,9 @@ pub struct MockCatalog {
pub triggers: Vec<MockTrigger>,
// TODO: constraints
}

impl MockCatalog {
/// TODO: documentation
pub fn new() -> Self {
let databases: Vec<MockDatabaseMetadata> = vec![MockDatabaseMetadata {
id: 1,
Expand Down Expand Up @@ -110,24 +115,24 @@ impl MockCatalog {
let statistics: Vec<MockStatistic> = vec![
MockStatistic {
id: 1,
stat_type: StatType::Count as i32,
stat_value: "100".to_string(),
stat_type: StatType::NotNullCount as i32,
stat_value: json!(100),
attr_ids: vec![1],
table_id: None,
name: "CountAttr1".to_string(),
},
MockStatistic {
id: 2,
stat_type: StatType::Count as i32,
stat_value: "200".to_string(),
stat_type: StatType::NotNullCount as i32,
stat_value: json!(200),
attr_ids: vec![2],
table_id: None,
name: "CountAttr2".to_string(),
},
MockStatistic {
id: 3,
stat_type: StatType::Count as i32,
stat_value: "300".to_string(),
stat_type: StatType::TableRowCount as i32,
stat_value: json!(300),
attr_ids: vec![],
table_id: Some(1),
name: "Table1Count".to_string(),
Expand Down
10 changes: 4 additions & 6 deletions optd-persistent/src/cost_model/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ pub enum ConstraintType {

/// TODO: documentation
pub enum StatType {
Count,
/// `TableRowCount` only applies to table statistics.
TableRowCount,
NotNullCount,
Cardinality,
Min,
Max,
Expand Down Expand Up @@ -78,11 +80,7 @@ pub trait CostModelStorageLayer {
// TODO: Change EpochId to event::Model::epoch_id
async fn create_new_epoch(&self, source: String, data: String) -> StorageResult<Self::EpochId>;

async fn update_stats_from_catalog(
&self,
c: CatalogSource,
epoch_id: Self::EpochId,
) -> StorageResult<()>;
async fn update_stats_from_catalog(&self, c: CatalogSource) -> StorageResult<Self::EpochId>;

async fn update_stats(
&self,
Expand Down
94 changes: 72 additions & 22 deletions optd-persistent/src/cost_model/orm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,24 @@ impl CostModelStorageLayer for BackendManager {
}

/// TODO: documentation
async fn update_stats_from_catalog(
&self,
c: CatalogSource,
epoch_id: Self::EpochId,
) -> StorageResult<()> {
async fn update_stats_from_catalog(&self, c: CatalogSource) -> StorageResult<Self::EpochId> {
let transaction = self.db.begin().await?;
let source = match c {
CatalogSource::Mock => "Mock",
CatalogSource::Iceberg() => "Iceberg",
};
let new_event = event::ActiveModel {
source_variant: sea_orm::ActiveValue::Set(source.to_string()),
timestamp: sea_orm::ActiveValue::Set(Utc::now()),
data: sea_orm::ActiveValue::Set(sea_orm::JsonValue::String(
"Update stats from catalog".to_string(),
)),
..Default::default()
};
let epoch_id = Event::insert(new_event)
.exec(&transaction)
.await?
.last_insert_id;
match c {
CatalogSource::Mock => {
let mock_catalog = MockCatalog::new();
Expand All @@ -66,7 +79,7 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
NamespaceMetadata::insert_many(mock_catalog.namespaces.iter().map(|namespace| {
namespace_metadata::ActiveModel {
Expand All @@ -76,7 +89,7 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
TableMetadata::insert_many(mock_catalog.tables.iter().map(|table| {
table_metadata::ActiveModel {
Expand All @@ -86,7 +99,7 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
Attribute::insert_many(mock_catalog.attributes.iter().map(|attr| {
attribute::ActiveModel {
Expand All @@ -101,7 +114,7 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
Statistic::insert_many(mock_catalog.statistics.iter().map(|stat| {
statistic::ActiveModel {
Expand All @@ -116,7 +129,29 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
VersionedStatistic::insert_many(mock_catalog.statistics.iter().map(|stat| {
versioned_statistic::ActiveModel {
epoch_id: sea_orm::ActiveValue::Set(epoch_id),
statistic_id: sea_orm::ActiveValue::Set(stat.id),
statistic_value: sea_orm::ActiveValue::Set(stat.stat_value.clone()),
..Default::default()
}
}))
.exec(&transaction)
.await?;
StatisticToAttributeJunction::insert_many(mock_catalog.statistics.iter().flat_map(
|stat| {
stat.attr_ids.iter().map(move |attr_id| {
statistic_to_attribute_junction::ActiveModel {
statistic_id: sea_orm::ActiveValue::Set(stat.id),
attribute_id: sea_orm::ActiveValue::Set(*attr_id),
}
})
},
))
.exec(&transaction)
.await?;
IndexMetadata::insert_many(
mock_catalog
Expand All @@ -140,12 +175,14 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}),
)
.exec(&self.db)
.exec(&transaction)
.await?;
Ok(())
// TODO: initialize constraints
}
CatalogSource::Iceberg() => todo!(),
}
transaction.commit().await?;
Ok(epoch_id)
}

/// TODO: improve the documentation
Expand Down Expand Up @@ -551,15 +588,28 @@ mod tests {
let mut binding = super::BackendManager::new(Some(&database_url)).await;
let backend_manager = binding.as_mut().unwrap();
let res = backend_manager
.update_stats_from_catalog(super::CatalogSource::Mock, 1)
.update_stats_from_catalog(super::CatalogSource::Mock)
.await;
println!("{:?}", res);
assert!(res.is_ok());
let epoch_id = res.unwrap();
assert_eq!(epoch_id, 1);

let lookup_res = Statistic::find().all(&backend_manager.db).await.unwrap();
println!("{:?}", lookup_res);
assert_eq!(lookup_res.len(), 3);

let stat_res = backend_manager
.get_stats_for_table(1, StatType::TableRowCount as i32, Some(epoch_id))
.await;
assert!(stat_res.is_ok());
assert_eq!(stat_res.unwrap().unwrap(), json!(300));
let stat_res = backend_manager
.get_stats_for_attr([2].to_vec(), StatType::NotNullCount as i32, None)
.await;
assert!(stat_res.is_ok());
assert_eq!(stat_res.unwrap().unwrap(), json!(200));

remove_db_file(DATABASE_FILE);
}

Expand All @@ -575,7 +625,7 @@ mod tests {
.await
.unwrap();
let stat = Stat {
stat_type: StatType::Count as i32,
stat_type: StatType::NotNullCount as i32,
stat_value: json!(100),
attr_ids: vec![1],
table_id: None,
Expand All @@ -594,7 +644,7 @@ mod tests {
println!("{:?}", stat_res);
assert_eq!(stat_res[0].number_of_attributes, 1);
assert_eq!(stat_res[0].description, "1".to_string());
assert_eq!(stat_res[0].variant_tag, StatType::Count as i32);
assert_eq!(stat_res[0].variant_tag, StatType::NotNullCount as i32);
let stat_attr_res = StatisticToAttributeJunction::find()
.filter(statistic_to_attribute_junction::Column::StatisticId.eq(stat_res[0].id))
.all(&backend_manager.db)
Expand Down Expand Up @@ -647,7 +697,7 @@ mod tests {
.await
.unwrap();
let stat2 = Stat {
stat_type: StatType::Count as i32,
stat_type: StatType::NotNullCount as i32,
stat_value: json!(200),
attr_ids: vec![1],
table_id: None,
Expand Down Expand Up @@ -701,7 +751,7 @@ mod tests {
// 3. Update existed stat with the same value
let epoch_num = Event::find().all(&backend_manager.db).await.unwrap().len();
let stat3 = Stat {
stat_type: StatType::Count as i32,
stat_type: StatType::NotNullCount as i32,
stat_value: json!(200),
attr_ids: vec![1],
table_id: None,
Expand Down Expand Up @@ -742,21 +792,21 @@ mod tests {

let statistics: Vec<Stat> = vec![
Stat {
stat_type: StatType::Count as i32,
stat_type: StatType::TableRowCount as i32,
stat_value: json!(0),
attr_ids: vec![],
table_id: Some(1),
name: "row_count".to_string(),
},
Stat {
stat_type: StatType::Count as i32,
stat_type: StatType::TableRowCount as i32,
stat_value: json!(20),
attr_ids: vec![],
table_id: Some(1),
name: "row_count".to_string(),
},
Stat {
stat_type: StatType::Count as i32,
stat_type: StatType::TableRowCount as i32,
stat_value: json!(100),
attr_ids: vec![],
table_id: Some(table_inserted_res.last_insert_id),
Expand Down Expand Up @@ -920,7 +970,7 @@ mod tests {
let backend_manager = binding.as_mut().unwrap();
let epoch_id = 1;
let table_id = 1;
let stat_type = StatType::Count as i32;
let stat_type = StatType::TableRowCount as i32;

// Get initial stats
let res = backend_manager
Expand All @@ -937,7 +987,7 @@ mod tests {
.await
.unwrap();
let stat = Stat {
stat_type: StatType::Count as i32,
stat_type: StatType::TableRowCount as i32,
stat_value: json!(100),
attr_ids: vec![],
table_id: Some(table_id),
Expand Down
Binary file modified optd-persistent/src/db/init.db
Binary file not shown.
13 changes: 9 additions & 4 deletions optd-persistent/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]

use std::cell::LazyCell;
use std::sync::LazyLock;

use sea_orm::*;
use sea_orm_migration::prelude::*;
Expand All @@ -13,11 +13,15 @@ mod migrator;
pub mod cost_model;
pub use cost_model::interface::CostModelStorageLayer;

/// The filename of the SQLite database for migration.
pub const DATABASE_FILENAME: &str = "sqlite.db";
/// The URL of the SQLite database for migration.
pub const DATABASE_URL: &str = "sqlite:./sqlite.db?mode=rwc";

/// The filename of the SQLite database for testing.
pub const TEST_DATABASE_FILENAME: &str = "init.db";
pub const TEST_DATABASE_FILE: LazyCell<String> = LazyCell::new(|| {
/// The URL of the SQLite database for testing.
pub static TEST_DATABASE_FILE: LazyLock<String> = LazyLock::new(|| {
std::env::current_dir()
.unwrap()
.join("src")
Expand All @@ -27,8 +31,9 @@ pub const TEST_DATABASE_FILE: LazyCell<String> = LazyCell::new(|| {
.unwrap()
.to_owned()
});
pub const TEST_DATABASE_URL: LazyCell<String> =
LazyCell::new(|| get_sqlite_url(TEST_DATABASE_FILE.as_str()));
/// The URL of the SQLite database for testing.
pub static TEST_DATABASE_URL: LazyLock<String> =
LazyLock::new(|| get_sqlite_url(TEST_DATABASE_FILE.as_str()));

fn get_sqlite_url(file: &str) -> String {
format!("sqlite:{}?mode=rwc", file)
Expand Down