Skip to content

Commit 79888af

Browse files
authored
feat(cubesql): Query rewrite cache (#7647)
* feat(cubesql): Introduce query rewrite cache * Bind cache to protocol and fix tests * Fix tests * Remove mock calls check as it isn't predictable * Do not cache LogicalPlan as it provides security checks * Fix queries cache key * Missing default hashing for language nodes introduces cache key ambiguity for ScalarVariableExprVariable * Do not eval stable function such as timestamps before putting rewrite to cache Add ConstantFolding to aliases and aliases push down for FilterSimplifyReplacer so it can find Alias wrapped stable functions and fold them to constant * Disable cache by default but enable in tests
1 parent 958b88d commit 79888af

File tree

28 files changed

+971
-755
lines changed

28 files changed

+971
-755
lines changed

.github/workflows/rust-cubesql.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ jobs:
8181
CUBESQL_TESTING_CUBE_URL: ${{ secrets.CUBESQL_TESTING_CUBE_URL }}
8282
CUBESQL_REWRITE_ENGINE: true
8383
CUBESQL_SQL_PUSH_DOWN: true
84+
CUBESQL_REWRITE_CACHE: true
8485
CUBESQL_REWRITE_TIMEOUT: 60
8586
run: cd rust/cubesql && cargo tarpaulin --workspace --no-fail-fast --avoid-cfg-tarpaulin --out Xml
8687
- name: Upload code coverage
@@ -119,6 +120,7 @@ jobs:
119120
CUBESQL_TESTING_CUBE_TOKEN: ${{ secrets.CUBESQL_TESTING_CUBE_TOKEN }}
120121
CUBESQL_TESTING_CUBE_URL: ${{ secrets.CUBESQL_TESTING_CUBE_URL }}
121122
CUBESQL_SQL_PUSH_DOWN: true
123+
CUBESQL_REWRITE_CACHE: true
122124
run: cd rust/cubesql && cargo test
123125

124126
native_linux:

packages/cubejs-backend-native/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/cubejs-backend-native/test/sql.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,6 @@ describe('SQLInterface', () => {
240240
password: null,
241241
});
242242

243-
expect(meta.mock.calls.length).toEqual(3);
244243
expect(meta.mock.calls[0][0]).toEqual({
245244
request: {
246245
id: expect.any(String),

rust/cubesql/Cargo.lock

Lines changed: 15 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesql/cubesql/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ sha1_smol = "1.0.0"
6161
tera = { version = "1", default-features = false }
6262
minijinja = { version = "1", features = ["json", "loader"] }
6363
lru = "0.12.1"
64+
sha2 = "0.10.8"
6465

6566

6667
[dev-dependencies]

rust/cubesql/cubesql/src/compile/engine/udf.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4805,7 +4805,20 @@ pub fn register_fun_stubs(mut ctx: SessionContext) -> SessionContext {
48054805
vol = Stable
48064806
);
48074807

4808-
register_fun_stub!(udf, "eval_now", argc = 0, rettyp = Timestamp, vol = Stable);
4808+
register_fun_stub!(
4809+
udf,
4810+
"eval_now",
4811+
argc = 0,
4812+
rettyp = TimestampTz,
4813+
vol = Stable
4814+
);
4815+
register_fun_stub!(
4816+
udf,
4817+
"eval_utc_timestamp",
4818+
argc = 0,
4819+
rettyp = Timestamp,
4820+
vol = Stable
4821+
);
48094822

48104823
ctx
48114824
}

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use datafusion::{
1515
projection_drop_out::ProjectionDropOut,
1616
utils::from_plan,
1717
},
18-
physical_plan::ExecutionPlan,
18+
physical_plan::{planner::DefaultPhysicalPlanner, ExecutionPlan},
1919
prelude::*,
2020
scalar::ScalarValue,
2121
sql::{parser::Statement as DFStatement, planner::SqlToRel},
@@ -105,7 +105,10 @@ pub mod test;
105105

106106
pub use crate::transport::ctx::*;
107107
use crate::{
108-
compile::engine::df::wrapper::CubeScanWrapperNode,
108+
compile::{
109+
engine::df::wrapper::CubeScanWrapperNode,
110+
rewrite::{analysis::LogicalPlanAnalysis, rewriter::Rewriter},
111+
},
109112
transport::{LoadRequestMeta, SpanId, TransportService},
110113
};
111114
pub use error::{CompilationError, CompilationResult};
@@ -1387,6 +1390,8 @@ WHERE `TABLE_SCHEMA` = '{}'",
13871390
qtrace: &mut Option<Qtrace>,
13881391
span_id: Option<Arc<SpanId>>,
13891392
) -> CompilationResult<QueryPlan> {
1393+
self.reauthenticate_if_needed().await?;
1394+
13901395
match &stmt {
13911396
ast::Statement::Query(query) => match &query.body {
13921397
ast::SetExpr::Select(select) if select.into.is_some() => {
@@ -1451,15 +1456,64 @@ WHERE `TABLE_SCHEMA` = '{}'",
14511456
qtrace.set_optimized_plan(&optimized_plan);
14521457
}
14531458

1454-
let mut converter = LogicalPlanToLanguageConverter::new(Arc::new(cube_ctx));
1459+
let cube_ctx = Arc::new(cube_ctx);
1460+
let mut converter = LogicalPlanToLanguageConverter::new(cube_ctx.clone());
1461+
let mut query_params = Some(HashMap::new());
14551462
let root = converter
1456-
.add_logical_plan(&optimized_plan)
1463+
.add_logical_plan_replace_params(&optimized_plan, &mut query_params)
14571464
.map_err(|e| CompilationError::internal(e.to_string()))?;
14581465

1459-
self.reauthenticate_if_needed().await?;
1466+
let mut finalized_graph = self
1467+
.session_manager
1468+
.server
1469+
.compiler_cache
1470+
.rewrite(
1471+
self.state.auth_context().unwrap(),
1472+
cube_ctx.clone(),
1473+
converter.take_egraph(),
1474+
&query_params.unwrap(),
1475+
qtrace,
1476+
)
1477+
.await
1478+
.map_err(|e| match e.cause {
1479+
CubeErrorCauseType::Internal(_) => CompilationError::Internal(
1480+
format!(
1481+
"Error during rewrite: {}. Please check logs for additional information.",
1482+
e.message
1483+
),
1484+
e.to_backtrace().unwrap_or_else(|| Backtrace::capture()),
1485+
Some(HashMap::from([
1486+
("query".to_string(), stmt.to_string()),
1487+
(
1488+
"sanitizedQuery".to_string(),
1489+
SensitiveDataSanitizer::new().replace(&stmt).to_string(),
1490+
),
1491+
])),
1492+
),
1493+
CubeErrorCauseType::User(_) => CompilationError::User(
1494+
format!(
1495+
"Error during rewrite: {}. Please check logs for additional information.",
1496+
e.message
1497+
),
1498+
Some(HashMap::from([
1499+
("query".to_string(), stmt.to_string()),
1500+
(
1501+
"sanitizedQuery".to_string(),
1502+
SensitiveDataSanitizer::new().replace(&stmt).to_string(),
1503+
),
1504+
])),
1505+
),
1506+
})?;
14601507

1461-
let result = converter
1462-
.take_rewriter()
1508+
// Replace Analysis as at least time has changed but it might be also context may affect rewriting in some other ways
1509+
finalized_graph.analysis = LogicalPlanAnalysis::new(
1510+
cube_ctx.clone(),
1511+
Arc::new(DefaultPhysicalPlanner::default()),
1512+
);
1513+
1514+
let mut rewriter = Rewriter::new(finalized_graph, cube_ctx.clone());
1515+
1516+
let result = rewriter
14631517
.find_best_plan(
14641518
root,
14651519
self.state.auth_context().unwrap(),
@@ -3244,15 +3298,9 @@ mod tests {
32443298

32453299
let logical_plan = &query_plan.print(true).unwrap();
32463300

3247-
let re = Regex::new(r"TimestampNanosecond\(\d+, None\)").unwrap();
3248-
let logical_plan = re
3249-
.replace_all(logical_plan, "TimestampNanosecond(0, None)")
3250-
.as_ref()
3251-
.to_string();
3252-
32533301
assert_eq!(
32543302
logical_plan,
3255-
"Projection: TimestampNanosecond(0, None) AS COL\
3303+
"Projection: CAST(utctimestamp() AS current_timestamp() AS Timestamp(Nanosecond, None)) AS COL\
32563304
\n EmptyRelation",
32573305
);
32583306
}
@@ -15679,7 +15727,10 @@ limit
1567915727
// Quarter of Year
1568015728
["EXTRACT(QUARTER FROM t.\"order_date\")", "quarter"],
1568115729
// Year
15682-
["CAST(EXTRACT(YEAR FROM t.\"order_date\") AS varchar)", "year"],
15730+
[
15731+
"CAST(EXTRACT(YEAR FROM t.\"order_date\") AS varchar)",
15732+
"year",
15733+
],
1568315734
];
1568415735

1568515736
for [expr, expected_granularity] in &supported_granularities {
@@ -18248,7 +18299,7 @@ limit
1824818299
segments: Some(vec![]),
1824918300
time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension {
1825018301
dimension: "KibanaSampleDataEcommerce.order_date".to_string(),
18251-
granularity: Some("quarter".to_string()),
18302+
granularity: Some("month".to_string()),
1825218303
date_range: None
1825318304
}]),
1825418305
order: None,

rust/cubesql/cubesql/src/compile/rewrite/analysis.rs

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,7 @@ impl LogicalPlanAnalysis {
833833
_ => panic!("Expected Literal but got: {:?}", expr),
834834
}
835835
}
836+
LogicalPlanLanguage::AliasExpr(params) => constant_node(params[0]),
836837
LogicalPlanLanguage::ScalarUDFExpr(_) => {
837838
let expr = node_to_expr(
838839
enode,
@@ -858,6 +859,14 @@ impl LogicalPlanAnalysis {
858859
args: vec![],
859860
},
860861
)
862+
} else if &fun.name == "eval_utc_timestamp" {
863+
Self::eval_constant_expr(
864+
&egraph,
865+
&Expr::ScalarFunction {
866+
fun: BuiltinScalarFunction::UtcTimestamp,
867+
args: vec![],
868+
},
869+
)
861870
} else if &fun.name == "str_to_date"
862871
|| &fun.name == "date_add"
863872
|| &fun.name == "date_sub"
@@ -883,13 +892,10 @@ impl LogicalPlanAnalysis {
883892
.ok()?;
884893

885894
if let Expr::ScalarFunction { fun, .. } = &expr {
886-
if (fun.volatility() == Volatility::Immutable
887-
|| fun.volatility() == Volatility::Stable)
888-
&& !matches!(
889-
fun,
890-
BuiltinScalarFunction::CurrentDate | BuiltinScalarFunction::Now
891-
)
892-
{
895+
// Removed stable evaluation as it affects caching and SQL push down.
896+
// Whatever stable function should be evaluated it should be addressed as a special rewrite rule
897+
// as it seems LogicalPlanAnalysis can't change it's state.
898+
if fun.volatility() == Volatility::Immutable {
893899
Self::eval_constant_expr(&egraph, &expr)
894900
} else {
895901
None
@@ -1185,18 +1191,29 @@ impl Analysis<LogicalPlanLanguage> for LogicalPlanAnalysis {
11851191

11861192
fn modify(egraph: &mut EGraph<LogicalPlanLanguage, Self>, id: Id) {
11871193
if let Some(ConstantFolding::Scalar(c)) = &egraph[id].data.constant {
1194+
// As ConstantFolding goes through Alias we can't add LiteralExpr at this level otherwise it gets dropped.
1195+
// In case there's wrapping node on top of Alias that can be evaluated to LiteralExpr further it gets replaced instead.
1196+
if let Some(Expr::Alias(_, _)) = egraph[id].data.original_expr.as_ref() {
1197+
return;
1198+
}
11881199
// TODO: ideally all constants should be aliased, but this requires
11891200
// rewrites to extract `.data.constant` instead of `literal_expr`.
1190-
let alias_name =
1191-
if c.is_null() || matches!(c, ScalarValue::Date32(_) | ScalarValue::Date64(_)) {
1192-
egraph[id]
1193-
.data
1194-
.original_expr
1195-
.as_ref()
1196-
.map(|expr| expr.name(&DFSchema::empty()).unwrap())
1197-
} else {
1198-
None
1199-
};
1201+
let alias_name = if c.is_null()
1202+
|| matches!(
1203+
c,
1204+
ScalarValue::Date32(_)
1205+
| ScalarValue::Date64(_)
1206+
| ScalarValue::Int64(_)
1207+
| ScalarValue::Float64(_)
1208+
) {
1209+
egraph[id]
1210+
.data
1211+
.original_expr
1212+
.as_ref()
1213+
.map(|expr| expr.name(&DFSchema::empty()).unwrap())
1214+
} else {
1215+
None
1216+
};
12001217
let c = c.clone();
12011218
let value = egraph.add(LogicalPlanLanguage::LiteralExprValue(LiteralExprValue(c)));
12021219
let literal_expr = egraph.add(LogicalPlanLanguage::LiteralExpr([value]));

0 commit comments

Comments
 (0)