Skip to content

Commit 986cf00

Browse files
authored
feat(cost-model): introduce attributes & stats methods in ORM (#34)
1 parent fe0041a commit 986cf00

File tree

11 files changed

+445
-128
lines changed

11 files changed

+445
-128
lines changed

optd-cost-model/src/common/types.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,33 @@ impl Display for EpochId {
4949
write!(f, "Epoch#{}", self.0)
5050
}
5151
}
52+
53+
impl From<GroupId> for i32 {
54+
fn from(id: GroupId) -> i32 {
55+
id.0 as i32
56+
}
57+
}
58+
59+
impl From<ExprId> for i32 {
60+
fn from(id: ExprId) -> i32 {
61+
id.0 as i32
62+
}
63+
}
64+
65+
impl From<TableId> for i32 {
66+
fn from(id: TableId) -> i32 {
67+
id.0 as i32
68+
}
69+
}
70+
71+
impl From<AttrId> for i32 {
72+
fn from(id: AttrId) -> i32 {
73+
id.0 as i32
74+
}
75+
}
76+
77+
impl From<EpochId> for i32 {
78+
fn from(id: EpochId) -> i32 {
79+
id.0 as i32
80+
}
81+
}

optd-cost-model/src/lib.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use common::{
22
nodes::{ArcPredicateNode, PhysicalNodeType},
33
types::{AttrId, EpochId, ExprId, GroupId, TableId},
44
};
5-
use optd_persistent::cost_model::interface::{Stat, StatType};
5+
use optd_persistent::{
6+
cost_model::interface::{Stat, StatType},
7+
BackendError,
8+
};
69

710
pub mod common;
811
pub mod cost;
@@ -32,10 +35,25 @@ pub struct EstimatedStatistic(pub u64);
3235

3336
pub type CostModelResult<T> = Result<T, CostModelError>;
3437

38+
#[derive(Debug)]
39+
pub enum SemanticError {
40+
// TODO: Add more error types
41+
UnknownStatisticType,
42+
VersionedStatisticNotFound,
43+
AttributeNotFound(TableId, i32), // (table_id, attribute_base_index)
44+
}
45+
3546
#[derive(Debug)]
3647
pub enum CostModelError {
3748
// TODO: Add more error types
38-
ORMError,
49+
ORMError(BackendError),
50+
SemanticError(SemanticError),
51+
}
52+
53+
impl From<BackendError> for CostModelError {
54+
fn from(err: BackendError) -> Self {
55+
CostModelError::ORMError(err)
56+
}
3957
}
4058

4159
pub trait CostModel: 'static + Send + Sync {

optd-cost-model/src/stats/counter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
66

77
/// The Counter structure to track exact frequencies of fixed elements.
88
#[serde_with::serde_as]
9-
#[derive(Serialize, Deserialize, Debug)]
9+
#[derive(Default, Serialize, Deserialize, Debug)]
1010
pub struct Counter<T: PartialEq + Eq + Hash + Clone + Serialize + DeserializeOwned> {
1111
#[serde_as(as = "HashMap<serde_with::json::JsonString, _>")]
1212
counts: HashMap<T, i32>, // The exact counts of an element T.
@@ -33,7 +33,7 @@ where
3333
}
3434

3535
// Inserts an element in the Counter if it is being tracked.
36-
pub fn insert_element(&mut self, elem: T, occ: i32) {
36+
fn insert_element(&mut self, elem: T, occ: i32) {
3737
if let Some(frequency) = self.counts.get_mut(&elem) {
3838
*frequency += occ;
3939
}

optd-cost-model/src/stats/mod.rs

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ impl MostCommonValues {
7272
}
7373

7474
#[derive(Serialize, Deserialize, Debug)]
75+
#[serde(tag = "type")]
7576
pub enum Distribution {
7677
TDigest(tdigest::TDigest<Value>),
7778
// Add more types here...
@@ -116,8 +117,61 @@ impl AttributeCombValueStats {
116117
}
117118
}
118119

119-
impl From<serde_json::Value> for AttributeCombValueStats {
120-
fn from(value: serde_json::Value) -> Self {
121-
serde_json::from_value(value).unwrap()
120+
#[cfg(test)]
121+
mod tests {
122+
use super::{Counter, MostCommonValues};
123+
use crate::{common::values::Value, stats::AttributeCombValue};
124+
use serde_json::json;
125+
126+
#[test]
127+
fn test_most_common_values() {
128+
let elem1 = vec![Some(Value::Int32(1))];
129+
let elem2 = vec![Some(Value::Int32(2))];
130+
let mut counter = Counter::new(&[elem1.clone(), elem2.clone()]);
131+
132+
let elems = vec![elem2.clone(), elem1.clone(), elem2.clone(), elem2.clone()];
133+
counter.aggregate(&elems);
134+
135+
let mcvs = MostCommonValues::Counter(counter);
136+
assert_eq!(mcvs.freq(&elem1), Some(0.25));
137+
assert_eq!(mcvs.freq(&elem2), Some(0.75));
138+
assert_eq!(mcvs.total_freq(), 1.0);
139+
140+
let elem1_cloned = elem1.clone();
141+
let pred1 = Box::new(move |x: &AttributeCombValue| x == &elem1_cloned);
142+
let pred2 = Box::new(move |x: &AttributeCombValue| x != &elem1);
143+
assert_eq!(mcvs.freq_over_pred(pred1), 0.25);
144+
assert_eq!(mcvs.freq_over_pred(pred2), 0.75);
145+
146+
assert_eq!(mcvs.cnt(), 2);
147+
}
148+
149+
#[test]
150+
fn test_most_common_values_serde() {
151+
let elem1 = vec![Some(Value::Int32(1))];
152+
let elem2 = vec![Some(Value::Int32(2))];
153+
let mut counter = Counter::new(&[elem1.clone(), elem2.clone()]);
154+
155+
let elems = vec![elem2.clone(), elem1.clone(), elem2.clone(), elem2.clone()];
156+
counter.aggregate(&elems);
157+
158+
let mcvs = MostCommonValues::Counter(counter);
159+
let serialized = serde_json::to_value(&mcvs).unwrap();
160+
println!("serialized: {:?}", serialized);
161+
162+
let deserialized: MostCommonValues = serde_json::from_value(serialized).unwrap();
163+
assert_eq!(mcvs.freq(&elem1), Some(0.25));
164+
assert_eq!(mcvs.freq(&elem2), Some(0.75));
165+
assert_eq!(mcvs.total_freq(), 1.0);
166+
167+
let elem1_cloned = elem1.clone();
168+
let pred1 = Box::new(move |x: &AttributeCombValue| x == &elem1_cloned);
169+
let pred2 = Box::new(move |x: &AttributeCombValue| x != &elem1);
170+
assert_eq!(mcvs.freq_over_pred(pred1), 0.25);
171+
assert_eq!(mcvs.freq_over_pred(pred2), 0.75);
172+
173+
assert_eq!(mcvs.cnt(), 2);
122174
}
175+
176+
// TODO: Add tests for Distribution
123177
}

optd-cost-model/src/storage.rs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1+
#![allow(unused_variables)]
12
use std::sync::Arc;
23

3-
use optd_persistent::CostModelStorageLayer;
4+
use optd_persistent::{
5+
cost_model::interface::{Attr, StatType},
6+
CostModelStorageLayer,
7+
};
8+
9+
use crate::{
10+
common::types::TableId,
11+
stats::{counter::Counter, AttributeCombValueStats, Distribution, MostCommonValues},
12+
CostModelResult,
13+
};
414

515
/// TODO: documentation
616
pub struct CostModelStorageManager<S: CostModelStorageLayer> {
@@ -9,8 +19,113 @@ pub struct CostModelStorageManager<S: CostModelStorageLayer> {
919
}
1020

1121
impl<S: CostModelStorageLayer> CostModelStorageManager<S> {
12-
/// TODO: documentation
1322
pub fn new(backend_manager: Arc<S>) -> Self {
1423
Self { backend_manager }
1524
}
25+
26+
/// Gets the attribute information for a given table and attribute base index.
27+
///
28+
/// TODO: if we have a memory cache,
29+
/// we should return a reference instead. (&Attr)
30+
pub async fn get_attribute_info(
31+
&self,
32+
table_id: TableId,
33+
attr_base_index: i32,
34+
) -> CostModelResult<Option<Attr>> {
35+
Ok(self
36+
.backend_manager
37+
.get_attribute(table_id.into(), attr_base_index)
38+
.await?)
39+
}
40+
41+
/// Gets the latest statistics for a given combination of attributes of a table.
42+
///
43+
/// TODO: Currently, in `AttributeCombValueStats`, only `Distribution` is optional.
44+
/// This poses a question about the behavior of the system if there is no corresponding
45+
/// `MostCommonValues`, `ndistinct`, or other statistics. We should have a clear
46+
/// specification about the behavior of the system in the presence of missing statistics.
47+
///
48+
/// TODO: if we have a memory cache,
49+
/// we should return a reference instead. (&AttributeCombValueStats)
50+
///
51+
/// TODO: Shall we pass in an epoch here to make sure that the statistics are from the same
52+
/// epoch?
53+
pub async fn get_attributes_comb_statistics(
54+
&self,
55+
table_id: TableId,
56+
attr_base_indices: &[i32],
57+
) -> CostModelResult<Option<AttributeCombValueStats>> {
58+
let dist: Option<Distribution> = self
59+
.backend_manager
60+
.get_stats_for_attr_indices_based(
61+
table_id.into(),
62+
attr_base_indices.to_vec(),
63+
StatType::Distribution,
64+
None,
65+
)
66+
.await?
67+
.map(|json| serde_json::from_value(json).unwrap());
68+
69+
let mcvs = self
70+
.backend_manager
71+
.get_stats_for_attr_indices_based(
72+
table_id.into(),
73+
attr_base_indices.to_vec(),
74+
StatType::MostCommonValues,
75+
None,
76+
)
77+
.await?
78+
.map(|json| serde_json::from_value(json).unwrap())
79+
.unwrap_or_else(|| MostCommonValues::Counter(Counter::default()));
80+
81+
let ndistinct = self
82+
.backend_manager
83+
.get_stats_for_attr_indices_based(
84+
table_id.into(),
85+
attr_base_indices.to_vec(),
86+
StatType::Cardinality,
87+
None,
88+
)
89+
.await?
90+
.map(|json| serde_json::from_value(json).unwrap())
91+
.unwrap_or(0);
92+
93+
let table_row_count = self
94+
.backend_manager
95+
.get_stats_for_attr_indices_based(
96+
table_id.into(),
97+
attr_base_indices.to_vec(),
98+
StatType::TableRowCount,
99+
None,
100+
)
101+
.await?
102+
.map(|json| serde_json::from_value(json).unwrap())
103+
.unwrap_or(0);
104+
let non_null_count = self
105+
.backend_manager
106+
.get_stats_for_attr_indices_based(
107+
table_id.into(),
108+
attr_base_indices.to_vec(),
109+
StatType::NonNullCount,
110+
None,
111+
)
112+
.await?
113+
.map(|json| serde_json::from_value(json).unwrap())
114+
.unwrap_or(0);
115+
116+
// FIXME: Only minimal checks for invalid values are conducted here. We should have
117+
// a much clearer specification about the behavior of the system in the presence of
118+
// invalid statistics.
119+
let null_frac = if table_row_count == 0 {
120+
0.0
121+
} else {
122+
1.0 - (non_null_count as f64 / table_row_count as f64)
123+
};
124+
125+
Ok(Some(AttributeCombValueStats::new(
126+
mcvs, ndistinct, null_frac, dist,
127+
)))
128+
}
16129
}
130+
131+
// TODO: add some tests, especially ones covering the error cases.

optd-persistent/src/bin/init.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ async fn init_all_tables() -> Result<(), sea_orm::error::DbErr> {
6363
name: Set("user_id".to_owned()),
6464
compression_method: Set("N".to_owned()),
6565
variant_tag: Set(AttrType::Integer as i32),
66-
base_attribute_number: Set(1),
66+
base_attribute_number: Set(0),
6767
is_not_null: Set(true),
6868
};
6969
let attribute2 = attribute::ActiveModel {
@@ -72,7 +72,7 @@ async fn init_all_tables() -> Result<(), sea_orm::error::DbErr> {
7272
name: Set("username".to_owned()),
7373
compression_method: Set("N".to_owned()),
7474
variant_tag: Set(AttrType::Varchar as i32),
75-
base_attribute_number: Set(2),
75+
base_attribute_number: Set(1),
7676
is_not_null: Set(true),
7777
};
7878
attribute::Entity::insert(attribute1)

optd-persistent/src/cost_model/catalog/mock_catalog.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,15 @@ impl MockCatalog {
115115
let statistics: Vec<MockStatistic> = vec![
116116
MockStatistic {
117117
id: 1,
118-
stat_type: StatType::NotNullCount as i32,
118+
stat_type: StatType::NonNullCount as i32,
119119
stat_value: json!(100),
120120
attr_ids: vec![1],
121121
table_id: None,
122122
name: "CountAttr1".to_string(),
123123
},
124124
MockStatistic {
125125
id: 2,
126-
stat_type: StatType::NotNullCount as i32,
126+
stat_type: StatType::NonNullCount as i32,
127127
stat_value: json!(200),
128128
attr_ids: vec![2],
129129
table_id: None,

0 commit comments

Comments
 (0)