87 changes: 41 additions & 46 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
@@ -6,34 +6,34 @@
* @license Apache-2.0
*/

import R from 'ramda';
import cronParser from 'cron-parser';
import moment from 'moment-timezone';
import inflection from 'inflection';
import moment from 'moment-timezone';
import R from 'ramda';

import {
buildSqlAndParams as nativeBuildSqlAndParams,
} from '@cubejs-backend/native';
import {
FROM_PARTITION_RANGE,
MAX_SOURCE_ROW_LIMIT,
localTimestampToUtc,
QueryAlias,
getEnv,
localTimestampToUtc,
timeSeries as timeSeriesBase
} from '@cubejs-backend/shared';
import {
buildSqlAndParams as nativeBuildSqlAndParams,
} from '@cubejs-backend/native';

import { UserError } from '../compiler/UserError';
import { BaseMeasure } from './BaseMeasure';
import { SqlParser } from '../parser/SqlParser';
import { BaseDimension } from './BaseDimension';
import { BaseSegment } from './BaseSegment';
import { BaseFilter } from './BaseFilter';
import { BaseGroupFilter } from './BaseGroupFilter';
import { BaseMeasure } from './BaseMeasure';
import { BaseSegment } from './BaseSegment';
import { BaseTimeDimension } from './BaseTimeDimension';
import { Granularity } from './Granularity';
import { ParamAllocator } from './ParamAllocator';
import { PreAggregations } from './PreAggregations';
import { SqlParser } from '../parser/SqlParser';
import { Granularity } from './Granularity';

const DEFAULT_PREAGGREGATIONS_SCHEMA = 'stb_pre_aggregations';

@@ -571,12 +571,9 @@ export class BaseQuery {
* @returns {string}
*/
countAllQuery(sql) {
return `select count(*) ${
this.escapeColumnName(QueryAlias.TOTAL_COUNT)
} from (\n${
sql
}\n) ${
this.escapeColumnName(QueryAlias.ORIGINAL_QUERY)
return `select count(*) ${this.escapeColumnName(QueryAlias.TOTAL_COUNT)
} from (\n${sql
}\n) ${this.escapeColumnName(QueryAlias.ORIGINAL_QUERY)
}`;
}

@@ -967,8 +964,7 @@ export class BaseQuery {
).concat(
R.map(
([multiplied, measure]) => this.withCubeAliasPrefix(
`${
this.aliasName(measure.measure.replace('.', '_'))
`${this.aliasName(measure.measure.replace('.', '_'))
}_cumulative`,
() => this.overTimeSeriesQuery(
multiplied
@@ -983,7 +979,7 @@
),
)
)(cumulativeMeasures)
// TODO SELECT *
// TODO SELECT *
).concat(multiStageMembers.map(m => `SELECT * FROM ${m.alias}`));
}

@@ -1891,10 +1887,9 @@ export class BaseQuery {
)
), inlineWhereConditions);

return `SELECT ${this.selectAllDimensionsAndMeasures(measures)} FROM ${
query
return `SELECT ${this.selectAllDimensionsAndMeasures(measures)} FROM ${query
} ${this.baseWhere(filters.concat(inlineWhereConditions))}` +
(!this.safeEvaluateSymbolContext().ungrouped && this.groupByClause() || '');
(!this.safeEvaluateSymbolContext().ungrouped && this.groupByClause() || '');
}

/**
@@ -1946,14 +1941,11 @@ export class BaseQuery {
.join(', ');

const primaryKeyJoinConditions = primaryKeyDimensions.map((pkd) => (
`${
this.escapeColumnName(QueryAlias.AGG_SUB_QUERY_KEYS)
}.${
pkd.aliasName()
} = ${
shouldBuildJoinForMeasureSelect
? `${this.cubeAlias(keyCubeName)}.${pkd.aliasName()}`
: this.dimensionSql(pkd)
`${this.escapeColumnName(QueryAlias.AGG_SUB_QUERY_KEYS)
}.${pkd.aliasName()
} = ${shouldBuildJoinForMeasureSelect
? `${this.cubeAlias(keyCubeName)}.${pkd.aliasName()}`
: this.dimensionSql(pkd)
}`
)).join(' AND ');

@@ -2026,8 +2018,7 @@ export class BaseQuery {
'collectSubQueryDimensionsFor'
)
), inlineWhereConditions);
return `SELECT DISTINCT ${this.keysSelect(primaryKeyDimensions)} FROM ${
query
return `SELECT DISTINCT ${this.keysSelect(primaryKeyDimensions)} FROM ${query
} ${this.baseWhere(filters.concat(inlineWhereConditions))}`;
}

@@ -3337,7 +3328,7 @@ export class BaseQuery {
}
throw new UserError('Output schema is only supported for rollup pre-aggregations');
},
{ inputProps: { }, cache: this.queryCache }
{ inputProps: {}, cache: this.queryCache }
);
}

@@ -3457,14 +3448,18 @@ export class BaseQuery {
join: '{{ join_type }} JOIN {{ source }} ON {{ condition }}',
cte: '{{ alias }} AS ({{ query | indent(2, true) }})',
time_series_select: 'SELECT date_from::timestamp AS "date_from",\n' +
'date_to::timestamp AS "date_to" \n' +
'FROM(\n' +
' VALUES ' +
'{% for time_item in seria %}' +
'(\'{{ time_item | join(\'\\\', \\\'\') }}\')' +
'{% if not loop.last %}, {% endif %}' +
'{% endfor %}' +
') AS dates (date_from, date_to)'
'date_to::timestamp AS "date_to" \n' +
'FROM(\n' +
' VALUES ' +
'{% for time_item in seria %}' +
'(\'{{ time_item | join(\'\\\', \\\'\') }}\')' +
'{% if not loop.last %}, {% endif %}' +
'{% endfor %}' +
') AS dates (date_from, date_to)',
time_series_get_range: 'SELECT {{ max_expr }} as {{ quoted_max_name }},\n' +
'{{ min_expr }} as {{ quoted_min_name }}\n' +
'FROM {{ from_prepared }}\n' +
'{% if filter %}WHERE {{ filter }}{% endif %}'
},
expressions: {
column_reference: '{% if table_name %}{{ table_name }}.{% endif %}{{ name }}',
@@ -4078,11 +4073,11 @@ export class BaseQuery {
const filterParamArg = filterParamArgs.filter(p => {
const member = p.__member();
return member === filter.measure ||
member === filter.dimension ||
(aliases[member] && (
aliases[member] === filter.measure ||
aliases[member] === filter.dimension
));
member === filter.dimension ||
(aliases[member] && (
aliases[member] === filter.measure ||
aliases[member] === filter.dimension
));
})[0];

if (!filterParamArg) {
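For reference, the time_series_get_range statement template added in BaseQuery.js above renders a min/max probe over the prepared source. With purely illustrative bindings (max_expr = MAX("visitors".created_at), min_expr = MIN("visitors".created_at), from_prepared = visitors AS "visitors", quoted names "max_date"/"min_date", no filter — none of these names come from the PR), it would produce roughly:

SELECT MAX("visitors".created_at) as "max_date",
MIN("visitors".created_at) as "min_date"
FROM visitors AS "visitors"

The {% if filter %} branch appends a WHERE clause only when a filter expression is passed in.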
@@ -82,6 +82,9 @@ export class PostgresQuery extends BaseQuery {
templates.types.double = 'DOUBLE PRECISION';
templates.types.binary = 'BYTEA';
templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM';
templates.statements.generated_time_series_select = 'SELECT date_from AS "date_from",\n' +
'date_from + interval \'{{ granularity }}\' - interval \'1 millisecond\' AS "date_to" \n' +
'FROM generate_series({{ start }}::timestamp, {{ end }}:: timestamp, \'{{ granularity }}\'::interval) "date_from" ';
return templates;
}

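As an illustration of the new Postgres override just above (the parameter values are hypothetical, not taken from the PR): rendering generated_time_series_select with start = '2017-01-01', end = '2017-03-31' and granularity = '1 month' would yield approximately:

SELECT date_from AS "date_from",
date_from + interval '1 month' - interval '1 millisecond' AS "date_to"
FROM generate_series('2017-01-01'::timestamp, '2017-03-31'::timestamp, '1 month'::interval) "date_from"

Each generated row covers one granularity bucket, with date_to set one millisecond before the start of the next bucket.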
@@ -90,6 +90,13 @@ describe('SQL Generation', () => {
offset: 'start'
}
},
countRollingThreeMonth: {
type: 'count',
rollingWindow: {
trailing: '3 month',
offset: 'end'
}
},
countRollingUnbounded: {
type: 'count',
rollingWindow: {
@@ -1088,6 +1095,34 @@ SELECT 1 AS revenue, cast('2024-01-01' AS timestamp) as time UNION ALL
{ visitors__created_at_day: '2017-01-10T00:00:00.000Z', visitors__count_rolling: null }
]));

if (getEnv('nativeSqlPlanner')) {
it('rolling count without date range', async () => {
await runQueryTest({
measures: [
'visitors.countRollingThreeMonth'
],
timeDimensions: [{
dimension: 'visitors.created_at',
granularity: 'month',
}],
order: [{
id: 'visitors.created_at'
}],
timezone: 'America/Los_Angeles'
}, [
{ visitors__created_at_month: '2016-09-01T00:00:00.000Z', visitors__count_rolling_three_month: '1' },
{ visitors__created_at_month: '2016-10-01T00:00:00.000Z', visitors__count_rolling_three_month: '1' },
{ visitors__created_at_month: '2016-11-01T00:00:00.000Z', visitors__count_rolling_three_month: '1' },
{ visitors__created_at_month: '2016-12-01T00:00:00.000Z', visitors__count_rolling_three_month: null },
{ visitors__created_at_month: '2017-01-01T00:00:00.000Z', visitors__count_rolling_three_month: '5' },
]);
});
} else {
it.skip('rolling count without date range', () => {
// Skipping because it works only in Tesseract
});
}
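A plausible reading of how the Tesseract planner serves this query without an explicit date range (an interpretation of the changes in this PR, not something the diff states outright) is that it first derives the range from the data, then expands it into the time series the rolling window is evaluated against, e.g. on Postgres:

-- 1. derive the range (cf. the time_series_get_range statement)
SELECT max(created_at) AS "max_date", min(created_at) AS "min_date" FROM visitors;

-- 2. expand it into month buckets (cf. generated_time_series_select)
SELECT date_from AS "date_from",
date_from + interval '1 month' - interval '1 millisecond' AS "date_to"
FROM generate_series('2016-09-01'::timestamp, '2017-01-01'::timestamp, '1 month'::interval) "date_from";

The visitors table, column names, and boundary dates here are assumptions for illustration; the rolling measure is then aggregated against this series exactly as it would be for an explicitly supplied date range.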

it('rolling qtd', async () => runQueryTest({
measures: [
'visitors.revenue_qtd'
35 changes: 28 additions & 7 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs
@@ -75,6 +75,28 @@ impl SelectBuilder {
.add_column(SchemaColumn::new(alias.clone(), Some(member.full_name())));
}

pub fn add_projection_function_expression(
&mut self,
function: &str,
args: Vec<Rc<dyn BaseMember>>,
alias: String,
) {
let expr = Expr::Function(FunctionExpression {
function: function.to_string(),
arguments: args
.into_iter()
.map(|r| Expr::Member(MemberExpression::new(r.clone())))
.collect(),
});
let aliased_expr = AliasedExpr {
expr,
alias: alias.clone(),
};

self.projection_columns.push(aliased_expr);
self.result_schema
.add_column(SchemaColumn::new(alias.clone(), None));
}
pub fn add_projection_coalesce_member(
&mut self,
member: &Rc<dyn BaseMember>,
@@ -147,16 +169,16 @@ impl SelectBuilder {
self.ctes = ctes;
}

fn make_cube_references(&self) -> HashMap<String, String> {
pub fn make_cube_references(from: Rc<From>) -> HashMap<String, String> {
let mut refs = HashMap::new();
match &self.from.source {
match &from.source {
crate::plan::FromSource::Single(source) => {
self.add_cube_reference_if_needed(source, &mut refs)
Self::add_cube_reference_if_needed(source, &mut refs)
}
crate::plan::FromSource::Join(join) => {
self.add_cube_reference_if_needed(&join.root, &mut refs);
Self::add_cube_reference_if_needed(&join.root, &mut refs);
for join_item in join.joins.iter() {
self.add_cube_reference_if_needed(&join_item.from, &mut refs);
Self::add_cube_reference_if_needed(&join_item.from, &mut refs);
}
}
crate::plan::FromSource::Empty => {}
@@ -165,7 +187,6 @@ impl SelectBuilder {
}

fn add_cube_reference_if_needed(
&self,
source: &SingleAliasedSource,
refs: &mut HashMap<String, String>,
) {
@@ -194,7 +215,7 @@ impl SelectBuilder {
}

pub fn build(self, mut nodes_factory: SqlNodesFactory) -> Select {
let cube_references = self.make_cube_references();
let cube_references = Self::make_cube_references(self.from.clone());
nodes_factory.set_cube_name_references(cube_references);
let schema = if self.projection_columns.is_empty() {
self.make_asteriks_schema()
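One likely consumer of the new add_projection_function_expression helper (an assumption based on the templates introduced in BaseQuery.js, not stated in this diff) is the min/max range query: a call with function "MAX", the created_at member, and alias "max_date" would correspond to a projection column along the lines of

MAX("visitors".created_at) AS "max_date"

with the member expression supplying the qualified column reference and the alias recorded in the result schema without a member name (hence the SchemaColumn::new(alias, None)).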
2 changes: 1 addition & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/plan/mod.rs
@@ -21,5 +21,5 @@ pub use order::OrderBy;
pub use query_plan::QueryPlan;
pub use schema::{QualifiedColumnName, Schema, SchemaColumn};
pub use select::{AliasedExpr, Select};
pub use time_series::TimeSeries;
pub use time_series::{TimeSeries, TimeSeriesDateRange};
pub use union::Union;