diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 576a2eadfe9cc..9b6f8371f4b33 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -289,6 +289,11 @@ export class BaseQuery { }).filter(R.identity).map(this.newTimeDimension.bind(this)); this.allFilters = this.timeDimensions.concat(this.segments).concat(this.filters); this.useNativeSqlPlanner = this.options.useNativeSqlPlanner ?? getEnv('nativeSqlPlanner'); + this.canUseNativeSqlPlannerPreAggregation = false; + if (this.useNativeSqlPlanner) { + const hasMultiStageMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true }).multiStageMembers.length > 0; + this.canUseNativeSqlPlannerPreAggregation = hasMultiStageMeasures; + } this.prebuildJoin(); this.cubeAliasPrefix = this.options.cubeAliasPrefix; @@ -471,6 +476,19 @@ export class BaseQuery { } newDimension(dimensionPath) { + if (typeof dimensionPath === 'string') { + const memberArr = dimensionPath.split('.'); + if (memberArr.length > 3 && + memberArr[memberArr.length - 2] === 'granularities' && + this.cubeEvaluator.isDimension(memberArr.slice(0, -2))) { + return this.newTimeDimension( + { + dimension: this.cubeEvaluator.pathFromArray(memberArr.slice(0, -2)), + granularity: memberArr[memberArr.length - 1] + } + ); + } + } return new BaseDimension(this, dimensionPath); } @@ -636,38 +654,39 @@ export class BaseQuery { * @returns {[string, Array]} */ buildSqlAndParams(exportAnnotatedSql) { - if (!this.options.preAggregationQuery && !this.options.disableExternalPreAggregations && this.externalQueryClass) { - if (this.externalPreAggregationQuery()) { // TODO performance - return this.externalQuery().buildSqlAndParams(exportAnnotatedSql); - } - } - if (this.useNativeSqlPlanner) { let isRelatedToPreAggregation = false; - if (this.options.preAggregationQuery) { - isRelatedToPreAggregation = true; - } else if (!this.options.disableExternalPreAggregations && this.externalQueryClass) { - if (this.externalPreAggregationQuery()) { + + if (!this.canUseNativeSqlPlannerPreAggregation) { + if (this.options.preAggregationQuery) { isRelatedToPreAggregation = true; - } - } else { - let preAggForQuery = - this.preAggregations.findPreAggregationForQuery(); - if (this.options.disableExternalPreAggregations && preAggForQuery && preAggForQuery.preAggregation.external) { - preAggForQuery = undefined; - } - if (preAggForQuery) { + } else if (!this.options.disableExternalPreAggregations && this.externalQueryClass && this.externalPreAggregationQuery()) { isRelatedToPreAggregation = true; + } else { + let preAggForQuery = + this.preAggregations.findPreAggregationForQuery(); + if (this.options.disableExternalPreAggregations && preAggForQuery && preAggForQuery.preAggregation.external) { + preAggForQuery = undefined; + } + if (preAggForQuery) { + isRelatedToPreAggregation = true; + } } - } - if (isRelatedToPreAggregation) { - return this.newQueryWithoutNative().buildSqlAndParams(exportAnnotatedSql); + if (isRelatedToPreAggregation) { + return this.newQueryWithoutNative().buildSqlAndParams(exportAnnotatedSql); + } } return this.buildSqlAndParamsRust(exportAnnotatedSql); } + if (!this.options.preAggregationQuery && !this.options.disableExternalPreAggregations && this.externalQueryClass) { + if (this.externalPreAggregationQuery()) { // TODO performance + return this.externalQuery().buildSqlAndParams(exportAnnotatedSql); + } + } + return this.compilers.compiler.withQuery( this, () => this.cacheValue( @@ -703,8 +722,8 @@ export class BaseQuery { offset: this.options.offset ? this.options.offset.toString() : null, baseTools: this, ungrouped: this.options.ungrouped, - exportAnnotatedSql: exportAnnotatedSql === true - + exportAnnotatedSql: exportAnnotatedSql === true, + preAggregationQuery: this.options.preAggregationQuery }; const buildResult = nativeBuildSqlAndParams(queryParams); @@ -718,9 +737,57 @@ export class BaseQuery { } const res = buildResult.result; + const [query, params, preAggregation] = res; // FIXME - res[1] = [...res[1]]; - return res; + const paramsArray = [...params]; + if (preAggregation) { + this.preAggregations.preAggregationForQuery = preAggregation; + } + return [query, paramsArray]; + } + + // FIXME Temporary solution + findPreAggregationForQueryRust() { + let optionsOrder = this.options.order; + if (optionsOrder && !Array.isArray(optionsOrder)) { + optionsOrder = [optionsOrder]; + } + const order = optionsOrder ? R.pipe( + R.map((hash) => ((!hash || !hash.id) ? null : hash)), + R.reject(R.isNil), + )(optionsOrder) : undefined; + + const queryParams = { + measures: this.options.measures, + dimensions: this.options.dimensions, + segments: this.options.segments, + timeDimensions: this.options.timeDimensions, + timezone: this.options.timezone, + joinGraph: this.joinGraph, + cubeEvaluator: this.cubeEvaluator, + order, + filters: this.options.filters, + limit: this.options.limit ? this.options.limit.toString() : null, + rowLimit: this.options.rowLimit ? this.options.rowLimit.toString() : null, + offset: this.options.offset ? this.options.offset.toString() : null, + baseTools: this, + ungrouped: this.options.ungrouped, + exportAnnotatedSql: false, + preAggregationQuery: this.options.preAggregationQuery + }; + + const buildResult = nativeBuildSqlAndParams(queryParams); + + if (buildResult.error) { + if (buildResult.error.cause === 'User') { + throw new UserError(buildResult.error.message); + } else { + throw new Error(buildResult.error.message); + } + } + + const [, , preAggregation] = buildResult.result; + return preAggregation; } allCubeMembers(path) { @@ -743,6 +810,10 @@ export class BaseQuery { return timeSeriesFromCustomInterval(granularityInterval, dateRange, moment(origin), { timestampPrecision: 3 }); } + getPreAggregationByName(cube, preAggregationName) { + return this.preAggregations.getRollupPreAggregationByName(cube, preAggregationName); + } + get shouldReuseParams() { return false; } @@ -922,7 +993,6 @@ export class BaseQuery { const renderedWithQueries = withQueries.map(q => this.renderWithQuery(q)); let toJoin; - if (this.options.preAggregationQuery) { const allRegular = regularMeasures.concat( cumulativeMeasures @@ -1153,7 +1223,6 @@ export class BaseQuery { const multipliedMeasures = measuresToRender(true, false)(measureToHierarchy); const regularMeasures = measuresToRender(false, false)(measureToHierarchy); - const cumulativeMeasures = R.pipe( R.map(multiplied => R.xprod([multiplied], measuresToRender(multiplied, true)(measureToHierarchy))), @@ -3228,7 +3297,7 @@ export class BaseQuery { } newSubQueryForCube(cube, options) { - options = { ...options, useNativeSqlPlanner: false }; // We don't use tesseract for pre-aggregations generation yet + options = { ...options }; if (this.options.queryFactory) { // When dealing with rollup joins, it's crucial to use the correct parameter allocator for the specific cube in use. // By default, we'll use BaseQuery, but it's important to note that different databases (Oracle, PostgreSQL, MySQL, Druid, etc.) diff --git a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.js b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.js index 71a829546ed2b..bf3e041823813 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.js +++ b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.js @@ -559,7 +559,7 @@ export class PreAggregations { transformedQuery.filterDimensionsSingleValueEqual || {}, ) )); - + const backAlias = (references) => references.map(r => ( Array.isArray(r) ? [transformedQuery.allBackAliasMembers[r[0]] || r[0], r[1]] : @@ -782,11 +782,15 @@ export class PreAggregations { */ findPreAggregationForQuery() { if (!this.preAggregationForQuery) { - this.preAggregationForQuery = - this - .rollupMatchResults() - // Refresh worker can access specific pre-aggregations even in case those hidden by others - .find(p => p.canUsePreAggregation && (!this.query.options.preAggregationId || p.preAggregationId === this.query.options.preAggregationId)); + if (this.query.useNativeSqlPlanner && this.query.canUseNativeSqlPlannerPreAggregation) { + this.preAggregationForQuery = this.query.findPreAggregationForQueryRust(); + } else { + this.preAggregationForQuery = + this + .rollupMatchResults() + // Refresh worker can access specific pre-aggregations even in case those hidden by others + .find(p => p.canUsePreAggregation && (!this.query.options.preAggregationId || p.preAggregationId === this.query.options.preAggregationId)); + } } return this.preAggregationForQuery; } @@ -866,6 +870,25 @@ export class PreAggregations { )(preAggregations); } + getRollupPreAggregationByName(cube, preAggregationName) { + const canUsePreAggregation = () => true; + const preAggregation = R.pipe( + R.toPairs, + R.filter(([_, a]) => a.type === 'rollup' || a.type === 'rollupJoin' || a.type === 'rollupLambda'), + R.find(([k, _]) => k === preAggregationName) + )(this.query.cubeEvaluator.preAggregationsForCube(cube)); + if (preAggregation) { + const tableName = this.preAggregationTableName(cube, preAggregation[0], preAggregation[1]); + const preAggObj = preAggregation ? this.evaluatedPreAggregationObj(cube, preAggregation[0], preAggregation[1], canUsePreAggregation) : {}; + return { + tableName, + ...preAggObj + }; + } else { + return {}; + } + } + // TODO check multiplication factor didn't change buildRollupJoin(preAggObj, preAggObjsToJoin) { return this.query.cacheValue( diff --git a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts index 880e80bf85eb4..2d9e3c14e547d 100644 --- a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts +++ b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts @@ -520,6 +520,13 @@ export class CubeEvaluator extends CubeSymbols { return this.cubeFromPath(path).preAggregations || {}; } + public preAggregationsForCubeAsArray(path: string) { + return Object.entries(this.cubeFromPath(path).preAggregations || {}).map(([name, preAggregation]) => ({ + name, + ...(preAggregation as Record) + })); + } + /** * Returns pre-aggregations filtered by the specified selector. */ diff --git a/packages/cubejs-schema-compiler/src/compiler/CubeValidator.ts b/packages/cubejs-schema-compiler/src/compiler/CubeValidator.ts index 97508876eab70..778ff93db8133 100644 --- a/packages/cubejs-schema-compiler/src/compiler/CubeValidator.ts +++ b/packages/cubejs-schema-compiler/src/compiler/CubeValidator.ts @@ -560,7 +560,7 @@ const measureTypeWithCount = Joi.string().valid( ); const multiStageMeasureType = Joi.string().valid( - 'count', 'number', 'string', 'boolean', 'time', 'sum', 'avg', 'min', 'max', 'countDistinct', 'runningTotal', 'countDistinctApprox', + 'count', 'number', 'string', 'boolean', 'time', 'sum', 'avg', 'min', 'max', 'countDistinct', 'runningTotal', 'countDistinctApprox', 'numberAgg', 'rank' ); diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-alias.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-alias.test.ts index dcc6f97c08f92..401ffa463695a 100644 --- a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-alias.test.ts +++ b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-alias.test.ts @@ -368,8 +368,9 @@ describe('PreAggregationsAlias', () => { }); const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription(); + const sqlAndParams = query.buildSqlAndParams(); expect(preAggregationsDescription[0].tableName).toEqual('rvis_rollupalias'); - expect(query.buildSqlAndParams()[0]).toContain('rvis_rollupalias'); + expect(sqlAndParams[0]).toContain('rvis_rollupalias'); return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { expect(res).toEqual( diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts new file mode 100644 index 0000000000000..a3d0ebefaa707 --- /dev/null +++ b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts @@ -0,0 +1,290 @@ +import { + getEnv, +} from '@cubejs-backend/shared'; +import R from 'ramda'; +import { UserError } from '../../../src/compiler/UserError'; +import { PostgresQuery } from '../../../src/adapter/PostgresQuery'; +import { prepareJsCompiler } from '../../unit/PrepareCompiler'; +import { dbRunner } from './PostgresDBRunner'; + +describe('PreAggregationsMultiStage', () => { + jest.setTimeout(200000); + + const { compiler, joinGraph, cubeEvaluator } = prepareJsCompiler(` + cube(\`visitors\`, { + sql: \` + select * from visitors WHERE \${FILTER_PARAMS.visitors.createdAt.filter('created_at')} + \`, + sqlAlias: 'vis', + + joins: { + visitor_checkins: { + relationship: 'hasMany', + sql: \`\${CUBE}.id = \${visitor_checkins}.visitor_id\` + } + }, + + measures: { + count: { + type: 'count' + }, + revenue: { + sql: 'amount', + type: 'sum' + }, + + + checkinsTotal: { + sql: \`\${checkinsCount}\`, + type: 'sum' + }, + + uniqueSourceCount: { + sql: 'source', + type: 'countDistinct' + }, + + countDistinctApprox: { + sql: 'id', + type: 'countDistinctApprox' + }, + revenuePerId: { + multi_stage: true, + sql: \`\${revenue} / \${id}\`, + type: 'sum', + add_group_by: [visitors.id], + }, + + revenueAndTime: { + multi_stage: true, + sql: \`LENGTH(CONCAT(\${revenue}, ' - ', \${createdAtDay}))\`, + type: 'sum', + add_group_by: [createdAtDay], + }, + + ratio: { + sql: \`\${uniqueSourceCount} / nullif(\${checkinsTotal}, 0)\`, + type: 'number' + }, + + testMeas: { + type: 'countDistinct', + sql: \`\${createdAtDay}\` + } + }, + + dimensions: { + id: { + type: 'number', + sql: 'id', + primaryKey: true + }, + source: { + type: 'string', + sql: 'source' + }, + createdAt: { + type: 'time', + sql: 'created_at', + }, + checkinsCount: { + type: 'number', + sql: \`\${visitor_checkins.count}\`, + subQuery: true, + propagateFiltersToSubQuery: true + }, + revTest: { + sql: \`CONCAT(\${source}, \${createdAtDay})\`, + type: 'string', + }, + + createdAtDay: { + type: 'time', + sql: \`\${createdAt.day}\`, + }, + + + + }, + + segments: { + google: { + sql: \`source = 'google'\` + } + }, + + preAggregations: { + revenuePerIdRollup: { + type: 'rollup', + measureReferences: [revenue], + dimensionReferences: [id], + timeDimensionReference: createdAt, + granularity: 'day', + partitionGranularity: 'month', + }, + revenueAndTimeAndCountRollup: { + type: 'rollup', + measureReferences: [revenue, count], + dimensionReferences: [source], + timeDimensionReference: createdAt, + granularity: 'day', + partitionGranularity: 'month', + }, + } + }) + + + + cube('visitor_checkins', { + sql: \` + select * from visitor_checkins + \`, + + sqlAlias: 'vc', + + measures: { + count: { + type: 'count' + } + }, + + dimensions: { + id: { + type: 'number', + sql: 'id', + primaryKey: true + }, + visitor_id: { + type: 'number', + sql: 'visitor_id' + }, + source: { + type: 'string', + sql: 'source' + }, + created_at: { + type: 'time', + sql: 'created_at', + } + }, + + }) + + + `); + + if (getEnv('nativeSqlPlanner')) { + it('simple multi stage with add_group_by', () => compiler.compile().then(() => { + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'visitors.revenuePerId' + ], + timeDimensions: [{ + dimension: 'visitors.createdAt', + granularity: 'day', + dateRange: ['2017-01-01', '2017-01-30'] + }], + timezone: 'America/Los_Angeles', + order: [{ + id: 'visitors.createdAt' + }], + preAggregationsSchema: '' + }); + + const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription(); + const sqlAndParams = query.buildSqlAndParams(); + expect(preAggregationsDescription[0].tableName).toEqual('vis_revenue_per_id_rollup'); + expect(sqlAndParams[0]).toContain('vis_revenue_per_id_rollup'); + + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { + expect(res).toEqual( + [ + { + vis__created_at_day: '2017-01-02T00:00:00.000Z', + vis__revenue_per_id: '100.0000000000000000' + }, + { + vis__created_at_day: '2017-01-04T00:00:00.000Z', + vis__revenue_per_id: '100.0000000000000000' + }, + { + vis__created_at_day: '2017-01-05T00:00:00.000Z', + vis__revenue_per_id: '100.0000000000000000' + }, + { + vis__created_at_day: '2017-01-06T00:00:00.000Z', + vis__revenue_per_id: '200.0000000000000000' + } + ] + + ); + }); + })); + + it('simple multi stage with add_group_by and time proxy dimension', () => compiler.compile().then(() => { + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'visitors.revenueAndTime' + ], + dimensions: ['visitors.source'], + timezone: 'America/Los_Angeles', + order: [{ + id: 'visitors.source' + }], + preAggregationsSchema: '' + }); + + const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription(); + const sqlAndParams = query.buildSqlAndParams(); + expect(preAggregationsDescription[0].tableName).toEqual('vis_revenue_and_time_and_count_rollup'); + expect(sqlAndParams[0]).toContain('vis_revenue_and_time_and_count_rollup'); + + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { + expect(res).toEqual( + [ + { vis__source: 'google', vis__revenue_and_time: '25' }, + { vis__source: 'some', vis__revenue_and_time: '50' }, + { vis__source: null, vis__revenue_and_time: '50' } + ] + + ); + }); + })); + + it('multi stage with add_group_by and time proxy dimension and regular measure', () => compiler.compile().then(() => { + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'visitors.revenueAndTime', + 'visitors.count' + ], + dimensions: ['visitors.source'], + timezone: 'America/Los_Angeles', + order: [{ + id: 'visitors.source' + }], + preAggregationsSchema: '' + }); + + const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription(); + const sqlAndParams = query.buildSqlAndParams(); + expect(preAggregationsDescription[0].tableName).toEqual('vis_revenue_and_time_and_count_rollup'); + expect(sqlAndParams[0]).toContain('vis_revenue_and_time_and_count_rollup'); + expect(sqlAndParams[0]).not.toContain('select * from visitors'); + + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { + expect(res).toEqual( + [ + { vis__source: 'google', vis__count: '1', vis__revenue_and_time: '25' }, + { vis__source: 'some', vis__count: '2', vis__revenue_and_time: '50' }, + { vis__source: null, vis__count: '3', vis__revenue_and_time: '50' } + ] + + ); + }); + })); + } else { + it.skip('multi stage pre-aggregations', () => { + // Skipping because it works only in Tesseract + }); + } +}); diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts index f734e969fac49..fc22bcedee4e8 100644 --- a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts +++ b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts @@ -1,4 +1,7 @@ import { PreAggregationPartitionRangeLoader } from '@cubejs-backend/query-orchestrator'; +import { + getEnv, +} from '@cubejs-backend/shared'; import { PostgresQuery } from '../../../src/adapter/PostgresQuery'; import { BigqueryQuery } from '../../../src/adapter/BigqueryQuery'; import { prepareJsCompiler } from '../../unit/PrepareCompiler'; @@ -93,6 +96,10 @@ describe('PreAggregations', () => { type: 'string', sql: 'source' }, + shortSource: { + type: 'string', + sql: \`SUBSTRING(\${source}, 0, 2)\` + }, sourceAndId: { type: 'string', sql: \`\${source} || '_' || \${id}\`, @@ -115,6 +122,10 @@ describe('PreAggregations', () => { type: 'time', sql: \`\${createdAt}\` }, + createdAtDay: { + type: 'time', + sql: \`\${createdAt.day}\` + }, checkinsCount: { type: 'number', sql: \`\${visitor_checkins.count}\`, @@ -529,6 +540,11 @@ describe('PreAggregations', () => { preAggregationsSchema: '' }); + const queryAndParams = query.buildSqlAndParams(); + console.log(queryAndParams); + console.log(query.preAggregations?.preAggregationsDescription()); + expect(query.preAggregations?.preAggregationForQuery?.canUsePreAggregation).toEqual(true); + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { expect(res).toEqual( [ @@ -553,9 +569,58 @@ describe('PreAggregations', () => { }); }); + if (getEnv('nativeSqlPlanner')) { + it('simple pre-aggregation proxy time dimension', () => compiler.compile().then(() => { + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'visitors.count' + ], + dimensions: [ + 'visitors.createdAtDay', + ], + timezone: 'America/Los_Angeles', + order: [{ + id: 'visitors.createdAtDay' + }], + preAggregationsSchema: '' + }); + + const queryAndParams = query.buildSqlAndParams(); + console.log(queryAndParams); + console.log(query.preAggregations?.preAggregationsDescription()); + expect(query.preAggregations?.preAggregationForQuery?.canUsePreAggregation).toEqual(true); + + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { + expect(res).toEqual( + [ + { + visitors__created_at_day: '2016-09-06T00:00:00.000Z', + visitors__count: '1' + }, + { + visitors__created_at_day: '2017-01-02T00:00:00.000Z', + visitors__count: '1' + }, + { + visitors__created_at_day: '2017-01-04T00:00:00.000Z', + visitors__count: '1' + }, + { + visitors__created_at_day: '2017-01-05T00:00:00.000Z', + visitors__count: '1' + }, + { + visitors__created_at_day: '2017-01-06T00:00:00.000Z', + visitors__count: '2' + } + ] + ); + }); + })); + } + it('simple pre-aggregation (allowNonStrictDateRangeMatch: true)', async () => { await compiler.compile(); - const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { measures: [ 'visitors.count' @@ -889,9 +954,91 @@ describe('PreAggregations', () => { }); }); - it('non-additive single value view filtered measure', async () => { + it('non-additive view dimension', async () => { await compiler.compile(); + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'visitors_view.uniqueSourceCount' + ], + dimensions: [ + 'visitors_view.source' + ], + timeDimensions: [{ + dimension: 'visitors_view.signedUpAt', + granularity: 'day', + dateRange: ['2017-01-01', '2017-01-30'] + }], + timezone: 'America/Los_Angeles', + order: [{ + id: 'visitors_view.createdAt' + }], + preAggregationsSchema: '' + }); + + const queryAndParams = query.buildSqlAndParams(); + console.log(queryAndParams); + const preAggregationsDescription = query.preAggregations?.preAggregationsDescription(); + console.log(preAggregationsDescription); + expect((preAggregationsDescription)[0].loadSql[0]).toMatch(/visitors_unique_source_count/); + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { + expect(res).toEqual( + [ + { + visitors_view__source: 'google', + visitors_view__signed_up_at_day: '2017-01-05T00:00:00.000Z', + visitors_view__unique_source_count: '1' + }, + { + visitors_view__source: 'some', + visitors_view__signed_up_at_day: '2017-01-02T00:00:00.000Z', + visitors_view__unique_source_count: '1' + }, + { + visitors_view__source: 'some', + visitors_view__signed_up_at_day: '2017-01-04T00:00:00.000Z', + visitors_view__unique_source_count: '1' + }, + { + visitors_view__source: null, + visitors_view__signed_up_at_day: '2017-01-06T00:00:00.000Z', + visitors_view__unique_source_count: '0' + } + ] + + ); + }); + }); + it('non-additive proxy but not direct alias dimension', async () => { + await compiler.compile(); + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'visitors_view.uniqueSourceCount' + ], + dimensions: [ + 'visitors.shortSource' + ], + timeDimensions: [{ + dimension: 'visitors_view.signedUpAt', + granularity: 'day', + dateRange: ['2017-01-01', '2017-01-30'] + }], + timezone: 'America/Los_Angeles', + order: [{ + id: 'visitors_view.createdAt' + }], + preAggregationsSchema: '' + }); + + const queryAndParams = query.buildSqlAndParams(); + console.log(queryAndParams); + const preAggregationsDescription = query.preAggregations?.preAggregationsDescription(); + console.log(preAggregationsDescription); + expect((preAggregationsDescription)[0].type).toEqual('originalSql'); + }); + + it('non-additive single value view filtered measure', async () => { + await compiler.compile(); const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { measures: [ 'visitors_view.googleUniqueSourceCount' @@ -949,6 +1096,7 @@ describe('PreAggregations', () => { const queryAndParams = query.buildSqlAndParams(); console.log(queryAndParams); expect(queryAndParams[0]).toMatch(/count\(distinct/ig); + expect(queryAndParams[0]).toMatch(/visitors_default/ig); const preAggregationsDescription = query.preAggregations?.preAggregationsDescription(); console.log(preAggregationsDescription); expect((preAggregationsDescription).filter(p => p.type === 'rollup').length).toBe(0); @@ -971,7 +1119,6 @@ describe('PreAggregations', () => { it('multiplied measure match', async () => { await compiler.compile(); - const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { measures: [ 'visitors.count' @@ -1588,7 +1735,6 @@ describe('PreAggregations', () => { it('partitioned', async () => { await compiler.compile(); - const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { measures: [ 'visitors.checkinsTotal' @@ -1612,6 +1758,7 @@ describe('PreAggregations', () => { console.log(queryAndParams); const preAggregationsDescription = query.preAggregations?.preAggregationsDescription(); console.log(JSON.stringify(preAggregationsDescription, null, 2)); + expect(query.preAggregations?.preAggregationForQuery?.canUsePreAggregation).toEqual(true); const queries = dbRunner.tempTablePreAggregations(preAggregationsDescription); @@ -1668,6 +1815,7 @@ describe('PreAggregations', () => { const preAggregationsDescription = query.preAggregations?.preAggregationsDescription(); console.log(JSON.stringify(preAggregationsDescription, null, 2)); + expect(query.preAggregations?.preAggregationForQuery?.canUsePreAggregation).toEqual(true); const queries = dbRunner.tempTablePreAggregations(preAggregationsDescription); console.log(JSON.stringify(queries.concat(queryAndParams))); @@ -1716,6 +1864,7 @@ describe('PreAggregations', () => { const preAggregationsDescription = query.preAggregations?.preAggregationsDescription(); console.log(preAggregationsDescription); + expect(query.preAggregations?.preAggregationForQuery?.canUsePreAggregation).toEqual(true); const queries = dbRunner.tempTablePreAggregations(preAggregationsDescription); expect(queries.filter(([q]) => !!q.match(/3600/)).length).toBeGreaterThanOrEqual(1); @@ -1766,9 +1915,9 @@ describe('PreAggregations', () => { }); const queryAndParams = query.buildSqlAndParams(); - console.log(queryAndParams); const preAggregationsDescription = query.preAggregations?.preAggregationsDescription(); console.log(preAggregationsDescription); + expect(query.preAggregations?.preAggregationForQuery?.canUsePreAggregation).toEqual(true); const queries = dbRunner.tempTablePreAggregations(preAggregationsDescription); @@ -1846,6 +1995,7 @@ describe('PreAggregations', () => { console.log(queryAndParams); const preAggregationsDescription = query.preAggregations?.preAggregationsDescription(); console.log(preAggregationsDescription); + expect(query.preAggregations?.preAggregationForQuery?.canUsePreAggregation).toEqual(true); const queries = dbRunner.tempTablePreAggregations(preAggregationsDescription); diff --git a/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts b/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts index 939badf6ac9b2..8d7e0ee64fd42 100644 --- a/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts +++ b/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts @@ -171,8 +171,6 @@ export class BaseDbRunner { public async evaluateQueryWithPreAggregations(query, seed = this.nextSeed++) { const preAggregationsDescription = query.preAggregations?.preAggregationsDescription(); - // console.log(preAggregationsDescription); - await Promise.all(preAggregationsDescription.map( async desc => { if (desc.partitionGranularity) { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs index 4d7e20ab83d8e..da4ebe8da8c6b 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs @@ -61,6 +61,8 @@ pub struct BaseQueryOptionsStatic { pub ungrouped: Option, #[serde(rename = "exportAnnotatedSql")] pub export_annotated_sql: bool, + #[serde(rename = "preAggregationQuery")] + pub pre_aggregation_query: Option, } #[nativebridge::native_bridge(BaseQueryOptionsStatic)] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs index 8823f8f507bfb..3bfdb34a29907 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs @@ -1,6 +1,7 @@ use super::base_query_options::FilterItem; use super::filter_group::{FilterGroup, NativeFilterGroup}; use super::filter_params::{FilterParams, NativeFilterParams}; +use super::pre_aggregation_obj::{NativePreAggregationObj, PreAggregationObj}; use super::security_context::{NativeSecurityContext, SecurityContext}; use super::sql_templates_render::{NativeSqlTemplatesRender, SqlTemplatesRender}; use super::sql_utils::{NativeSqlUtils, SqlUtils}; @@ -58,4 +59,15 @@ pub trait BaseTools { source: String, origin: String, ) -> Result; + + fn get_pre_aggregation_by_name( + &self, + cube_name: String, + name: String, + ) -> Result, CubeError>; + fn pre_aggregation_table_name( + &self, + cube_name: String, + name: String, + ) -> Result; //TODO move to rust } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/evaluator.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/evaluator.rs index 64518acd4346d..c66a41d485321 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/evaluator.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/evaluator.rs @@ -4,10 +4,14 @@ use super::dimension_definition::{ }; use super::measure_definition::{MeasureDefinition, NativeMeasureDefinition}; use super::member_sql::{MemberSql, NativeMemberSql}; +use super::pre_aggregation_description::{ + NativePreAggregationDescription, PreAggregationDescription, +}; use super::segment_definition::{NativeSegmentDefinition, SegmentDefinition}; use cubenativeutils::wrappers::serializer::{ NativeDeserialize, NativeDeserializer, NativeSerialize, }; +use cubenativeutils::wrappers::NativeArray; use cubenativeutils::wrappers::NativeContextHolder; use cubenativeutils::wrappers::NativeObjectHandle; use cubenativeutils::CubeError; @@ -51,4 +55,9 @@ pub trait CubeEvaluator { sql: Rc, ) -> Result, CubeError>; fn resolve_granularity(&self, path: Vec) -> Result; + #[nbridge(vec)] + fn pre_aggregations_for_cube_as_array( + &self, + cube_name: String, + ) -> Result>, CubeError>; } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs index 1788defaab761..995438cb995b1 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs @@ -21,6 +21,8 @@ pub mod member_expression; pub mod member_order_by; pub mod member_sql; pub mod options_member; +pub mod pre_aggregation_description; +pub mod pre_aggregation_obj; pub mod security_context; pub mod segment_definition; pub mod sql_templates_render; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/pre_aggregation_description.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/pre_aggregation_description.rs new file mode 100644 index 0000000000000..72ffd10662151 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/pre_aggregation_description.rs @@ -0,0 +1,35 @@ +use super::member_sql::{MemberSql, NativeMemberSql}; +use cubenativeutils::wrappers::serializer::{ + NativeDeserialize, NativeDeserializer, NativeSerialize, +}; +use cubenativeutils::wrappers::NativeContextHolder; +use cubenativeutils::wrappers::NativeObjectHandle; +use cubenativeutils::CubeError; +use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::rc::Rc; + +#[derive(Serialize, Deserialize, Debug)] +pub struct PreAggregationDescriptionStatic { + pub name: String, + #[serde(rename = "type")] + pub pre_aggregation_type: String, + pub granularity: Option, + #[serde(rename = "sqlAlias")] + pub sql_alias: Option, + pub external: Option, + #[serde(rename = "allowNonStrictDateRangeMatch")] + pub allow_non_strict_date_range_match: Option, +} + +#[nativebridge::native_bridge(PreAggregationDescriptionStatic)] +pub trait PreAggregationDescription { + #[nbridge(field, optional)] + fn measure_references(&self) -> Result>, CubeError>; + + #[nbridge(field, optional)] + fn dimension_references(&self) -> Result>, CubeError>; + + #[nbridge(field, optional)] + fn time_dimension_reference(&self) -> Result>, CubeError>; +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/pre_aggregation_obj.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/pre_aggregation_obj.rs new file mode 100644 index 0000000000000..39ae16b04ab1c --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/pre_aggregation_obj.rs @@ -0,0 +1,21 @@ +use cubenativeutils::wrappers::serializer::{NativeDeserialize, NativeSerialize}; +use cubenativeutils::wrappers::NativeContextHolder; +use cubenativeutils::wrappers::NativeObjectHandle; +use cubenativeutils::CubeError; +use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::rc::Rc; + +#[derive(Serialize, Deserialize, Debug)] +pub struct PreAggregationObjStatic { + #[serde(rename = "tableName")] + pub table_name: Option, + #[serde(rename = "preAggregationName")] + pub pre_aggregation_name: Option, + pub cube: Option, + #[serde(rename = "preAggregationId")] + pub pre_aggregation_id: Option, +} + +#[nativebridge::native_bridge(PreAggregationObjStatic)] +pub trait PreAggregationObj {} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs index f0d9ed72f4a3a..64ae7a8af8c67 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs @@ -1,6 +1,5 @@ use super::pretty_print::*; use super::*; -use crate::planner::BaseCube; use std::rc::Rc; pub enum AggregateMultipliedSubquerySouce { @@ -12,7 +11,7 @@ pub struct AggregateMultipliedSubquery { pub schema: Rc, pub dimension_subqueries: Vec>, pub keys_subquery: Rc, - pub pk_cube: Rc, //FIXME may be duplication with information in keys_subquery + pub pk_cube: Rc, //FIXME may be duplication with information in keys_subquery pub source: Rc, } @@ -34,7 +33,9 @@ impl PrettyPrint for AggregateMultipliedSubquery { result.println("source:", &state); match self.source.as_ref() { AggregateMultipliedSubquerySouce::Cube => { - result.println(&format!("Cube: {}", self.pk_cube.name()), &details_state); + result.println("Cube:", &details_state); + self.pk_cube + .pretty_print(result, &details_state.new_level()); } AggregateMultipliedSubquerySouce::MeasureSubquery(measure_subquery) => { result.println( diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs index edc4dd14073d4..73a2ab6bcc8fc 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs @@ -1,9 +1,32 @@ +use super::*; use crate::planner::BaseCube; use std::rc::Rc; + +#[derive(Clone)] +pub struct OriginalSqlPreAggregation { + pub name: String, +} + +impl PrettyPrint for OriginalSqlPreAggregation { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println(&format!("OriginalSqlPreAggregation: {}", self.name), state); + } +} + #[derive(Clone)] pub struct Cube { pub name: String, pub cube: Rc, + pub original_sql_pre_aggregation: Option, +} + +impl PrettyPrint for Cube { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println(&format!("Cube: {}", self.name), state); + if let Some(original_sql_pre_aggregation) = &self.original_sql_pre_aggregation { + original_sql_pre_aggregation.pretty_print(result, state); + } + } } impl Cube { @@ -11,6 +34,18 @@ impl Cube { Rc::new(Self { name: cube.name().clone(), cube, + original_sql_pre_aggregation: None, + }) + } + + pub fn with_original_sql_pre_aggregation( + self: Rc, + original_sql_pre_aggregation: OriginalSqlPreAggregation, + ) -> Rc { + Rc::new(Self { + name: self.name.clone(), + cube: self.cube.clone(), + original_sql_pre_aggregation: Some(original_sql_pre_aggregation), }) } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs index 6d2a17b877f7c..ea13cdc5df52e 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs @@ -4,27 +4,35 @@ use std::rc::Rc; pub struct MultiStageSubqueryRef { pub name: String, + pub symbols: Vec>, } impl PrettyPrint for MultiStageSubqueryRef { fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { result.println(&format!("MultiStageSubqueryRef: {}", self.name), state); + let state = state.new_level(); + result.println( + &format!("symbols: {}", print_symbols(&self.symbols)), + &state, + ); } } -pub enum FullKeyAggregateSource { +#[derive(Clone)] +pub enum ResolvedMultipliedMeasures { ResolveMultipliedMeasures(Rc), - MultiStageSubqueryRef(Rc), + PreAggregation(Rc), } -impl PrettyPrint for FullKeyAggregateSource { +impl PrettyPrint for ResolvedMultipliedMeasures { fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { match self { Self::ResolveMultipliedMeasures(resolve_multiplied_measures) => { resolve_multiplied_measures.pretty_print(result, state); } - Self::MultiStageSubqueryRef(subquery_ref) => { - subquery_ref.pretty_print(result, state); + Self::PreAggregation(pre_aggregation) => { + result.println("PreAggregation query:", state); + pre_aggregation.pretty_print(result, state); } } } @@ -33,7 +41,8 @@ impl PrettyPrint for FullKeyAggregateSource { pub struct FullKeyAggregate { pub join_dimensions: Vec>, pub use_full_join_and_coalesce: bool, - pub sources: Vec, + pub multiplied_measures_resolver: Option, + pub multi_stage_subquery_refs: Vec>, } impl PrettyPrint for FullKeyAggregate { @@ -52,9 +61,16 @@ impl PrettyPrint for FullKeyAggregate { ), &state, ); - result.println("sources:", &state); - for source in self.sources.iter() { - source.pretty_print(result, &details_state); + if let Some(resolve_multiplied_measures) = &self.multiplied_measures_resolver { + result.println("multiplied measures resolver:", &state); + resolve_multiplied_measures.pretty_print(result, &details_state); + } + + if !self.multi_stage_subquery_refs.is_empty() { + result.println("multi_stage_subquery_refs:", &state); + for subquery_ref in self.multi_stage_subquery_refs.iter() { + subquery_ref.pretty_print(result, &details_state); + } } } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs index baac0baaf49ef..17b9b39a5ffa2 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs @@ -12,7 +12,9 @@ pub struct CubeJoinItem { impl PrettyPrint for CubeJoinItem { fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { - result.println(&format!("CubeJoinItem for cube: {}", self.cube.name), state); + result.println(&format!("CubeJoinItem: "), state); + let details_state = state.new_level(); + self.cube.pretty_print(result, &details_state); } } @@ -64,7 +66,9 @@ impl PrettyPrint for LogicalJoin { fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { result.println(&format!("Join: "), state); let state = state.new_level(); - result.println(&format!("root: {}", self.root.name), &state); + let details_state = state.new_level(); + result.println(&format!("root: "), &state); + self.root.pretty_print(result, &details_state); result.println(&format!("joins: "), &state); let state = state.new_level(); for join in self.joins.iter() { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs index 724e1a8f79ac7..09860945e97e7 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs @@ -8,7 +8,9 @@ mod join; mod keys_subquery; mod measure_subquery; mod multistage; -mod pretty_print; +pub mod optimizers; +mod pre_aggregation; +pub mod pretty_print; mod query; mod regular_measures_query; mod resolve_multiplied_measures; @@ -25,6 +27,8 @@ pub use join::*; pub use keys_subquery::*; pub use measure_subquery::*; pub use multistage::*; +pub use optimizers::*; +pub use pre_aggregation::*; pub use pretty_print::*; pub use query::*; pub use regular_measures_query::*; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/rolling_window.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/rolling_window.rs index 7a3e419424519..2d4b5f7699351 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/rolling_window.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/rolling_window.rs @@ -59,8 +59,8 @@ pub struct MultiStageRollingWindow { pub rolling_time_dimension: Rc, pub rolling_window: MultiStageRollingWindowType, pub order_by: Vec, - pub time_series_input: String, - pub measure_input: String, + pub time_series_input: MultiStageSubqueryRef, + pub measure_input: MultiStageSubqueryRef, pub time_dimension_in_measure_input: Rc, //time dimension in measure input can have different granularity } @@ -93,11 +93,10 @@ impl PrettyPrint for MultiStageRollingWindow { ); } } - result.println( - &format!("time_series_input: {}", self.time_series_input), - &state, - ); - result.println(&format!("measure_input: {}", self.measure_input), &state); + result.println("time_series_input:", &state); + self.time_series_input.pretty_print(result, &details_state); + result.println("measure_input:", &state); + self.measure_input.pretty_print(result, &details_state); result.println( &format!( "time_dimension_in_measure_input: {}", diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs new file mode 100644 index 0000000000000..91d2d69311514 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs @@ -0,0 +1,160 @@ +use crate::logical_plan::*; +use cubenativeutils::CubeError; +use itertools::Itertools; +use std::collections::HashSet; +use std::rc::Rc; + +pub struct CubeNamesCollector { + cube_names: HashSet, +} + +impl CubeNamesCollector { + pub fn new() -> Self { + Self { + cube_names: HashSet::new(), + } + } + + pub fn collect(&mut self, query: &Query) -> Result<(), CubeError> { + match query { + Query::SimpleQuery(query) => self.collect_from_simple_query(query), + Query::FullKeyAggregateQuery(query) => { + self.collect_from_full_key_aggregate_query(query) + } + } + } + + pub fn result(self) -> Vec { + self.cube_names.into_iter().collect_vec() + } + + fn collect_from_simple_query(&mut self, query: &SimpleQuery) -> Result<(), CubeError> { + self.collect_from_simple_query_source(&query.source)?; + self.collect_from_dimension_subqueries(&query.dimension_subqueries)?; + Ok(()) + } + + fn collect_from_full_key_aggregate_query( + &mut self, + query: &FullKeyAggregateQuery, + ) -> Result<(), CubeError> { + self.collect_from_full_key_aggregate(&query.source)?; + for member in query.multistage_members.iter() { + self.collect_from_multi_stage_member(member)?; + } + Ok(()) + } + + fn collect_from_multi_stage_member( + &mut self, + member: &Rc, + ) -> Result<(), CubeError> { + match &member.member_type { + MultiStageMemberLogicalType::LeafMeasure(leaf_measure) => { + self.collect_from_multi_stage_leaf_measure(leaf_measure) + } + _ => Ok(()), + } + } + + fn collect_from_multi_stage_leaf_measure( + &mut self, + leaf_measure: &MultiStageLeafMeasure, + ) -> Result<(), CubeError> { + self.collect(&leaf_measure.query)?; + Ok(()) + } + + fn collect_from_measure_subquery( + &mut self, + subquery: &Rc, + ) -> Result<(), CubeError> { + self.collect_from_logical_join(&subquery.source)?; + self.collect_from_dimension_subqueries(&subquery.dimension_subqueries)?; + Ok(()) + } + + fn collect_from_full_key_aggregate( + &mut self, + full_key_aggregate: &Rc, + ) -> Result<(), CubeError> { + if let Some(resolve_multiplied_measures) = &full_key_aggregate.multiplied_measures_resolver + { + self.collect_from_resolved_multiplied_measures(resolve_multiplied_measures)?; + } + Ok(()) + } + + fn collect_from_resolved_multiplied_measures( + &mut self, + resolved_multiplied_measures: &ResolvedMultipliedMeasures, + ) -> Result<(), CubeError> { + match resolved_multiplied_measures { + ResolvedMultipliedMeasures::ResolveMultipliedMeasures(resolve_multiplied_measures) => { + self.collect_from_multiplied_measures_resolver(resolve_multiplied_measures)? + } + ResolvedMultipliedMeasures::PreAggregation(_) => {} + } + Ok(()) + } + fn collect_from_multiplied_measures_resolver( + &mut self, + resolver: &ResolveMultipliedMeasures, + ) -> Result<(), CubeError> { + for regular_subquery in resolver.regular_measure_subqueries.iter() { + self.collect_from_simple_query(®ular_subquery)?; + } + for aggregate_multiplied_subquery in resolver.aggregate_multiplied_subqueries.iter() { + self.collect_from_aggregate_multiplied_subquery(&aggregate_multiplied_subquery)?; + } + Ok(()) + } + + fn collect_from_aggregate_multiplied_subquery( + &mut self, + subquery: &Rc, + ) -> Result<(), CubeError> { + self.collect_from_logical_join(&subquery.keys_subquery.source)?; + match subquery.source.as_ref() { + AggregateMultipliedSubquerySouce::Cube => { + self.cube_names.insert(subquery.pk_cube.name.clone()); + } + AggregateMultipliedSubquerySouce::MeasureSubquery(measure_subquery) => { + self.collect_from_measure_subquery(&measure_subquery)?; + } + } + Ok(()) + } + + fn collect_from_simple_query_source( + &mut self, + source: &SimpleQuerySource, + ) -> Result<(), CubeError> { + match source { + SimpleQuerySource::LogicalJoin(join) => self.collect_from_logical_join(join), + SimpleQuerySource::PreAggregation(_) => Ok(()), + } + } + + fn collect_from_logical_join(&mut self, join: &Rc) -> Result<(), CubeError> { + self.cube_names.insert(join.root.name.clone()); + for join_item in join.joins.iter() { + match join_item { + LogicalJoinItem::CubeJoinItem(cube_join_item) => { + self.cube_names.insert(cube_join_item.cube.name.clone()); + } + } + } + Ok(()) + } + + fn collect_from_dimension_subqueries( + &mut self, + dimension_subqueries: &Vec>, + ) -> Result<(), CubeError> { + for subquery in dimension_subqueries.iter() { + self.collect(&subquery.query)?; + } + Ok(()) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/helper.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/helper.rs new file mode 100644 index 0000000000000..ee6abebb8e98b --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/helper.rs @@ -0,0 +1,54 @@ +use itertools::Itertools; + +use crate::logical_plan::*; +use crate::plan::FilterItem; +use crate::planner::sql_evaluator::MemberSymbol; +use std::rc::Rc; + +pub struct OptimizerHelper; + +impl OptimizerHelper { + pub fn new() -> Self { + Self + } + + pub fn all_measures( + &self, + schema: &Rc, + filters: &Rc, + ) -> Vec> { + let mut result = schema.measures.clone(); + self.fill_members_from_filters(&filters.measures_filter, &mut result); + result.into_iter().unique_by(|s| s.full_name()).collect() + } + + fn fill_members_from_filters( + &self, + filters: &Vec, + members: &mut Vec>, + ) { + for item in filters.iter() { + self.fill_members_from_filter_item(item, members); + } + } + + fn fill_members_from_filter_item( + &self, + item: &FilterItem, + members: &mut Vec>, + ) { + match item { + FilterItem::Group(group) => { + for item in group.items.iter() { + self.fill_members_from_filter_item(item, members) + } + } + FilterItem::Item(item) => { + members.push(item.member_evaluator().clone()); + } + FilterItem::Segment(segment) => { + members.push(segment.member_evaluator().clone()); + } + } + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/mod.rs new file mode 100644 index 0000000000000..e2656873008f5 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/mod.rs @@ -0,0 +1,5 @@ +mod cube_names_collector; +mod helper; + +pub use cube_names_collector::*; +pub use helper::*; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/mod.rs new file mode 100644 index 0000000000000..14e1a9da2eebe --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/mod.rs @@ -0,0 +1,5 @@ +mod common; +mod pre_aggregation; + +pub use common::*; +pub use pre_aggregation::*; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs new file mode 100644 index 0000000000000..e47d5bfd2805b --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs @@ -0,0 +1,137 @@ +use crate::cube_bridge::member_sql::MemberSql; +use crate::cube_bridge::pre_aggregation_description::PreAggregationDescription; +use crate::planner::query_tools::QueryTools; +use crate::planner::sql_evaluator::MemberSymbol; +use cubenativeutils::CubeError; +use std::fmt::Debug; +use std::rc::Rc; +#[derive(Clone)] +pub struct CompiledPreAggregation { + pub cube_name: String, + pub name: String, + pub granularity: Option, + pub external: Option, + pub measures: Vec>, + pub dimensions: Vec>, + pub time_dimensions: Vec<(Rc, Option)>, + pub allow_non_strict_date_range_match: bool, +} + +impl Debug for CompiledPreAggregation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CompiledPreAggregation") + .field("cube_name", &self.cube_name) + .field("name", &self.name) + .field("granularity", &self.granularity) + .field("external", &self.external) + .field("measures", &self.measures) + .field("dimensions", &self.dimensions) + .field("time_dimensions", &self.time_dimensions) + .field( + "allow_non_strict_date_range_match", + &self.allow_non_strict_date_range_match, + ) + .finish() + } +} + +impl CompiledPreAggregation { + pub fn try_new( + query_tools: Rc, + cube_name: &String, + description: Rc, + ) -> Result, CubeError> { + let static_data = description.static_data(); + let measures = if let Some(refs) = description.measure_references()? { + Self::symbols_from_ref(query_tools.clone(), cube_name, refs, Self::check_is_measure)? + } else { + Vec::new() + }; + let dimensions = if let Some(refs) = description.dimension_references()? { + Self::symbols_from_ref( + query_tools.clone(), + cube_name, + refs, + Self::check_is_dimension, + )? + } else { + Vec::new() + }; + let time_dimensions = if let Some(refs) = description.time_dimension_reference()? { + let dims = Self::symbols_from_ref( + query_tools.clone(), + cube_name, + refs, + Self::check_is_time_dimension, + )?; + /* if dims.len() != 1 { + return Err(CubeError::user(format!( + "Pre aggregation should contains only one time dimension" + ))); + } */ + vec![(dims[0].clone(), static_data.granularity.clone())] //TODO remove unwrap + } else { + Vec::new() + }; + let allow_non_strict_date_range_match = description + .static_data() + .allow_non_strict_date_range_match + .unwrap_or(false); + let res = Rc::new(Self { + name: static_data.name.clone(), + cube_name: cube_name.clone(), + granularity: static_data.granularity.clone(), + external: static_data.external, + measures, + dimensions, + time_dimensions, + allow_non_strict_date_range_match, + }); + Ok(res) + } + + fn symbols_from_ref Result<(), CubeError>>( + query_tools: Rc, + cube_name: &String, + ref_func: Rc, + check_type_fn: F, + ) -> Result>, CubeError> { + let evaluator_compiler_cell = query_tools.evaluator_compiler().clone(); + let mut evaluator_compiler = evaluator_compiler_cell.borrow_mut(); + let sql_call = evaluator_compiler.compile_sql_call(cube_name, ref_func)?; + let mut res = Vec::new(); + for symbol in sql_call.get_dependencies().iter() { + check_type_fn(&symbol)?; + res.push(symbol.clone()); + } + Ok(res) + } + + fn check_is_measure(symbol: &MemberSymbol) -> Result<(), CubeError> { + symbol + .as_measure() + .map_err(|_| CubeError::user(format!("Pre-aggregation measure must be a measure")))?; + Ok(()) + } + + fn check_is_dimension(symbol: &MemberSymbol) -> Result<(), CubeError> { + symbol.as_dimension().map_err(|_| { + CubeError::user(format!("Pre-aggregation dimension must be a dimension")) + })?; + Ok(()) + } + + fn check_is_time_dimension(symbol: &MemberSymbol) -> Result<(), CubeError> { + let dimension = symbol.as_dimension().map_err(|_| { + CubeError::user(format!( + "Pre-aggregation time dimension must be a dimension" + )) + })?; + if dimension.dimension_type() != "time" { + return Err(CubeError::user(format!( + "Pre-aggregation time dimension must be a dimension" + ))); + } + Ok(()) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/dimension_matcher.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/dimension_matcher.rs new file mode 100644 index 0000000000000..f61e941dedd2d --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/dimension_matcher.rs @@ -0,0 +1,257 @@ +use super::CompiledPreAggregation; +use super::MatchState; +use crate::plan::filter::FilterGroupOperator; +use crate::plan::FilterItem; +use crate::planner::filter::BaseFilter; +use crate::planner::query_tools::QueryTools; +use crate::planner::sql_evaluator::DimensionSymbol; +use crate::planner::sql_evaluator::MemberSymbol; +use crate::planner::sql_evaluator::TimeDimensionSymbol; +use crate::planner::GranularityHelper; +use cubenativeutils::CubeError; +use std::collections::HashMap; +use std::rc::Rc; + +pub struct DimensionMatcher<'a> { + query_tools: Rc, + pre_aggregation: &'a CompiledPreAggregation, + pre_aggregation_dimensions: HashMap, + pre_aggregation_time_dimensions: HashMap, bool)>, + result: MatchState, +} + +impl<'a> DimensionMatcher<'a> { + pub fn new(query_tools: Rc, pre_aggregation: &'a CompiledPreAggregation) -> Self { + let pre_aggregation_dimensions = pre_aggregation + .dimensions + .iter() + .map(|d| (d.full_name(), false)) + .collect(); + let pre_aggregation_time_dimensions = pre_aggregation + .time_dimensions + .iter() + .map(|(dim, granularity)| (dim.full_name(), (granularity.clone(), false))) + .collect::>(); + Self { + query_tools, + pre_aggregation, + pre_aggregation_dimensions, + pre_aggregation_time_dimensions, + result: MatchState::Full, + } + } + + pub fn try_match( + &mut self, + dimensions: &Vec>, + time_dimensions: &Vec>, + filters: &Vec, + time_dimension_filters: &Vec, + segments: &Vec, + ) -> Result<(), CubeError> { + for dimension in dimensions.iter() { + let dimension_match = self.try_match_symbol(dimension, true)?; + self.result = self.result.combine(&dimension_match); + if self.result == MatchState::NotMatched { + return Ok(()); + } + } + for time_dimension in time_dimensions.iter() { + let time_dimension_match = self.try_match_symbol(time_dimension, true)?; + self.result = self.result.combine(&time_dimension_match); + if self.result == MatchState::NotMatched { + return Ok(()); + } + } + + for filter in filters.iter() { + let filter_match = self.try_match_filter_item(filter, true)?; + self.result = self.result.combine(&filter_match); + if self.result == MatchState::NotMatched { + return Ok(()); + } + } + + for filter in time_dimension_filters.iter() { + let filter_match = self.try_match_filter_item(filter, true)?; + self.result = self.result.combine(&filter_match); + if self.result == MatchState::NotMatched { + return Ok(()); + } + } + + for segment in segments.iter() { + let segment_match = self.try_match_filter_item(segment, true)?; + self.result = self.result.combine(&segment_match); + if self.result == MatchState::NotMatched { + return Ok(()); + } + } + Ok(()) + } + + pub fn result(mut self) -> MatchState { + let dimension_coverage_result = if self.pre_aggregation_dimensions.values().all(|v| *v) { + MatchState::Full + } else { + MatchState::Partial + }; + self.result = self.result.combine(&dimension_coverage_result); + let time_dimension_coverage_result = + if self.pre_aggregation_time_dimensions.values().all(|v| v.1) { + MatchState::Full + } else { + MatchState::Partial + }; + self.result = self.result.combine(&time_dimension_coverage_result); + self.result + } + + fn try_match_symbol( + &mut self, + symbol: &Rc, + add_to_matched_dimension: bool, + ) -> Result { + match symbol.as_ref() { + MemberSymbol::Dimension(dimension) => { + self.try_match_dimension(dimension, add_to_matched_dimension) + } + MemberSymbol::TimeDimension(time_dimension) => { + self.try_match_time_dimension(time_dimension, add_to_matched_dimension) + } + MemberSymbol::MemberExpression(_member_expression) => Ok(MatchState::NotMatched), //TODO We don't allow to use pre-aggregations with member expressions before SQL API is ready for it + _ => Ok(MatchState::NotMatched), + } + } + + fn try_match_dimension( + &mut self, + dimension: &DimensionSymbol, + add_to_matched_dimension: bool, + ) -> Result { + if let Some(found) = self + .pre_aggregation_dimensions + .get_mut(&dimension.full_name()) + { + if add_to_matched_dimension { + *found = true; + } + Ok(MatchState::Full) + } else if dimension.owned_by_cube() { + Ok(MatchState::NotMatched) + } else { + let dependencies = dimension.get_dependencies(); + if dependencies.is_empty() { + Ok(MatchState::NotMatched) + } else { + let mut result = if dimension.is_reference() { + MatchState::Full + } else { + MatchState::Partial + }; + for dep in dimension.get_dependencies() { + let dep_match = self.try_match_symbol(&dep, add_to_matched_dimension)?; + if dep_match == MatchState::NotMatched { + return Ok(MatchState::NotMatched); + } + result = result.combine(&dep_match); + } + Ok(result) + } + } + } + + fn try_match_time_dimension( + &mut self, + time_dimension: &TimeDimensionSymbol, + add_to_matched_dimension: bool, + ) -> Result { + let granularity = if self.pre_aggregation.allow_non_strict_date_range_match { + time_dimension.granularity().clone() + } else { + time_dimension.rollup_granularity(self.query_tools.clone())? + }; + let base_symbol_name = time_dimension.base_symbol().full_name(); + if let Some(found) = self + .pre_aggregation_time_dimensions + .get_mut(&base_symbol_name) + { + if add_to_matched_dimension { + found.1 = true; + } + let pre_aggr_granularity = &found.0; + if granularity.is_none() || pre_aggr_granularity == &granularity { + Ok(MatchState::Full) + } else if pre_aggr_granularity.is_none() + || GranularityHelper::is_predefined_granularity( + pre_aggr_granularity.as_ref().unwrap(), + ) + { + let min_granularity = + GranularityHelper::min_granularity(&granularity, &pre_aggr_granularity)?; + if &min_granularity == pre_aggr_granularity { + Ok(MatchState::Partial) + } else { + Ok(MatchState::NotMatched) + } + } else { + Ok(MatchState::NotMatched) //TODO Custom granularities!!! + } + } else { + if time_dimension.owned_by_cube() { + Ok(MatchState::NotMatched) + } else { + let mut result = if time_dimension.is_reference() { + MatchState::Full + } else { + MatchState::Partial + }; + for dep in time_dimension.get_dependencies_as_time_dimensions() { + let dep_match = self.try_match_symbol(&dep, add_to_matched_dimension)?; + if dep_match == MatchState::NotMatched { + return Ok(MatchState::NotMatched); + } + result = result.combine(&dep_match); + } + Ok(result) + } + } + } + + fn try_match_filter_item( + &mut self, + filter_item: &FilterItem, + add_to_matched_dimension: bool, + ) -> Result { + match filter_item { + FilterItem::Item(filter) => self.try_match_filter(filter, add_to_matched_dimension), + FilterItem::Group(group) => { + let add_to_matched_dimension = + add_to_matched_dimension && group.operator == FilterGroupOperator::And; + let mut result = MatchState::Full; + for item in group.items.iter() { + result = result + .combine(&self.try_match_filter_item(item, add_to_matched_dimension)?); + } + Ok(result) + } + FilterItem::Segment(segment) => { + self.try_match_symbol(&segment.member_evaluator(), add_to_matched_dimension) + } + } + } + + fn try_match_filter( + &mut self, + filter: &Rc, + add_to_matched_dimension: bool, + ) -> Result { + let symbol = if let Some(time_dimension) = filter.time_dimension_symbol() { + time_dimension + } else { + filter.member_evaluator().clone() + }; + let add_to_matched_dimension = add_to_matched_dimension && filter.is_single_value_equal(); + self.try_match_symbol(&symbol, add_to_matched_dimension) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs new file mode 100644 index 0000000000000..e5806db441222 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs @@ -0,0 +1,50 @@ +use super::CompiledPreAggregation; +use crate::planner::sql_evaluator::MemberSymbol; +use cubenativeutils::CubeError; +use std::collections::HashSet; +use std::rc::Rc; +pub struct MeasureMatcher { + only_addictive: bool, + pre_aggregation_measures: HashSet, +} + +impl MeasureMatcher { + pub fn new(pre_aggregation: &CompiledPreAggregation, only_addictive: bool) -> Self { + let pre_aggregation_measures = pre_aggregation + .measures + .iter() + .map(|m| m.full_name()) + .collect(); + Self { + only_addictive, + pre_aggregation_measures, + } + } + + pub fn try_match(&self, symbol: &Rc) -> Result { + match symbol.as_ref() { + MemberSymbol::Measure(measure) => { + if self.pre_aggregation_measures.contains(&measure.full_name()) { + if !self.only_addictive || measure.is_addictive() { + return Ok(true); + } + } + } + MemberSymbol::MemberExpression(_) => { + return Ok(false); //TODO We not allow to use pre-aggregations with member expressions before sqlapi ready for it + } + _ => return Ok(false), + } + + if symbol.get_dependencies().is_empty() { + return Ok(false); + } + + for dep in symbol.get_dependencies() { + if !self.try_match(&dep)? { + return Ok(false); + } + } + Ok(true) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/mod.rs new file mode 100644 index 0000000000000..a3fbe777d9fe0 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/mod.rs @@ -0,0 +1,13 @@ +mod compiled_pre_aggregation; +mod dimension_matcher; +mod measure_matcher; +mod optimizer; +mod original_sql_collector; +mod original_sql_optimizer; + +pub use compiled_pre_aggregation::*; +use dimension_matcher::*; +use measure_matcher::*; +pub use optimizer::*; +pub use original_sql_collector::*; +pub use original_sql_optimizer::*; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs new file mode 100644 index 0000000000000..72a44fe013309 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -0,0 +1,532 @@ +use super::*; +use crate::cube_bridge::pre_aggregation_obj::PreAggregationObj; +use crate::logical_plan::*; +use crate::plan::FilterItem; +use crate::planner::query_tools::QueryTools; +use crate::planner::sql_evaluator::MemberSymbol; +use cubenativeutils::CubeError; +use std::collections::{HashMap, HashSet}; +use std::rc::Rc; + +#[derive(Clone, Debug, PartialEq)] +pub enum MatchState { + Partial, + Full, + NotMatched, +} + +impl MatchState { + pub fn combine(&self, other: &MatchState) -> MatchState { + if matches!(self, MatchState::NotMatched) || matches!(other, MatchState::NotMatched) { + return MatchState::NotMatched; + } + if matches!(self, MatchState::Partial) || matches!(other, MatchState::Partial) { + return MatchState::Partial; + } + return MatchState::Full; + } +} + +pub struct PreAggregationOptimizer { + query_tools: Rc, + used_pre_aggregations: HashMap<(String, String), Rc>, +} + +impl PreAggregationOptimizer { + pub fn new(query_tools: Rc) -> Self { + Self { + query_tools, + used_pre_aggregations: HashMap::new(), + } + } + + pub fn try_optimize(&mut self, plan: Rc) -> Result>, CubeError> { + let mut cube_names_collector = CubeNamesCollector::new(); + cube_names_collector.collect(&plan)?; + let cube_names = cube_names_collector.result(); + + let mut compiled_pre_aggregations = Vec::new(); + for cube_name in cube_names.iter() { + let pre_aggregations = self + .query_tools + .cube_evaluator() + .pre_aggregations_for_cube_as_array(cube_name.clone())?; + for pre_aggregation in pre_aggregations.iter() { + let compiled = CompiledPreAggregation::try_new( + self.query_tools.clone(), + cube_name, + pre_aggregation.clone(), + )?; + compiled_pre_aggregations.push(compiled); + } + } + + for pre_aggregation in compiled_pre_aggregations.iter() { + let new_query = self.try_rewrite_query(plan.clone(), pre_aggregation)?; + if new_query.is_some() { + return Ok(new_query); + } + } + + Ok(None) + } + + pub fn get_used_pre_aggregations(&self) -> Vec> { + self.used_pre_aggregations.values().cloned().collect() + } + + fn try_rewrite_query( + &mut self, + query: Rc, + pre_aggregation: &Rc, + ) -> Result>, CubeError> { + match query.as_ref() { + Query::SimpleQuery(query) => self.try_rewrite_simple_query(query, pre_aggregation), + Query::FullKeyAggregateQuery(query) => { + self.try_rewrite_full_key_aggregate_query(query, pre_aggregation) + } + } + } + + fn try_rewrite_simple_query( + &mut self, + query: &SimpleQuery, + pre_aggregation: &Rc, + ) -> Result>, CubeError> { + if self.is_schema_and_filters_match(&query.schema, &query.filter, pre_aggregation)? { + let mut new_query = SimpleQuery::clone(&query); + new_query.source = SimpleQuerySource::PreAggregation( + self.make_pre_aggregation_source(pre_aggregation)?, + ); + Ok(Some(Rc::new(Query::SimpleQuery(new_query)))) + } else { + Ok(None) + } + } + + fn try_rewrite_full_key_aggregate_query( + &mut self, + query: &FullKeyAggregateQuery, + pre_aggregation: &Rc, + ) -> Result>, CubeError> { + if !query.multistage_members.is_empty() { + return self + .try_rewrite_full_key_aggregate_query_with_multi_stages(query, pre_aggregation); + } + + if self.is_schema_and_filters_match(&query.schema, &query.filter, pre_aggregation)? { + let source = SimpleQuerySource::PreAggregation( + self.make_pre_aggregation_source(pre_aggregation)?, + ); + let new_query = SimpleQuery { + schema: query.schema.clone(), + dimension_subqueries: vec![], + filter: query.filter.clone(), + offset: query.offset, + limit: query.limit, + ungrouped: query.ungrouped, + order_by: query.order_by.clone(), + source, + }; + Ok(Some(Rc::new(Query::SimpleQuery(new_query)))) + } else { + Ok(None) + } + } + + fn try_rewrite_full_key_aggregate_query_with_multi_stages( + &mut self, + query: &FullKeyAggregateQuery, + pre_aggregation: &Rc, + ) -> Result>, CubeError> { + let used_multi_stage_symbols = self.collect_multi_stage_symbols(&query.source); + let mut multi_stages_queries = query.multistage_members.clone(); + let mut rewrited_multistage = multi_stages_queries + .iter() + .map(|query| (query.name.clone(), false)) + .collect::>(); + + for (_, multi_stage_name) in used_multi_stage_symbols.iter() { + self.try_rewrite_multistage( + multi_stage_name, + &mut multi_stages_queries, + &mut rewrited_multistage, + pre_aggregation, + )?; + } + let all_multi_stage_rewrited = rewrited_multistage.values().all(|v| *v); + if !all_multi_stage_rewrited { + return Ok(None); + } + + let source = if let Some(resolver_multiplied_measures) = + &query.source.multiplied_measures_resolver + { + if let ResolvedMultipliedMeasures::ResolveMultipliedMeasures( + resolver_multiplied_measures, + ) = resolver_multiplied_measures + { + if self.is_schema_and_filters_match( + &resolver_multiplied_measures.schema, + &resolver_multiplied_measures.filter, + &pre_aggregation, + )? { + let pre_aggregation_source = + self.make_pre_aggregation_source(pre_aggregation)?; + + let pre_aggregation_query = SimpleQuery { + schema: resolver_multiplied_measures.schema.clone(), + dimension_subqueries: vec![], + filter: resolver_multiplied_measures.filter.clone(), + offset: None, + limit: None, + ungrouped: false, + order_by: vec![], + source: SimpleQuerySource::PreAggregation(pre_aggregation_source), + }; + Rc::new(FullKeyAggregate { + join_dimensions: query.source.join_dimensions.clone(), + use_full_join_and_coalesce: query.source.use_full_join_and_coalesce, + multiplied_measures_resolver: Some( + ResolvedMultipliedMeasures::PreAggregation(Rc::new( + pre_aggregation_query, + )), + ), + multi_stage_subquery_refs: query.source.multi_stage_subquery_refs.clone(), + }) + } else { + return Ok(None); + } + } else { + query.source.clone() + } + } else { + query.source.clone() + }; + + let result = FullKeyAggregateQuery { + multistage_members: multi_stages_queries, + schema: query.schema.clone(), + filter: query.filter.clone(), + offset: query.offset, + limit: query.limit, + ungrouped: query.ungrouped, + order_by: query.order_by.clone(), + source, + }; + Ok(Some(Rc::new(Query::FullKeyAggregateQuery(result)))) + } + + fn try_rewrite_multistage( + &mut self, + multi_stage_name: &String, + multi_stage_queries: &mut Vec>, + rewrited_multistage: &mut HashMap, + pre_aggregation: &Rc, + ) -> Result<(), CubeError> { + if rewrited_multistage + .get(multi_stage_name) + .cloned() + .unwrap_or(false) + { + return Ok(()); + } + + if let Some(multi_stage_item) = multi_stage_queries + .iter() + .cloned() + .find(|query| &query.name == multi_stage_name) + { + match &multi_stage_item.member_type { + MultiStageMemberLogicalType::LeafMeasure(multi_stage_leaf_measure) => self + .try_rewrite_multistage_leaf_measure( + multi_stage_name, + multi_stage_leaf_measure, + multi_stage_queries, + rewrited_multistage, + pre_aggregation, + )?, + MultiStageMemberLogicalType::MeasureCalculation( + multi_stage_measure_calculation, + ) => self.try_rewrite_multistage_measure_calculation( + multi_stage_name, + multi_stage_measure_calculation, + multi_stage_queries, + rewrited_multistage, + pre_aggregation, + )?, + MultiStageMemberLogicalType::GetDateRange(multi_stage_get_date_range) => self + .try_rewrite_multistage_get_date_range( + multi_stage_name, + multi_stage_get_date_range, + multi_stage_queries, + rewrited_multistage, + pre_aggregation, + )?, + MultiStageMemberLogicalType::TimeSeries(multi_stage_time_series) => self + .try_rewrite_multistage_time_series( + multi_stage_name, + multi_stage_time_series, + multi_stage_queries, + rewrited_multistage, + pre_aggregation, + )?, + MultiStageMemberLogicalType::RollingWindow(multi_stage_rolling_window) => self + .try_rewrite_multistage_rolling_window( + multi_stage_name, + multi_stage_rolling_window, + multi_stage_queries, + rewrited_multistage, + pre_aggregation, + )?, + } + } + + Ok(()) + } + + fn try_rewrite_multistage_measure_calculation( + &mut self, + multi_stage_name: &String, + multi_stage_measure_calculation: &MultiStageMeasureCalculation, + multi_stage_queries: &mut Vec>, + rewrited_multistage: &mut HashMap, + pre_aggregation: &Rc, + ) -> Result<(), CubeError> { + let used_multi_stage_symbols = + self.collect_multi_stage_symbols(&multi_stage_measure_calculation.source); + for (_, multi_stage_name) in used_multi_stage_symbols.iter() { + self.try_rewrite_multistage( + multi_stage_name, + multi_stage_queries, + rewrited_multistage, + pre_aggregation, + )?; + } + rewrited_multistage.insert(multi_stage_name.clone(), true); + Ok(()) + } + + fn try_rewrite_multistage_rolling_window( + &mut self, + multi_stage_name: &String, + multi_stage_rolling_window: &MultiStageRollingWindow, + multi_stage_queries: &mut Vec>, + rewrited_multistage: &mut HashMap, + pre_aggregation: &Rc, + ) -> Result<(), CubeError> { + self.try_rewrite_multistage( + &multi_stage_rolling_window.time_series_input.name, + multi_stage_queries, + rewrited_multistage, + pre_aggregation, + )?; + self.try_rewrite_multistage( + &multi_stage_rolling_window.measure_input.name, + multi_stage_queries, + rewrited_multistage, + pre_aggregation, + )?; + rewrited_multistage.insert(multi_stage_name.clone(), true); + Ok(()) + } + + fn try_rewrite_multistage_time_series( + &mut self, + multi_stage_name: &String, + multi_stage_time_series: &MultiStageTimeSeries, + multi_stage_queries: &mut Vec>, + rewrited_multistage: &mut HashMap, + pre_aggregation: &Rc, + ) -> Result<(), CubeError> { + if let Some(get_date_range_ref) = &multi_stage_time_series.get_date_range_multistage_ref { + self.try_rewrite_multistage( + &get_date_range_ref, + multi_stage_queries, + rewrited_multistage, + pre_aggregation, + )?; + } + rewrited_multistage.insert(multi_stage_name.clone(), true); + Ok(()) + } + + fn try_rewrite_multistage_get_date_range( + &mut self, + _multi_stage_name: &String, + _multi_stage_get_date_range: &MultiStageGetDateRange, + _multi_stage_queries: &mut Vec>, + _rewrited_multistage: &mut HashMap, + _pre_aggregation: &Rc, + ) -> Result<(), CubeError> { + Ok(()) //TODO + } + + fn try_rewrite_multistage_leaf_measure( + &mut self, + multi_stage_name: &String, + multi_stage_leaf_measure: &MultiStageLeafMeasure, + multi_stage_queries: &mut Vec>, + rewrited_multistage: &mut HashMap, + pre_aggregation: &Rc, + ) -> Result<(), CubeError> { + if let Some(rewritten) = + self.try_rewrite_query(multi_stage_leaf_measure.query.clone(), pre_aggregation)? + { + let new_leaf = MultiStageLeafMeasure { + measure: multi_stage_leaf_measure.measure.clone(), + render_measure_as_state: multi_stage_leaf_measure.render_measure_as_state.clone(), + render_measure_for_ungrouped: multi_stage_leaf_measure + .render_measure_for_ungrouped + .clone(), + time_shifts: multi_stage_leaf_measure.time_shifts.clone(), + query: rewritten, + }; + let new_multistage = Rc::new(LogicalMultiStageMember { + name: multi_stage_name.clone(), + member_type: MultiStageMemberLogicalType::LeafMeasure(new_leaf), + }); + + rewrited_multistage.insert(multi_stage_name.clone(), true); + if let Some(query) = multi_stage_queries + .iter_mut() + .find(|query| &query.name == multi_stage_name) + { + *query = new_multistage; + } + Ok(()) + } else { + Ok(()) + } + } + + fn collect_multi_stage_symbols(&self, source: &FullKeyAggregate) -> HashMap { + let mut symbols = HashMap::new(); + for source in source.multi_stage_subquery_refs.iter() { + for symbol in source.symbols.iter() { + symbols.insert(symbol.full_name(), source.name.clone()); + } + } + symbols + } + + fn make_pre_aggregation_source( + &mut self, + pre_aggregation: &Rc, + ) -> Result, CubeError> { + let pre_aggregation_obj = self.query_tools.base_tools().get_pre_aggregation_by_name( + pre_aggregation.cube_name.clone(), + pre_aggregation.name.clone(), + )?; + if let Some(table_name) = &pre_aggregation_obj.static_data().table_name { + let schema = LogicalSchema { + time_dimensions: vec![], + dimensions: pre_aggregation + .dimensions + .iter() + .cloned() + .chain( + pre_aggregation + .time_dimensions + .iter() + .map(|(d, _)| d.clone()), + ) + .collect(), + measures: pre_aggregation.measures.iter().cloned().collect(), + multiplied_measures: HashSet::new(), + }; + let pre_aggregation = PreAggregation { + name: pre_aggregation.name.clone(), + time_dimensions: pre_aggregation.time_dimensions.clone(), + dimensions: pre_aggregation.dimensions.clone(), + measures: pre_aggregation.measures.clone(), + schema: Rc::new(schema), + external: pre_aggregation.external.unwrap_or_default(), + granularity: pre_aggregation.granularity.clone(), + table_name: table_name.clone(), + cube_name: pre_aggregation.cube_name.clone(), + }; + self.used_pre_aggregations.insert( + ( + pre_aggregation.cube_name.clone(), + pre_aggregation.name.clone(), + ), + pre_aggregation_obj.clone(), + ); + Ok(Rc::new(pre_aggregation)) + } else { + Err(CubeError::internal(format!( + "Cannot find pre aggregation object for cube {} and name {}", + pre_aggregation.cube_name, pre_aggregation.name + ))) + } + } + + fn is_schema_and_filters_match( + &self, + schema: &Rc, + filters: &Rc, + pre_aggregation: &CompiledPreAggregation, + ) -> Result { + let helper = OptimizerHelper::new(); + + let match_state = self.match_dimensions( + &schema.dimensions, + &schema.time_dimensions, + &filters.dimensions_filters, + &filters.time_dimensions_filters, + &filters.segments, + pre_aggregation, + )?; + + let all_measures = helper.all_measures(schema, filters); + if !schema.multiplied_measures.is_empty() && match_state == MatchState::Partial { + return Ok(false); + } + if match_state == MatchState::NotMatched { + return Ok(false); + } + let measures_match = self.try_match_measures( + &all_measures, + pre_aggregation, + match_state == MatchState::Partial, + )?; + Ok(measures_match) + } + + fn try_match_measures( + &self, + measures: &Vec>, + pre_aggregation: &CompiledPreAggregation, + only_addictive: bool, + ) -> Result { + let matcher = MeasureMatcher::new(pre_aggregation, only_addictive); + for measure in measures.iter() { + if !matcher.try_match(measure)? { + return Ok(false); + } + } + Ok(true) + } + + fn match_dimensions( + &self, + dimensions: &Vec>, + time_dimensions: &Vec>, + filters: &Vec, + time_dimension_filters: &Vec, + segments: &Vec, + pre_aggregation: &CompiledPreAggregation, + ) -> Result { + let mut matcher = DimensionMatcher::new(self.query_tools.clone(), pre_aggregation); + matcher.try_match( + dimensions, + time_dimensions, + filters, + time_dimension_filters, + segments, + )?; + let result = matcher.result(); + Ok(result) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_collector.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_collector.rs new file mode 100644 index 0000000000000..183cdab2d3e7e --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_collector.rs @@ -0,0 +1,44 @@ +use crate::logical_plan::*; +use crate::planner::query_tools::QueryTools; +use cubenativeutils::CubeError; +use std::collections::HashMap; +use std::rc::Rc; + +pub struct OriginalSqlCollector { + query_tools: Rc, +} + +impl OriginalSqlCollector { + pub fn new(query_tools: Rc) -> Self { + Self { query_tools } + } + + pub fn collect(&mut self, plan: &Query) -> Result, CubeError> { + let mut cube_names_collector = CubeNamesCollector::new(); + cube_names_collector.collect(&plan)?; + let cube_names = cube_names_collector.result(); + let mut result = HashMap::new(); + for cube_name in cube_names.iter() { + let pre_aggregations = self + .query_tools + .cube_evaluator() + .pre_aggregations_for_cube_as_array(cube_name.clone())?; + if let Some(found_pre_aggregation) = pre_aggregations + .iter() + .find(|p| p.static_data().pre_aggregation_type == "originalSql") + { + let name = if let Some(alias) = &found_pre_aggregation.static_data().sql_alias { + alias.clone() + } else { + found_pre_aggregation.static_data().name.clone() + }; + let table_name = self + .query_tools + .base_tools() + .pre_aggregation_table_name(cube_name.clone(), name)?; + result.insert(cube_name.clone(), table_name.clone()); + } + } + Ok(result) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_optimizer.rs new file mode 100644 index 0000000000000..4bd5c1ea078d3 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_optimizer.rs @@ -0,0 +1,377 @@ +use super::*; +use crate::logical_plan::*; +use crate::planner::query_tools::QueryTools; +use cubenativeutils::CubeError; +use std::collections::HashMap; +use std::rc::Rc; + +pub struct OriginalSqlOptimizer { + query_tools: Rc, + foud_pre_aggregations: HashMap>, +} + +impl OriginalSqlOptimizer { + pub fn new(query_tools: Rc) -> Self { + Self { + query_tools, + foud_pre_aggregations: HashMap::new(), + } + } + + pub fn try_optimize(&mut self, plan: &Rc) -> Result>, CubeError> { + let res = match plan.as_ref() { + Query::SimpleQuery(query) => self + .try_optimize_simple_query(query)? + .map(|optimized| Rc::new(Query::SimpleQuery(optimized))), + Query::FullKeyAggregateQuery(query) => self + .try_optimize_full_key_aggregate_query(query)? + .map(|optimized| Rc::new(Query::FullKeyAggregateQuery(optimized))), + }; + Ok(res) + } + + fn try_optimize_full_key_aggregate_query( + &mut self, + query: &FullKeyAggregateQuery, + ) -> Result, CubeError> { + let optimized_source = self.try_optimize_full_key_aggregate(&query.source)?; + if optimized_source.is_some() { + Ok(Some(FullKeyAggregateQuery { + multistage_members: query.multistage_members.clone(), + schema: query.schema.clone(), + filter: query.filter.clone(), + offset: query.offset, + limit: query.limit, + ungrouped: query.ungrouped, + order_by: query.order_by.clone(), + source: optimized_source.unwrap_or_else(|| query.source.clone()), + })) + } else { + Ok(None) + } + } + + fn try_optimize_full_key_aggregate( + &mut self, + full_key_aggregate: &Rc, + ) -> Result>, CubeError> { + let res = if let Some(resolver) = &full_key_aggregate.multiplied_measures_resolver { + if let Some(optimized_resolver) = + self.try_optimize_resolved_multiplied_measures(resolver)? + { + Some(Rc::new(FullKeyAggregate { + multiplied_measures_resolver: Some(optimized_resolver), + multi_stage_subquery_refs: full_key_aggregate.multi_stage_subquery_refs.clone(), + join_dimensions: full_key_aggregate.join_dimensions.clone(), + use_full_join_and_coalesce: full_key_aggregate.use_full_join_and_coalesce, + })) + } else { + None + } + } else { + None + }; + Ok(res) + } + + fn try_optimize_resolved_multiplied_measures( + &mut self, + source: &ResolvedMultipliedMeasures, + ) -> Result, CubeError> { + let res = match source { + ResolvedMultipliedMeasures::ResolveMultipliedMeasures(resolve_multiplied_measures) => { + self.try_optimize_multiplied_measures_resolver(resolve_multiplied_measures)? + .map(|resolver| ResolvedMultipliedMeasures::ResolveMultipliedMeasures(resolver)) + } + ResolvedMultipliedMeasures::PreAggregation(_) => None, + }; + Ok(res) + } + + fn try_optimize_multiplied_measures_resolver( + &mut self, + resolver: &Rc, + ) -> Result>, CubeError> { + let optimized_regular_measure_subqueries = resolver + .regular_measure_subqueries + .iter() + .map(|subquery| self.try_optimize_simple_query(subquery)) + .collect::, _>>()?; + let optimized_multiplied_subqueries = resolver + .aggregate_multiplied_subqueries + .iter() + .map(|subquery| self.try_optimize_aggregate_multiplied_subquery(subquery)) + .collect::, _>>()?; + let res = if optimized_regular_measure_subqueries + .iter() + .any(|subquery| subquery.is_some()) + || optimized_multiplied_subqueries + .iter() + .any(|subquery| subquery.is_some()) + { + Some(Rc::new(ResolveMultipliedMeasures { + schema: resolver.schema.clone(), + filter: resolver.filter.clone(), + regular_measure_subqueries: optimized_regular_measure_subqueries + .into_iter() + .zip(resolver.regular_measure_subqueries.iter()) + .map(|(optimized, original)| { + optimized.map_or_else(|| original.clone(), |v| Rc::new(v)) + }) + .collect(), + aggregate_multiplied_subqueries: optimized_multiplied_subqueries + .into_iter() + .zip(resolver.aggregate_multiplied_subqueries.iter()) + .map(|(optimized, original)| optimized.unwrap_or_else(|| original.clone())) + .collect(), + })) + } else { + None + }; + Ok(res) + } + + fn try_optimize_simple_query( + &mut self, + query: &SimpleQuery, + ) -> Result, CubeError> { + let optimized_source = self.try_optimize_simple_query_source(&query.source)?; + let optimized_dimension_subqueries = + self.try_optimize_dimension_subqueries(&query.dimension_subqueries)?; + if optimized_source.is_some() || optimized_dimension_subqueries.is_some() { + Ok(Some(SimpleQuery { + source: optimized_source.unwrap_or_else(|| query.source.clone()), + dimension_subqueries: optimized_dimension_subqueries + .unwrap_or_else(|| query.dimension_subqueries.clone()), + schema: query.schema.clone(), + filter: query.filter.clone(), + offset: query.offset, + limit: query.limit, + ungrouped: query.ungrouped, + order_by: query.order_by.clone(), + })) + } else { + Ok(None) + } + } + + fn try_optimize_simple_query_source( + &mut self, + source: &SimpleQuerySource, + ) -> Result, CubeError> { + match source { + SimpleQuerySource::LogicalJoin(join) => Ok(self + .try_optimize_logical_join(join)? + .map(|join| SimpleQuerySource::LogicalJoin(join))), + SimpleQuerySource::PreAggregation(_) => Ok(None), + } + } + + fn try_optimize_aggregate_multiplied_subquery( + &mut self, + subquery: &Rc, + ) -> Result>, CubeError> { + let optimized_keys_subquery = self.try_optimize_keys_subquery(&subquery.keys_subquery)?; + let optimized_pk_cube = self.try_optimize_cube(subquery.pk_cube.clone())?; + let optimized_source = match subquery.source.as_ref() { + AggregateMultipliedSubquerySouce::Cube => None, + AggregateMultipliedSubquerySouce::MeasureSubquery(measure_subquery) => self + .try_optimize_measure_subquery(&measure_subquery)? + .map(|measure_subquery| { + Rc::new(AggregateMultipliedSubquerySouce::MeasureSubquery( + measure_subquery, + )) + }), + }; + let optimized_dimension_subqueries = + self.try_optimize_dimension_subqueries(&subquery.dimension_subqueries)?; + if optimized_keys_subquery.is_some() + || optimized_source.is_some() + || optimized_dimension_subqueries.is_some() + || optimized_pk_cube.is_some() + { + Ok(Some(Rc::new(AggregateMultipliedSubquery { + keys_subquery: optimized_keys_subquery + .unwrap_or_else(|| subquery.keys_subquery.clone()), + source: optimized_source.unwrap_or_else(|| subquery.source.clone()), + pk_cube: optimized_pk_cube.unwrap_or_else(|| subquery.pk_cube.clone()), + schema: subquery.schema.clone(), + dimension_subqueries: optimized_dimension_subqueries + .unwrap_or_else(|| subquery.dimension_subqueries.clone()), + }))) + } else { + Ok(None) + } + } + + fn try_optimize_keys_subquery( + &mut self, + subquery: &Rc, + ) -> Result>, CubeError> { + let optimized_source = self.try_optimize_logical_join(&subquery.source)?; + let optimized_dimension_subqueries = + self.try_optimize_dimension_subqueries(&subquery.dimension_subqueries)?; + if optimized_source.is_some() || optimized_dimension_subqueries.is_some() { + Ok(Some(Rc::new(KeysSubQuery { + key_cube_name: subquery.key_cube_name.clone(), + time_dimensions: subquery.time_dimensions.clone(), + dimensions: subquery.dimensions.clone(), + dimension_subqueries: optimized_dimension_subqueries + .unwrap_or_else(|| subquery.dimension_subqueries.clone()), + primary_keys_dimensions: subquery.primary_keys_dimensions.clone(), + filter: subquery.filter.clone(), + source: optimized_source.unwrap_or_else(|| subquery.source.clone()), + }))) + } else { + Ok(None) + } + } + + fn try_optimize_measure_subquery( + &mut self, + subquery: &Rc, + ) -> Result>, CubeError> { + let optimized_source = self.try_optimize_logical_join(&subquery.source)?; + let optimized_dimension_subqueries = + self.try_optimize_dimension_subqueries(&subquery.dimension_subqueries)?; + if optimized_source.is_some() || optimized_dimension_subqueries.is_some() { + Ok(Some(Rc::new(MeasureSubquery { + primary_keys_dimensions: subquery.primary_keys_dimensions.clone(), + measures: subquery.measures.clone(), + dimension_subqueries: optimized_dimension_subqueries + .unwrap_or_else(|| subquery.dimension_subqueries.clone()), + source: optimized_source.unwrap_or_else(|| subquery.source.clone()), + }))) + } else { + Ok(None) + } + } + + fn try_optimize_dimension_subqueries( + &mut self, + dimension_subqueries: &Vec>, + ) -> Result>>, CubeError> { + let optimized = dimension_subqueries + .iter() + .map(|subquery| self.try_optimize_dimension_subquery(subquery)) + .collect::, _>>()?; + let res = if optimized.iter().any(|subquery| subquery.is_some()) { + Some( + optimized + .into_iter() + .zip(dimension_subqueries.iter()) + .map(|(optimized, original)| optimized.unwrap_or_else(|| original.clone())) + .collect(), + ) + } else { + None + }; + Ok(res) + } + + fn try_optimize_dimension_subquery( + &mut self, + subquery: &Rc, + ) -> Result>, CubeError> { + if let Some(optimized) = self.try_optimize(&subquery.query)? { + Ok(Some(Rc::new(DimensionSubQuery { + query: optimized, + primary_keys_dimensions: subquery.primary_keys_dimensions.clone(), + subquery_dimension: subquery.subquery_dimension.clone(), + measure_for_subquery_dimension: subquery.measure_for_subquery_dimension.clone(), + }))) + } else { + Ok(None) + } + } + + fn try_optimize_logical_join( + &mut self, + join: &Rc, + ) -> Result>, CubeError> { + let optimized_root = self.try_optimize_cube(join.root.clone())?; + let optimized_items = join + .joins + .iter() + .map(|join_item| self.try_optimize_join_item(join_item)) + .collect::, _>>()?; + + let result = + if optimized_root.is_some() || optimized_items.iter().any(|item| item.is_some()) { + Some(Rc::new(LogicalJoin { + root: optimized_root.unwrap_or_else(|| join.root.clone()), + joins: optimized_items + .into_iter() + .zip(join.joins.iter()) + .map(|(optimized, original)| optimized.unwrap_or_else(|| original.clone())) + .collect(), + })) + } else { + None + }; + Ok(result) + } + + fn try_optimize_cube(&mut self, cube: Rc) -> Result>, CubeError> { + let res = if let Some(found_pre_aggregation) = + self.find_origin_sql_pre_aggregation(&cube.name)? + { + Some( + cube.with_original_sql_pre_aggregation(OriginalSqlPreAggregation { + name: found_pre_aggregation.name.clone(), + }), + ) + } else { + None + }; + Ok(res) + } + + fn try_optimize_join_item( + &mut self, + join_item: &LogicalJoinItem, + ) -> Result, CubeError> { + match join_item { + LogicalJoinItem::CubeJoinItem(cube_join_item) => { + if let Some(optimized_cube) = self.try_optimize_cube(cube_join_item.cube.clone())? { + Ok(Some(LogicalJoinItem::CubeJoinItem(CubeJoinItem { + cube: optimized_cube, + on_sql: cube_join_item.on_sql.clone(), + }))) + } else { + Ok(None) + } + } + } + } + + fn find_origin_sql_pre_aggregation( + &mut self, + cube_name: &String, + ) -> Result>, CubeError> { + let res = if let Some(found_pre_aggregation) = self.foud_pre_aggregations.get(cube_name) { + Some(found_pre_aggregation.clone()) + } else { + let pre_aggregations = self + .query_tools + .cube_evaluator() + .pre_aggregations_for_cube_as_array(cube_name.clone())?; + if let Some(found_pre_aggregation) = pre_aggregations + .iter() + .find(|p| p.static_data().pre_aggregation_type == "originalSql") + { + let compiled = CompiledPreAggregation::try_new( + self.query_tools.clone(), + cube_name, + found_pre_aggregation.clone(), + )?; + self.foud_pre_aggregations + .insert(cube_name.clone(), compiled.clone()); + Some(compiled) + } else { + None + } + }; + Ok(res) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs new file mode 100644 index 0000000000000..7ed2add6d21f2 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs @@ -0,0 +1,57 @@ +use super::*; +use crate::planner::sql_evaluator::MemberSymbol; +use itertools::Itertools; +use std::rc::Rc; + +pub struct PreAggregation { + pub name: String, + pub schema: Rc, + pub measures: Vec>, + pub dimensions: Vec>, + pub time_dimensions: Vec<(Rc, Option)>, + pub external: bool, + pub granularity: Option, + pub table_name: String, + pub cube_name: String, +} + +impl PrettyPrint for PreAggregation { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("PreAggregation: ", state); + let state = state.new_level(); + result.println(&format!("name: {}", self.name), &state); + result.println(&format!("cube_name: {}", self.cube_name), &state); + result.println(&format!("table_name: {}", self.table_name), &state); + result.println(&format!("external: {}", self.external), &state); + result.println( + &format!( + "granularity: {}", + self.granularity.clone().unwrap_or("None".to_string()) + ), + &state, + ); + result.println( + &format!( + "-time_dimensions: {}", + &self + .time_dimensions + .iter() + .map(|(d, granularity)| format!( + "({} {})", + d.full_name(), + granularity.clone().unwrap_or("None".to_string()) + )) + .join(", ") + ), + &state, + ); + result.println( + &format!("-dimensions: {}", print_symbols(&self.dimensions)), + &state, + ); + result.println( + &format!("-measures: {}", print_symbols(&self.measures)), + &state, + ); + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs index 74c92a8c0c452..27b561a8f9f8c 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs @@ -8,8 +8,8 @@ pub enum Query { impl PrettyPrint for Query { fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { match self { - Query::SimpleQuery(query) => query.pretty_print(result, state), - Query::FullKeyAggregateQuery(query) => query.pretty_print(result, state), + Self::SimpleQuery(query) => query.pretty_print(result, state), + Self::FullKeyAggregateQuery(query) => query.pretty_print(result, state), } } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/resolve_multiplied_measures.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/resolve_multiplied_measures.rs index 61aecacfc0a95..498a48ba9ca39 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/resolve_multiplied_measures.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/resolve_multiplied_measures.rs @@ -2,6 +2,8 @@ use super::*; use std::rc::Rc; pub struct ResolveMultipliedMeasures { + pub schema: Rc, + pub filter: Rc, pub regular_measure_subqueries: Vec>, pub aggregate_multiplied_subqueries: Vec>, } @@ -11,6 +13,10 @@ impl PrettyPrint for ResolveMultipliedMeasures { result.println("ResolveMultipliedMeasures: ", state); let state = state.new_level(); let details_state = state.new_level(); + result.println("schema:", &state); + self.schema.pretty_print(result, &details_state); + result.println("filter:", &state); + self.filter.pretty_print(result, &details_state); result.println("regular_measure_subqueries:", &state); for subquery in self.regular_measure_subqueries.iter() { subquery.pretty_print(result, &details_state); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/simple_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/simple_query.rs index 0c5b5148740ba..cc9fe645f5946 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/simple_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/simple_query.rs @@ -1,6 +1,23 @@ use super::*; use crate::planner::query_properties::OrderByItem; use std::rc::Rc; + +#[derive(Clone)] +pub enum SimpleQuerySource { + LogicalJoin(Rc), + PreAggregation(Rc), +} +impl PrettyPrint for SimpleQuerySource { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + match self { + SimpleQuerySource::LogicalJoin(join) => join.pretty_print(result, state), + SimpleQuerySource::PreAggregation(pre_aggregation) => { + pre_aggregation.pretty_print(result, state) + } + } + } +} +#[derive(Clone)] pub struct SimpleQuery { pub schema: Rc, pub dimension_subqueries: Vec>, @@ -9,7 +26,7 @@ pub struct SimpleQuery { pub limit: Option, pub ungrouped: bool, pub order_by: Vec, - pub source: Rc, + pub source: SimpleQuerySource, } impl PrettyPrint for SimpleQuery { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs index 8ca076b70bd75..695f1f1773131 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs @@ -8,6 +8,7 @@ use crate::planner::sql_evaluator::MeasureTimeShift; use crate::planner::sql_evaluator::MemberSymbol; use crate::planner::sql_evaluator::ReferencesBuilder; use crate::planner::sql_templates::PlanSqlTemplates; +use crate::planner::BaseMemberHelper; use crate::planner::SqlJoinCondition; use crate::planner::{BaseMember, MemberSymbolRef}; use cubenativeutils::CubeError; @@ -22,6 +23,7 @@ struct PhysicalPlanBuilderContext { pub render_measure_as_state: bool, //Render measure as state, for example hll state for count_approx pub render_measure_for_ungrouped: bool, pub time_shifts: HashMap, + pub original_sql_pre_aggregations: HashMap, } impl Default for PhysicalPlanBuilderContext { @@ -31,6 +33,7 @@ impl Default for PhysicalPlanBuilderContext { render_measure_as_state: false, render_measure_for_ungrouped: false, time_shifts: HashMap::new(), + original_sql_pre_aggregations: HashMap::new(), } } } @@ -41,6 +44,7 @@ impl PhysicalPlanBuilderContext { factory.set_time_shifts(self.time_shifts.clone()); factory.set_count_approx_as_state(self.render_measure_as_state); factory.set_ungrouped_measure(self.render_measure_for_ungrouped); + factory.set_original_sql_pre_aggregations(self.original_sql_pre_aggregations.clone()); factory } } @@ -59,8 +63,14 @@ impl PhysicalPlanBuilder { } } - pub fn build(&self, logical_plan: Rc) -> Result, CubeError> { - self.build_impl(logical_plan, &PhysicalPlanBuilderContext::default()) + pub fn build( + &self, + logical_plan: Rc, + original_sql_pre_aggregations: HashMap, + ) -> Result, CubeError> { + let mut context = PhysicalPlanBuilderContext::default(); + context.original_sql_pre_aggregations = original_sql_pre_aggregations; + self.build_impl(logical_plan, &context) } fn build_impl( @@ -82,15 +92,33 @@ impl PhysicalPlanBuilder { context: &PhysicalPlanBuilderContext, ) -> Result, CubeError> { let mut render_references = HashMap::new(); - let from = self.process_logical_join( - &logical_plan.source, - context, - &logical_plan.dimension_subqueries, - &mut render_references, - )?; - let mut select_builder = SelectBuilder::new(from); + let mut measure_references = HashMap::new(); let mut context_factory = context.make_sql_nodes_factory(); + let from = match &logical_plan.source { + SimpleQuerySource::LogicalJoin(join) => self.process_logical_join( + &join, + context, + &logical_plan.dimension_subqueries, + &mut render_references, + )?, + SimpleQuerySource::PreAggregation(pre_aggregation) => { + let res = self.process_pre_aggregation( + pre_aggregation, + context, + &mut render_references, + &mut measure_references, + )?; + for member in logical_plan.schema.time_dimensions.iter() { + context_factory.add_dimensions_with_ignored_timezone(member.full_name()); + } + context_factory.set_use_local_tz_in_date_range(true); + res + } + }; + + let mut select_builder = SelectBuilder::new(from); context_factory.set_ungrouped(logical_plan.ungrouped); + context_factory.set_pre_aggregation_measures_references(measure_references); let mut group_by = Vec::new(); for member in logical_plan.schema.dimensions.iter() { @@ -144,6 +172,72 @@ impl PhysicalPlanBuilder { Ok(res) } + fn process_pre_aggregation( + &self, + pre_aggregation: &Rc, + _context: &PhysicalPlanBuilderContext, + render_references: &mut HashMap, + measure_references: &mut HashMap, + ) -> Result, CubeError> { + let mut pre_aggregation_schema = Schema::empty(); + let pre_aggregation_alias = PlanSqlTemplates::memeber_alias_name( + &pre_aggregation.cube_name, + &pre_aggregation.name, + &None, + ); + for dim in pre_aggregation.dimensions.iter() { + let alias = BaseMemberHelper::default_alias( + &dim.cube_name(), + &dim.name(), + &dim.alias_suffix(), + self.query_tools.clone(), + )?; + render_references.insert( + dim.full_name(), + QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()), + ); + pre_aggregation_schema.add_column(SchemaColumn::new(alias, Some(dim.full_name()))); + } + for (dim, granularity) in pre_aggregation.time_dimensions.iter() { + let alias = BaseMemberHelper::default_alias( + &dim.cube_name(), + &dim.name(), + granularity, + self.query_tools.clone(), + )?; + render_references.insert( + dim.full_name(), + QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()), + ); + if let Some(granularity) = &granularity { + render_references.insert( + format!("{}_{}", dim.full_name(), granularity), + QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()), + ); + } + pre_aggregation_schema.add_column(SchemaColumn::new(alias, Some(dim.full_name()))); + } + for meas in pre_aggregation.measures.iter() { + let alias = BaseMemberHelper::default_alias( + &meas.cube_name(), + &meas.name(), + &meas.alias_suffix(), + self.query_tools.clone(), + )?; + measure_references.insert( + meas.full_name(), + QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()), + ); + pre_aggregation_schema.add_column(SchemaColumn::new(alias, Some(meas.full_name()))); + } + let from = From::new_from_table_reference( + pre_aggregation.table_name.clone(), + Rc::new(pre_aggregation_schema), + Some(pre_aggregation_alias), + ); + Ok(from) + } + fn build_full_key_aggregate_query( &self, logical_plan: &FullKeyAggregateQuery, @@ -282,27 +376,24 @@ impl PhysicalPlanBuilder { multi_stage_schemas: &HashMap>, ) -> Result<(Rc, usize), CubeError> { let mut joins = Vec::new(); - for source in full_key_aggregate.sources.iter() { - match source { - FullKeyAggregateSource::ResolveMultipliedMeasures(resolve_multiplied_measures) => { - joins.append(&mut self.process_resolve_multiplied_measures( - resolve_multiplied_measures, - context, - )?); - } - FullKeyAggregateSource::MultiStageSubqueryRef(subquery_ref) => { - if let Some(schema) = multi_stage_schemas.get(&subquery_ref.name) { - joins.push(SingleSource::TableReference( - subquery_ref.name.clone(), - schema.clone(), - )); - } else { - return Err(CubeError::internal(format!( - "MultiStageSubqueryRef not found: {}", - subquery_ref.name - ))); - } - } + if let Some(resolver_multiplied_measures) = &full_key_aggregate.multiplied_measures_resolver + { + joins.append( + &mut self + .process_resolved_multiplied_measures(resolver_multiplied_measures, context)?, + ); + } + for subquery_ref in full_key_aggregate.multi_stage_subquery_refs.iter() { + if let Some(schema) = multi_stage_schemas.get(&subquery_ref.name) { + joins.push(SingleSource::TableReference( + subquery_ref.name.clone(), + schema.clone(), + )); + } else { + return Err(CubeError::internal(format!( + "MultiStageSubqueryRef not found: {}", + subquery_ref.name + ))); } } @@ -365,6 +456,25 @@ impl PhysicalPlanBuilder { Ok((result, joins.len())) } + fn process_resolved_multiplied_measures( + &self, + resolved_multiplied_measures: &ResolvedMultipliedMeasures, + context: &PhysicalPlanBuilderContext, + ) -> Result, CubeError> { + match resolved_multiplied_measures { + ResolvedMultipliedMeasures::ResolveMultipliedMeasures(resolve_multiplied_measures) => { + self.process_resolve_multiplied_measures(resolve_multiplied_measures, context) + } + ResolvedMultipliedMeasures::PreAggregation(pre_aggregation_query) => { + let pre_aggregation_query = + self.build_simple_query(pre_aggregation_query, context)?; + let source = + SingleSource::Subquery(Rc::new(QueryPlan::Select(pre_aggregation_query))); + Ok(vec![source]) + } + } + } + fn process_resolve_multiplied_measures( &self, resolve_multiplied_measures: &Rc, @@ -527,8 +637,9 @@ impl PhysicalPlanBuilder { .keys_subquery .primary_keys_dimensions; let pk_cube = aggregate_multiplied_subquery.pk_cube.clone(); - let pk_cube_alias = - pk_cube.default_alias_with_prefix(&Some(format!("{}_key", pk_cube.default_alias()))); + let pk_cube_alias = pk_cube + .cube + .default_alias_with_prefix(&Some(format!("{}_key", pk_cube.cube.default_alias()))); match aggregate_multiplied_subquery.source.as_ref() { AggregateMultipliedSubquerySouce::Cube => { let conditions = primary_keys_dimensions @@ -547,7 +658,7 @@ impl PhysicalPlanBuilder { .collect::, _>>()?; join_builder.left_join_cube( - pk_cube.clone(), + pk_cube.cube.clone(), Some(pk_cube_alias.clone()), JoinCondition::new_dimension_join(conditions, false), ); @@ -889,8 +1000,8 @@ impl PhysicalPlanBuilder { context: &PhysicalPlanBuilderContext, ) -> Result, CubeError> { let time_dimension = rolling_window.rolling_time_dimension.clone(); - let time_series_ref = rolling_window.time_series_input.clone(); - let measure_input_ref = rolling_window.measure_input.clone(); + let time_series_ref = rolling_window.time_series_input.name.clone(); + let measure_input_ref = rolling_window.measure_input.name.clone(); let time_series_schema = if let Some(schema) = multi_stage_schemas.get(&time_series_ref) { schema.clone() diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs index 8957321274111..e2abb89ee52b8 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs @@ -3,6 +3,7 @@ use crate::planner::sql_templates::PlanSqlTemplates; use cubenativeutils::CubeError; use std::rc::Rc; +#[derive(Clone)] pub enum QueryPlan { Select(Rc