Skip to content

Commit 38467ab

Browse files
committed
feat(cubesql): Initial SQL push down support for BigQuery, Clickhouse and MySQL
1 parent 5690d8d commit 38467ab

File tree

4 files changed

+148
-63
lines changed

4 files changed

+148
-63
lines changed

packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.js

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -144,4 +144,11 @@ export class BigqueryQuery extends BaseQuery {
144144
every: '2 minutes'
145145
};
146146
}
147+
148+
/**
 * Returns the SQL templates for this adapter.
 *
 * Takes the templates object produced by the parent class and overrides its
 * quoting settings: the identifier quote character becomes a backtick, and
 * the escape value becomes a backslash followed by a backtick.
 *
 * @returns {object} The parent's templates object, mutated in place.
 */
sqlTemplates() {
149+
const templates = super.sqlTemplates();
150+
templates.quotes.identifiers = '`';
151+
templates.quotes.escape = '\\`';
152+
return templates;
153+
}
147154
}

packages/cubejs-schema-compiler/src/adapter/ClickHouseQuery.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,4 +240,11 @@ export class ClickHouseQuery extends BaseQuery {
240240
createIndexSql(indexName, tableName, escapedColumns) {
241241
return `ALTER TABLE ${tableName} ADD INDEX ${indexName} (${escapedColumns.join(', ')}) TYPE minmax GRANULARITY 1`;
242242
}
243+
244+
/**
 * Returns the SQL templates for this adapter.
 *
 * Takes the templates object produced by the parent class and overrides its
 * quoting settings: the identifier quote character becomes a backtick, and
 * the escape value becomes a backslash followed by a backtick.
 *
 * @returns {object} The parent's templates object, mutated in place.
 */
sqlTemplates() {
245+
const templates = super.sqlTemplates();
246+
templates.quotes.identifiers = '`';
247+
templates.quotes.escape = '\\`';
248+
return templates;
249+
}
243250
}

packages/cubejs-schema-compiler/src/adapter/MysqlQuery.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,11 @@ export class MysqlQuery extends BaseQuery {
8686
}
8787
return name;
8888
}
89+
90+
/**
 * Returns the SQL templates for this adapter.
 *
 * Takes the templates object produced by the parent class and overrides its
 * quoting settings: the identifier quote character becomes a backtick, and
 * the escape value becomes a backslash followed by a backtick.
 *
 * NOTE(review): MySQL documents doubling the backtick as the way to embed one
 * inside a quoted identifier; confirm that the backslash form set here is
 * what the consumer of `quotes.escape` expects for MySQL.
 *
 * @returns {object} The parent's templates object, mutated in place.
 */
sqlTemplates() {
91+
const templates = super.sqlTemplates();
92+
templates.quotes.identifiers = '`';
93+
templates.quotes.escape = '\\`';
94+
return templates;
95+
}
8996
}

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 127 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,16 @@ use crate::{
99
use datafusion::{
1010
error::{DataFusionError, Result},
1111
logical_plan::{
12-
plan::Extension, DFSchema, DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode,
12+
plan::Extension, replace_col, Column, DFSchema, DFSchemaRef, Expr, LogicalPlan,
13+
UserDefinedLogicalNode,
1314
},
1415
physical_plan::aggregates::AggregateFunction,
1516
scalar::ScalarValue,
1617
};
1718
use itertools::Itertools;
1819
use regex::{Captures, Regex};
1920
use serde_derive::*;
20-
use std::{any::Any, fmt, future::Future, pin::Pin, result, sync::Arc};
21+
use std::{any::Any, collections::HashMap, fmt, future::Future, pin::Pin, result, sync::Arc};
2122

2223
#[derive(Debug, Clone, Deserialize)]
2324
pub struct SqlQuery {
@@ -127,6 +128,13 @@ fn expr_name(e: &Expr, schema: &Arc<DFSchema>) -> Result<String> {
127128
}
128129
}
129130

131+
/// Outcome of generating SQL for a single plan node, as returned by
/// `CubeScanWrapperNode::generate_sql_for_node`.
///
/// Fields:
/// * `data_source` - data source the SQL was generated for; `generate_sql`
///   reports an error when this is `None`.
/// * `from_alias` - alias to use for this node when it is the `FROM` source of
///   an enclosing select. For a plain scan it is the qualifier of the first
///   schema field; for a wrapped select it is the select's own alias, falling
///   back to the alias of its source.
/// * `column_remapping` - original column -> emitted column alias, present
///   only when at least one column had to be renamed; the enclosing select
///   uses it to rewrite its column references.
/// * `sql` - the generated query.
pub struct SqlGenerationResult {
132+
pub data_source: Option<String>,
133+
pub from_alias: Option<String>,
134+
pub column_remapping: Option<HashMap<Column, Column>>,
135+
pub sql: SqlQuery,
136+
}
137+
130138
impl CubeScanWrapperNode {
131139
pub async fn generate_sql(
132140
&self,
@@ -140,7 +148,7 @@ impl CubeScanWrapperNode {
140148
self.wrapped_plan.clone(),
141149
)
142150
.await
143-
.and_then(|(data_source, _, mut sql): (Option<String>, _, SqlQuery)| -> result::Result<_, CubeError> {
151+
.and_then(|SqlGenerationResult { data_source, mut sql, .. }| -> result::Result<_, CubeError> {
144152
let data_source = data_source.ok_or_else(|| CubeError::internal(format!(
145153
"Can't generate SQL for wrapped select: no data source returned"
146154
)))?;
@@ -166,13 +174,7 @@ impl CubeScanWrapperNode {
166174
transport: Arc<dyn TransportService>,
167175
load_request_meta: Arc<LoadRequestMeta>,
168176
node: Arc<LogicalPlan>,
169-
) -> Pin<
170-
Box<
171-
dyn Future<
172-
Output = result::Result<(Option<String>, Option<String>, SqlQuery), CubeError>,
173-
> + Send,
174-
>,
175-
> {
177+
) -> Pin<Box<dyn Future<Output = result::Result<SqlGenerationResult, CubeError>> + Send>> {
176178
Box::pin(async move {
177179
match node.as_ref() {
178180
// LogicalPlan::Projection(_) => {}
@@ -239,16 +241,17 @@ impl CubeScanWrapperNode {
239241
)
240242
.await?;
241243
// TODO Add wrapper for reprojection and literal members handling
242-
return Ok((
243-
Some(data_sources[0].clone()),
244-
// TODO Implement more straightforward way to get alias name
245-
node.schema
244+
return Ok(SqlGenerationResult {
245+
data_source: Some(data_sources[0].clone()),
246+
from_alias: node
247+
.schema
246248
.fields()
247249
.iter()
248250
.next()
249251
.and_then(|f| f.qualifier().cloned()),
250-
sql.sql,
251-
));
252+
sql: sql.sql,
253+
column_remapping: None,
254+
});
252255
} else if let Some(WrappedSelectNode {
253256
schema,
254257
select_type: _select_type,
@@ -266,13 +269,19 @@ impl CubeScanWrapperNode {
266269
}) = wrapped_select_node
267270
{
268271
// TODO support joins
269-
let (data_source, from_alias, mut sql) = Self::generate_sql_for_node(
272+
let SqlGenerationResult {
273+
data_source,
274+
from_alias,
275+
column_remapping,
276+
sql,
277+
} = Self::generate_sql_for_node(
270278
plan.clone(),
271279
transport.clone(),
272280
load_request_meta.clone(),
273281
from.clone(),
274282
)
275283
.await?;
284+
let mut next_remapping = HashMap::new();
276285
let alias = alias.or(from_alias.clone());
277286
if let Some(data_source) = data_source {
278287
let generator = plan
@@ -286,51 +295,36 @@ impl CubeScanWrapperNode {
286295
))
287296
})?
288297
.clone();
289-
let mut group_by = Vec::new();
290-
let mut projection = Vec::new();
291-
for expr in projection_expr {
292-
let (expr_sql, new_sql_query) = Self::generate_sql_for_expr(
293-
plan.clone(),
294-
sql,
295-
generator.clone(),
296-
expr.clone(),
297-
)
298-
.await?;
299-
sql = new_sql_query;
300-
projection.push(AliasedColumn {
301-
expr: expr_sql,
302-
alias: expr_name(&expr, &schema)?,
303-
});
304-
}
305-
for expr in group_expr {
306-
let (expr_sql, new_sql_query) = Self::generate_sql_for_expr(
307-
plan.clone(),
308-
sql,
309-
generator.clone(),
310-
expr.clone(),
311-
)
312-
.await?;
313-
sql = new_sql_query;
314-
group_by.push(AliasedColumn {
315-
expr: expr_sql,
316-
alias: expr_name(&expr, &schema)?,
317-
});
318-
}
319-
let mut aggregate = Vec::new();
320-
for expr in aggr_expr {
321-
let (expr_sql, new_sql_query) = Self::generate_sql_for_expr(
322-
plan.clone(),
323-
sql,
324-
generator.clone(),
325-
expr.clone(),
326-
)
327-
.await?;
328-
sql = new_sql_query;
329-
aggregate.push(AliasedColumn {
330-
expr: expr_sql,
331-
alias: expr_name(&expr, &schema)?,
332-
});
333-
}
298+
let (projection, sql) = Self::generate_column_expr(
299+
plan.clone(),
300+
schema.clone(),
301+
projection_expr,
302+
sql,
303+
generator.clone(),
304+
&column_remapping,
305+
&mut next_remapping,
306+
)
307+
.await?;
308+
let (group_by, sql) = Self::generate_column_expr(
309+
plan.clone(),
310+
schema.clone(),
311+
group_expr,
312+
sql,
313+
generator.clone(),
314+
&column_remapping,
315+
&mut next_remapping,
316+
)
317+
.await?;
318+
let (aggregate, mut sql) = Self::generate_column_expr(
319+
plan.clone(),
320+
schema.clone(),
321+
aggr_expr,
322+
sql,
323+
generator.clone(),
324+
&column_remapping,
325+
&mut next_remapping,
326+
)
327+
.await?;
334328
let resulting_sql = generator
335329
.get_sql_templates()
336330
.select(
@@ -351,7 +345,16 @@ impl CubeScanWrapperNode {
351345
))
352346
})?;
353347
sql.replace_sql(resulting_sql.clone());
354-
Ok((Some(data_source), alias, sql))
348+
Ok(SqlGenerationResult {
349+
data_source: Some(data_source),
350+
from_alias: alias,
351+
sql,
352+
column_remapping: if next_remapping.len() > 0 {
353+
Some(next_remapping)
354+
} else {
355+
None
356+
},
357+
})
355358
} else {
356359
Err(CubeError::internal(format!(
357360
"Can't generate SQL for wrapped select: no data source for {:?}",
@@ -376,6 +379,67 @@ impl CubeScanWrapperNode {
376379
})
377380
}
378381

382+
async fn generate_column_expr(
383+
plan: Arc<Self>,
384+
schema: DFSchemaRef,
385+
exprs: Vec<Expr>,
386+
mut sql: SqlQuery,
387+
generator: Arc<dyn SqlGenerator>,
388+
column_remapping: &Option<HashMap<Column, Column>>,
389+
next_remapping: &mut HashMap<Column, Column>,
390+
) -> result::Result<(Vec<AliasedColumn>, SqlQuery), CubeError> {
391+
let non_id_regex = Regex::new(r"[^a-zA-Z0-9_]")
392+
.map_err(|e| CubeError::internal(format!("Can't parse regex: {}", e)))?;
393+
let mut aliased_columns = Vec::new();
394+
for expr in exprs {
395+
let expr = if let Some(column_remapping) = column_remapping.as_ref() {
396+
replace_col(
397+
expr.clone(),
398+
&column_remapping.iter().map(|(k, v)| (k, v)).collect(),
399+
)
400+
.map_err(|_| {
401+
CubeError::internal(format!("Can't rename columns for expr: {:?}", expr))
402+
})?
403+
} else {
404+
expr
405+
};
406+
let (expr_sql, new_sql_query) =
407+
Self::generate_sql_for_expr(plan.clone(), sql, generator.clone(), expr.clone())
408+
.await?;
409+
sql = new_sql_query;
410+
411+
let original_alias = expr_name(&expr, &schema)?;
412+
let mut truncated_alias = non_id_regex.replace_all(&original_alias, "_").to_string();
413+
truncated_alias.truncate(16);
414+
let mut alias = truncated_alias.clone();
415+
for i in 1..10000 {
416+
if !next_remapping.contains_key(&Column::from_name(&alias)) {
417+
break;
418+
}
419+
alias = format!("{}_{}", truncated_alias, i);
420+
}
421+
if original_alias != alias {
422+
if !next_remapping.contains_key(&Column::from_name(&alias)) {
423+
next_remapping.insert(
424+
Column::from_name(&original_alias),
425+
Column::from_name(&alias),
426+
);
427+
} else {
428+
return Err(CubeError::internal(format!(
429+
"Can't generate SQL for column expr: duplicate alias {}",
430+
alias
431+
)));
432+
}
433+
}
434+
435+
aliased_columns.push(AliasedColumn {
436+
expr: expr_sql,
437+
alias,
438+
});
439+
}
440+
Ok((aliased_columns, sql))
441+
}
442+
379443
pub fn generate_sql_for_expr(
380444
plan: Arc<Self>,
381445
mut sql_query: SqlQuery,

0 commit comments

Comments
 (0)