Skip to content

Commit 9e10d55

Browse files
committed
fix cubesql after introducing cacheMode
1 parent 402ff8a commit 9e10d55

File tree

8 files changed

+78
-70
lines changed

8 files changed

+78
-70
lines changed

rust/cubesql/cubesql/src/compile/builder.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ impl QueryBuilder {
150150
} else {
151151
None
152152
},
153-
cache: None,
154153
ungrouped: None,
155154
subquery_joins: None,
156155
join_hints: None,

rust/cubesql/cubesql/src/compile/engine/df/scan.rs

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,16 @@
1+
use crate::compile::date_parser::parse_date_str;
2+
use crate::{
3+
compile::{
4+
engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode, SqlQuery},
5+
test::find_cube_scans_deep_search,
6+
},
7+
config::ConfigObj,
8+
sql::AuthContextRef,
9+
transport::{CubeStreamReceiver, LoadRequestMeta, SpanId, TransportService},
10+
CubeError,
11+
};
112
use async_trait::async_trait;
13+
use chrono::{Datelike, NaiveDate};
214
use cubeclient::models::{V1LoadRequestQuery, V1LoadResponse};
315
pub use datafusion::{
416
arrow::{
@@ -18,28 +30,6 @@ pub use datafusion::{
1830
Partitioning, PhysicalPlanner, RecordBatchStream, SendableRecordBatchStream, Statistics,
1931
},
2032
};
21-
use futures::Stream;
22-
use log::warn;
23-
use std::{
24-
any::Any,
25-
borrow::Cow,
26-
fmt,
27-
sync::Arc,
28-
task::{Context, Poll},
29-
};
30-
31-
use crate::compile::date_parser::parse_date_str;
32-
use crate::{
33-
compile::{
34-
engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode, SqlQuery},
35-
test::find_cube_scans_deep_search,
36-
},
37-
config::ConfigObj,
38-
sql::AuthContextRef,
39-
transport::{CubeStreamReceiver, LoadRequestMeta, SpanId, TransportService},
40-
CubeError,
41-
};
42-
use chrono::{Datelike, NaiveDate};
4333
use datafusion::{
4434
arrow::{
4535
array::{
@@ -51,7 +41,17 @@ use datafusion::{
5141
execution::context::TaskContext,
5242
scalar::ScalarValue,
5343
};
44+
use futures::Stream;
45+
use log::warn;
5446
use serde_json::Value;
47+
use std::str::FromStr;
48+
use std::{
49+
any::Any,
50+
borrow::Cow,
51+
fmt,
52+
sync::Arc,
53+
task::{Context, Poll},
54+
};
5555

5656
#[derive(Debug, Clone, Eq, PartialEq)]
5757
pub struct RegularMember {
@@ -79,10 +79,33 @@ impl MemberField {
7979
}
8080
}
8181

82+
#[derive(Debug, Clone)]
83+
pub enum CacheMode {
84+
StaleIfSlow,
85+
StaleWhileRevalidate,
86+
MustRevalidate,
87+
NoCache,
88+
}
89+
90+
impl FromStr for CacheMode {
91+
type Err = String;
92+
93+
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
94+
match s {
95+
"stale-if-slow" => Ok(Self::StaleIfSlow),
96+
"stale-while-revalidate" => Ok(Self::StaleWhileRevalidate),
97+
"must-revalidate" => Ok(Self::MustRevalidate),
98+
"no-cache" => Ok(Self::NoCache),
99+
other => Err(format!("Unknown cache mode: {}", other)),
100+
}
101+
}
102+
}
103+
82104
#[derive(Debug, Clone)]
83105
pub struct CubeScanOptions {
84106
pub change_user: Option<String>,
85107
pub max_records: Option<usize>,
108+
pub cache_mode: Option<CacheMode>,
86109
}
87110

88111
#[derive(Debug, Clone)]
@@ -679,6 +702,7 @@ async fn load_data(
679702
meta,
680703
schema,
681704
member_fields,
705+
options.cache_mode,
682706
)
683707
.await
684708
.map_err(|err| ArrowError::ComputeError(err.to_string()))?;
@@ -1189,6 +1213,7 @@ mod tests {
11891213
_meta_fields: LoadRequestMeta,
11901214
schema: SchemaRef,
11911215
member_fields: Vec<MemberField>,
1216+
_cache_mode: Option<CacheMode>,
11921217
) -> Result<Vec<RecordBatch>, CubeError> {
11931218
let response = r#"
11941219
{
@@ -1314,6 +1339,7 @@ mod tests {
13141339
options: CubeScanOptions {
13151340
change_user: None,
13161341
max_records: None,
1342+
cache_mode: None,
13171343
},
13181344
transport: get_test_transport(),
13191345
meta: get_test_load_meta(DatabaseProtocol::PostgreSQL),

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3453,7 +3453,6 @@ impl WrappedSelectNode {
34533453
} else {
34543454
None
34553455
},
3456-
cache: None,
34573456
// TODO is it okay to just override limit?
34583457
limit: if let Some(limit) = self.limit {
34593458
Some(limit as i32)

rust/cubesql/cubesql/src/compile/rewrite/converter.rs

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
pub use super::rewriter::CubeRunner;
2-
use crate::sql::session::CacheMode;
32
use crate::{
43
compile::{
54
engine::df::{
@@ -39,8 +38,7 @@ use crate::{
3938
CubeError,
4039
};
4140
use cubeclient::models::{
42-
Cache as V1LoadRequestCache, V1LoadRequestQuery, V1LoadRequestQueryFilterItem,
43-
V1LoadRequestQueryTimeDimension,
41+
V1LoadRequestQuery, V1LoadRequestQueryFilterItem, V1LoadRequestQueryTimeDimension,
4442
};
4543
use datafusion::{
4644
arrow::datatypes::{DataType, TimeUnit},
@@ -1584,26 +1582,6 @@ impl LanguageToLogicalPlanConverter {
15841582
let mut query_time_dimensions = Vec::new();
15851583
let mut query_order = Vec::new();
15861584
let mut query_dimensions = Vec::new();
1587-
let cache_mode = &*self
1588-
.cube_context
1589-
.session_state
1590-
.cache_mode
1591-
.read()
1592-
.expect("failed to read lock for session cache_mode");
1593-
1594-
let v1_cache_mode = match cache_mode {
1595-
None => None,
1596-
Some(m) => match m {
1597-
CacheMode::StaleIfSlow => Some(V1LoadRequestCache::StaleIfSlow),
1598-
CacheMode::StaleWhileRevalidate => {
1599-
Some(V1LoadRequestCache::StaleWhileRevalidate)
1600-
}
1601-
CacheMode::MustRevalidate => Some(V1LoadRequestCache::MustRevalidate),
1602-
CacheMode::NoCache => Some(V1LoadRequestCache::NoCache),
1603-
},
1604-
};
1605-
1606-
query.cache = v1_cache_mode;
16071585

16081586
for m in members {
16091587
match m {
@@ -2077,6 +2055,13 @@ impl LanguageToLogicalPlanConverter {
20772055

20782056
let member_fields = fields.iter().map(|(_, m)| m.clone()).collect();
20792057

2058+
let cache_mode = &*self
2059+
.cube_context
2060+
.session_state
2061+
.cache_mode
2062+
.read()
2063+
.expect("failed to read lock for session cache_mode");
2064+
20802065
let node = Arc::new(CubeScanNode::new(
20812066
Arc::new(DFSchema::new_with_metadata(
20822067
fields.into_iter().map(|(f, _)| f).collect(),
@@ -2088,6 +2073,7 @@ impl LanguageToLogicalPlanConverter {
20882073
CubeScanOptions {
20892074
change_user,
20902075
max_records,
2076+
cache_mode: cache_mode.clone(),
20912077
},
20922078
alias_to_cube.into_iter().map(|(_, c)| c).unique().collect(),
20932079
self.span_id.clone(),

rust/cubesql/cubesql/src/compile/test/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub mod test_user_change;
4747
#[cfg(test)]
4848
pub mod test_wrapper;
4949
pub mod utils;
50+
use crate::compile::engine::df::scan::CacheMode;
5051
use crate::compile::{
5152
arrow::record_batch::RecordBatch, engine::df::scan::convert_transport_response,
5253
};
@@ -887,6 +888,7 @@ impl TransportService for TestConnectionTransport {
887888
meta: LoadRequestMeta,
888889
schema: SchemaRef,
889890
member_fields: Vec<MemberField>,
891+
_cache_mode: Option<CacheMode>,
890892
) -> Result<Vec<RecordBatch>, CubeError> {
891893
{
892894
let mut calls = self.load_calls.lock().await;

rust/cubesql/cubesql/src/sql/session.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use datafusion::scalar::ScalarValue;
22
use log::trace;
33
use rand::Rng;
4-
use std::str::FromStr;
54
use std::{
65
collections::HashMap,
76
sync::{Arc, LazyLock, RwLock as RwLockSync, Weak},
@@ -10,6 +9,7 @@ use std::{
109
use tokio_util::sync::CancellationToken;
1110

1211
use super::{server_manager::ServerManager, session_manager::SessionManager, AuthContextRef};
12+
use crate::compile::engine::df::scan::CacheMode;
1313
use crate::{
1414
compile::{
1515
DatabaseProtocol, DatabaseProtocolDetails, DatabaseVariable, DatabaseVariables,
@@ -57,28 +57,6 @@ pub enum QueryState {
5757
},
5858
}
5959

60-
#[derive(Debug)]
61-
pub enum CacheMode {
62-
StaleIfSlow,
63-
StaleWhileRevalidate,
64-
MustRevalidate,
65-
NoCache,
66-
}
67-
68-
impl FromStr for CacheMode {
69-
type Err = String;
70-
71-
fn from_str(s: &str) -> Result<Self, Self::Err> {
72-
match s {
73-
"stale-if-slow" => Ok(Self::StaleIfSlow),
74-
"stale-while-revalidate" => Ok(Self::StaleWhileRevalidate),
75-
"must-revalidate" => Ok(Self::MustRevalidate),
76-
"no-cache" => Ok(Self::NoCache),
77-
other => Err(format!("Unknown cache mode: {}", other)),
78-
}
79-
}
80-
}
81-
8260
#[derive(Debug)]
8361
pub struct SessionState {
8462
// connection id, immutable

rust/cubesql/cubesql/src/transport/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub type CubeMetaHierarchy = cubeclient::models::V1CubeMetaHierarchy;
1717
pub type TransportLoadResponse = cubeclient::models::V1LoadResponse;
1818
pub type TransportLoadRequestQuery = cubeclient::models::V1LoadRequestQuery;
1919
pub type TransportLoadRequest = cubeclient::models::V1LoadRequest;
20+
pub type TransportLoadRequestCacheMode = cubeclient::models::Cache;
2021
pub type TransportMetaResponse = cubeclient::models::V1MetaResponse;
2122
pub type TransportError = cubeclient::models::V1Error;
2223

rust/cubesql/cubesql/src/transport/service.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use tokio::{
2525
};
2626
use uuid::Uuid;
2727

28+
use crate::compile::engine::df::scan::CacheMode;
29+
use crate::transport::TransportLoadRequestCacheMode;
2830
use crate::{
2931
compile::{
3032
engine::df::{
@@ -142,6 +144,7 @@ pub trait TransportService: Send + Sync + Debug {
142144
meta_fields: LoadRequestMeta,
143145
schema: SchemaRef,
144146
member_fields: Vec<MemberField>,
147+
cache_mode: Option<CacheMode>,
145148
) -> Result<Vec<RecordBatch>, CubeError>;
146149

147150
async fn load_stream(
@@ -282,6 +285,7 @@ impl TransportService for HttpTransport {
282285
meta: LoadRequestMeta,
283286
schema: SchemaRef,
284287
member_fields: Vec<MemberField>,
288+
cache_mode: Option<CacheMode>,
285289
) -> Result<Vec<RecordBatch>, CubeError> {
286290
if meta.change_user().is_some() {
287291
return Err(CubeError::internal(
@@ -290,10 +294,23 @@ impl TransportService for HttpTransport {
290294
));
291295
}
292296

297+
let v1_cache_mode = match cache_mode {
298+
None => None,
299+
Some(m) => match m {
300+
CacheMode::StaleIfSlow => Some(TransportLoadRequestCacheMode::StaleIfSlow),
301+
CacheMode::StaleWhileRevalidate => {
302+
Some(TransportLoadRequestCacheMode::StaleWhileRevalidate)
303+
}
304+
CacheMode::MustRevalidate => Some(TransportLoadRequestCacheMode::MustRevalidate),
305+
CacheMode::NoCache => Some(TransportLoadRequestCacheMode::NoCache),
306+
},
307+
};
308+
293309
// TODO: support meta_fields for HTTP
294310
let request = TransportLoadRequest {
295311
query: Some(query),
296312
query_type: Some("multi".to_string()),
313+
cache: v1_cache_mode,
297314
};
298315
let response =
299316
cube_api::load_v1(&self.get_client_config_for_ctx(ctx), Some(request)).await?;

0 commit comments

Comments
 (0)