Skip to content

Commit f87be98

Browse files
authored
fix: Cross data source rollupLambda uses incorrect refreshKey (#6288)
* fix: Cross data source `rollupLambda` uses incorrect refreshKey * fix(ksql-driver): Missing quote identifier * fix(ksql-driver): Missing quote identifier
1 parent f75ed99 commit f87be98

File tree

4 files changed

+116
-15
lines changed

4 files changed

+116
-15
lines changed

packages/cubejs-ksql-driver/src/KsqlQuery.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ export class KsqlQuery extends BaseQuery {
6969
}
7070

7171
public groupByClause() {
72+
if (this.ungrouped) {
73+
return '';
74+
}
7275
const dimensionsForSelect: any[] = this.dimensionsForSelect();
7376
const dimensionColumns = dimensionsForSelect.map(s => s.selectColumns() && s.dimensionSql())
7477
.reduce((a, b) => a.concat(b), [])
@@ -99,7 +102,12 @@ export class KsqlQuery extends BaseQuery {
99102

100103
public preAggregationReadOnly(cube: string, preAggregation: any) {
101104
const [sql] = this.preAggregationSql(cube, preAggregation);
102-
return preAggregation.type === 'originalSql' && Boolean(KsqlQuery.extractTableFromSimpleSelectAsteriskQuery(sql));
105+
return preAggregation.type === 'originalSql' && Boolean(KsqlQuery.extractTableFromSimpleSelectAsteriskQuery(sql)) ||
106+
preAggregation.type === 'rollup' && !!this.dimensionsForSelect().find(d => d.definition().primaryKey);
107+
}
108+
109+
public preAggregationAllowUngroupingWithPrimaryKey(_cube: any, _preAggregation: any) {
110+
return true;
103111
}
104112

105113
public static extractTableFromSimpleSelectAsteriskQuery(sql: string) {

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,6 @@ class BaseQuery {
338338
throw new UserError(`Ungrouped query requires primary keys to be present in dimensions: ${missingPrimaryKeys.map(k => `'${k}'`).join(', ')}. Pass allowUngroupedWithoutPrimaryKey option to disable this check.`);
339339
}
340340
}
341-
if (this.measures.length) {
342-
throw new UserError('Measures aren\'t allowed in ungrouped query');
343-
}
344341
if (this.measureFilters.length) {
345342
throw new UserError('Measure filters aren\'t allowed in ungrouped query');
346343
}
@@ -1956,6 +1953,13 @@ class BaseQuery {
19561953
) {
19571954
return evaluateSql === '*' ? '1' : evaluateSql;
19581955
}
1956+
if (this.ungrouped) {
1957+
if (symbol.type === 'count' || symbol.type === 'countDistinct' || symbol.type === 'countDistinctApprox') {
1958+
return '1';
1959+
} else {
1960+
return evaluateSql;
1961+
}
1962+
}
19591963
if ((this.safeEvaluateSymbolContext().ungroupedAliases || {})[measurePath]) {
19601964
evaluateSql = (this.safeEvaluateSymbolContext().ungroupedAliases || {})[measurePath];
19611965
}
@@ -2368,6 +2372,10 @@ class BaseQuery {
23682372
return false;
23692373
}
23702374

2375+
preAggregationAllowUngroupingWithPrimaryKey(_cube, _preAggregation) {
2376+
return false;
2377+
}
2378+
23712379
// eslint-disable-next-line consistent-return
23722380
preAggregationQueryForSqlEvaluation(cube, preAggregation) {
23732381
if (preAggregation.type === 'autoRollup') {

packages/cubejs-schema-compiler/src/adapter/PreAggregations.js

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ export class PreAggregations {
148148

149149
const tableName = this.preAggregationTableName(cube, preAggregationName, preAggregation);
150150
const invalidateKeyQueries = this.query.preAggregationInvalidateKeyQueries(cube, preAggregation);
151-
const partitionInvalidateKeyQueries = this.query.partitionInvalidateKeyQueries && this.query.partitionInvalidateKeyQueries(cube, preAggregation);
151+
const queryForSqlEvaluation = this.query.preAggregationQueryForSqlEvaluation(cube, preAggregation);
152+
const partitionInvalidateKeyQueries = queryForSqlEvaluation.partitionInvalidateKeyQueries && queryForSqlEvaluation.partitionInvalidateKeyQueries(cube, preAggregation);
152153

153154
const matchedTimeDimension =
154155
preAggregation.partitionGranularity &&
@@ -161,11 +162,10 @@ export class PreAggregations {
161162
td.isDateOperator() &&
162163
td.camelizeOperator === 'inDateRange' // TODO support all date operators
163164
);
164-
const queryForSqlEvaluation = this.query.preAggregationQueryForSqlEvaluation(cube, preAggregation);
165165

166166
const uniqueKeyColumnsDefault = () => null;
167167
const uniqueKeyColumns = ({
168-
rollup: () => this.query.preAggregationQueryForSqlEvaluation(cube, preAggregation).dimensionColumns(),
168+
rollup: () => queryForSqlEvaluation.dimensionColumns(),
169169
originalSql: () => preAggregation.uniqueKeyColumns || null
170170
}[preAggregation.type] || uniqueKeyColumnsDefault)();
171171

@@ -180,18 +180,18 @@ export class PreAggregations {
180180
partitionInvalidateKeyQueries,
181181
type: preAggregation.type,
182182
external: preAggregation.external,
183-
previewSql: this.query.preAggregationPreviewSql(tableName),
184-
preAggregationsSchema: this.query.preAggregationSchema(),
185-
loadSql: this.query.preAggregationLoadSql(cube, preAggregation, tableName),
186-
sql: this.query.preAggregationSql(cube, preAggregation),
183+
previewSql: queryForSqlEvaluation.preAggregationPreviewSql(tableName),
184+
preAggregationsSchema: queryForSqlEvaluation.preAggregationSchema(),
185+
loadSql: queryForSqlEvaluation.preAggregationLoadSql(cube, preAggregation, tableName),
186+
sql: queryForSqlEvaluation.preAggregationSql(cube, preAggregation),
187187
uniqueKeyColumns,
188188
aggregationsColumns,
189189
dataSource: queryForSqlEvaluation.dataSource,
190190
// in fact we can reference preAggregation.granularity however accessing timeDimensions is more strict and consistent
191191
granularity: references.timeDimensions[0]?.granularity,
192192
partitionGranularity: preAggregation.partitionGranularity,
193193
updateWindowSeconds: preAggregation.refreshKey && preAggregation.refreshKey.updateWindow &&
194-
this.query.parseSecondDuration(preAggregation.refreshKey.updateWindow),
194+
queryForSqlEvaluation.parseSecondDuration(preAggregation.refreshKey.updateWindow),
195195
preAggregationStartEndQueries:
196196
(preAggregation.partitionGranularity || references.timeDimensions[0]?.granularity) &&
197197
this.refreshRangeQuery().preAggregationStartEndQueries(cube, preAggregation),
@@ -208,7 +208,7 @@ export class PreAggregations {
208208
const indexName = this.preAggregationTableName(cube, `${foundPreAggregation.sqlAlias || preAggregationName}_${index}`, preAggregation, true);
209209
return {
210210
indexName,
211-
sql: this.query.indexSql(
211+
sql: queryForSqlEvaluation.indexSql(
212212
cube,
213213
preAggregation,
214214
preAggregation.indexes[index],
@@ -226,11 +226,11 @@ export class PreAggregations {
226226
return {
227227
indexName,
228228
type: preAggregation.indexes[index].type,
229-
columns: this.query.evaluateIndexColumns(cube, preAggregation.indexes[index])
229+
columns: queryForSqlEvaluation.evaluateIndexColumns(cube, preAggregation.indexes[index])
230230
};
231231
}
232232
),
233-
readOnly: preAggregation.readOnly || this.query.preAggregationReadOnly(cube, preAggregation),
233+
readOnly: preAggregation.readOnly || queryForSqlEvaluation.preAggregationReadOnly(cube, preAggregation),
234234
streamOffset: preAggregation.streamOffset,
235235
unionWithSourceData: preAggregation.unionWithSourceData,
236236
rollupLambdaId: preAggregation.rollupLambdaId,
@@ -978,6 +978,7 @@ export class PreAggregations {
978978

979979
rollupPreAggregationQuery(cube, aggregation) {
980980
const references = this.evaluateAllReferences(cube, aggregation);
981+
const cubeQuery = this.query.newSubQueryForCube(cube, {});
981982
return this.query.newSubQueryForCube(
982983
cube,
983984
{
@@ -987,6 +988,8 @@ export class PreAggregations {
987988
timeDimensions: this.mergePartitionTimeDimensions(references, aggregation.partitionTimeDimensions),
988989
preAggregationQuery: true,
989990
useOriginalSqlPreAggregationsInPreAggregation: aggregation.useOriginalSqlPreAggregations,
991+
ungrouped: cubeQuery.preAggregationAllowUngroupingWithPrimaryKey(cube, aggregation) &&
992+
!!references.dimensions.find(d => this.query.cubeEvaluator.dimensionByPath(d).primaryKey)
990993
}
991994
);
992995
}

packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,46 @@ describe('PreAggregations', () => {
410410
}
411411
},
412412
});
413+
414+
cube('LambdaVisitors', {
415+
extends: visitors,
416+
417+
preAggregations: {
418+
partitionedLambda: {
419+
type: 'rollupLambda',
420+
rollups: [partitioned, RealTimeLambdaVisitors.partitioned]
421+
},
422+
partitioned: {
423+
type: 'rollup',
424+
measures: [count],
425+
dimensions: [id, source],
426+
timeDimension: createdAt,
427+
granularity: 'day',
428+
partitionGranularity: 'month',
429+
refreshKey: {
430+
every: '1 hour',
431+
incremental: true,
432+
updateWindow: '1 day'
433+
}
434+
}
435+
}
436+
});
437+
438+
cube('RealTimeLambdaVisitors', {
439+
dataSource: 'ksql',
440+
extends: visitors,
441+
442+
preAggregations: {
443+
partitioned: {
444+
type: 'rollup',
445+
measures: [count],
446+
dimensions: [id, source],
447+
timeDimension: createdAt,
448+
granularity: 'day',
449+
partitionGranularity: 'day'
450+
}
451+
}
452+
});
413453
`);
414454

415455
it('simple pre-aggregation', () => compiler.compile().then(() => {
@@ -1629,4 +1669,46 @@ describe('PreAggregations', () => {
16291669
);
16301670
});
16311671
}));
1672+
1673+
it('lambda cross data source refresh key and ungrouped', () => compiler.compile().then(() => {
1674+
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
1675+
measures: [
1676+
'LambdaVisitors.count'
1677+
],
1678+
dimensions: [
1679+
'LambdaVisitors.source'
1680+
],
1681+
timeDimensions: [{
1682+
dimension: 'LambdaVisitors.createdAt',
1683+
granularity: 'day',
1684+
dateRange: ['2017-01-01', '2017-01-25']
1685+
}],
1686+
timezone: 'America/Los_Angeles',
1687+
order: [{
1688+
id: 'LambdaVisitors.createdAt'
1689+
}],
1690+
preAggregationsSchema: '',
1691+
queryFactory: {
1692+
createQuery: (cube, compilers, options) => {
1693+
if (cube === 'RealTimeLambdaVisitors') {
1694+
// eslint-disable-next-line global-require
1695+
const { KsqlQuery } = require('../../../../../cubejs-ksql-driver');
1696+
return new KsqlQuery(compilers, options);
1697+
} else {
1698+
return new PostgresQuery(compilers, options);
1699+
}
1700+
}
1701+
}
1702+
});
1703+
1704+
const queryAndParams = query.buildSqlAndParams();
1705+
console.log(queryAndParams);
1706+
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
1707+
console.log(JSON.stringify(preAggregationsDescription, null, 2));
1708+
const { partitionInvalidateKeyQueries, loadSql } = preAggregationsDescription.find(p => p.preAggregationId === 'RealTimeLambdaVisitors.partitioned');
1709+
1710+
expect(partitionInvalidateKeyQueries).toStrictEqual([]);
1711+
expect(loadSql[0]).not.toMatch(/GROUP BY/);
1712+
expect(loadSql[0]).toMatch(/1 `real_time_lambda_visitors__count`/);
1713+
}));
16321714
});

0 commit comments

Comments
 (0)