Skip to content

Commit 84fff0d

Browse files
authored
feat: Multiple rollups in rollupLambda support (#6008)
1 parent b39920f commit 84fff0d

File tree

7 files changed

+230
-66
lines changed

7 files changed

+230
-66
lines changed

packages/cubejs-backend-shared/src/time.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,16 @@ export const addSecondsToLocalTimestamp = (timestamp: string, timezone: string,
106106
.add(seconds, 'second')
107107
.toDate();
108108
};
109+
110+
/**
 * Normalizes a UTC timestamp string to the ISO "local" millisecond form
 * `YYYY-MM-DDTHH:mm:ss.SSS` (no trailing `Z`) so that two timestamps can be
 * compared as plain strings.
 *
 * @param timestamp Timestamp to normalize. Falsy values (empty string,
 *   `undefined`, `null`) are returned unchanged.
 * @returns The timestamp in `YYYY-MM-DDTHH:mm:ss.SSS` form. Inputs that are
 *   not already in that form (optionally suffixed with `Z`) are parsed as UTC
 *   and re-formatted.
 */
export const reformatInIsoLocal = (timestamp: string): string => {
  if (!timestamp) {
    return timestamp;
  }

  const isoLocalMs = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}$/;

  // Fast path: already in the target format, no parsing needed.
  if (isoLocalMs.test(timestamp)) {
    return timestamp;
  }

  // Fast path: target format with a UTC designator — just drop the `Z`.
  // The shape is verified instead of trusting the length alone, so other
  // 24-character inputs (e.g. `2021-05-31T00:00:00+0000`) are not returned
  // un-normalized.
  if (timestamp.endsWith('Z')) {
    const withoutZone = timestamp.slice(0, -1);
    if (isoLocalMs.test(withoutZone)) {
      return withoutZone;
    }
  }

  // Anything else: interpret as UTC and render in the canonical format.
  return moment.tz(timestamp, 'UTC').utc().format(moment.HTML5_FMT.DATETIME_LOCAL_MS);
};

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
FROM_PARTITION_RANGE,
99
getEnv,
1010
inDbTimeZone,
11-
MAX_SOURCE_ROW_LIMIT,
11+
MAX_SOURCE_ROW_LIMIT, reformatInIsoLocal,
1212
timeSeries,
1313
TO_PARTITION_RANGE,
1414
utcToLocalTimeZone,
@@ -192,6 +192,8 @@ export type PreAggregationDescription = {
192192
buildRangeEnd?: string;
193193
updateWindowSeconds?: number;
194194
sealAt?: string;
195+
rollupLambdaId?: string;
196+
lastRollupLambda?: boolean;
195197
};
196198

197199
const tablesToVersionEntries = (schema, tables: TableCacheEntry[]): VersionEntry[] => R.sortBy(
@@ -456,6 +458,8 @@ type LoadPreAggregationResult = {
456458
buildRangeEnd?: string;
457459
lambdaTable?: InlineTable;
458460
queryKey?: any[];
461+
rollupLambdaId?: string;
462+
partitionRange?: QueryDateRange;
459463
};
460464

461465
export class PreAggregationLoader {
@@ -1440,7 +1444,7 @@ export class PreAggregationPartitionRangeLoader {
14401444
// eslint-disable-next-line no-use-before-define
14411445
private readonly preAggregations: PreAggregations,
14421446
private readonly preAggregation: PreAggregationDescription,
1443-
private readonly preAggregationsTablesToTempTables: any,
1447+
private readonly preAggregationsTablesToTempTables: [string, LoadPreAggregationResult][],
14441448
private readonly loadCache: any,
14451449
private readonly options: PreAggsPartitionRangeLoaderOpts = {
14461450
maxPartitions: 10000,
@@ -1562,7 +1566,7 @@ export class PreAggregationPartitionRangeLoader {
15621566
);
15631567
const [_, buildRangeEnd] = buildRange;
15641568
const loadRange: [string, string] = [...range];
1565-
if (this.preAggregation.unionWithSourceData && buildRangeEnd < range[1]) {
1569+
if (buildRangeEnd < range[1]) {
15661570
loadRange[1] = buildRangeEnd;
15671571
}
15681572
const sealAt = addSecondsToLocalTimestamp(
@@ -1581,7 +1585,7 @@ export class PreAggregationPartitionRangeLoader {
15811585
.map(q => ({ ...q, sql: this.replacePartitionSqlAndParams(q.sql, range, partitionTableName) })),
15821586
previewSql: this.preAggregation.previewSql &&
15831587
this.replacePartitionSqlAndParams(this.preAggregation.previewSql, range, partitionTableName),
1584-
buildRangeEnd,
1588+
buildRangeEnd: loadRange[1],
15851589
sealAt, // Used only for kSql pre aggregations
15861590
};
15871591
}
@@ -1600,34 +1604,61 @@ export class PreAggregationPartitionRangeLoader {
16001604
this.loadCache,
16011605
this.options,
16021606
));
1603-
const resolveResults = await Promise.all(partitionLoaders.map(l => l.loadPreAggregation(false)));
1604-
const loadResults = resolveResults.filter(res => res !== null);
1607+
const resolveResults = await Promise.all(partitionLoaders.map(async (l, i) => {
1608+
const result = await l.loadPreAggregation(false);
1609+
return result && {
1610+
...result,
1611+
partitionRange: partitionRanges[i]
1612+
};
1613+
}));
1614+
let loadResults = resolveResults.filter(res => res !== null);
16051615
if (this.options.externalRefresh && loadResults.length === 0) {
16061616
throw new Error(
16071617
// eslint-disable-next-line no-use-before-define
16081618
PreAggregations.noPreAggregationPartitionsBuiltMessage(partitionLoaders.map(p => p.preAggregation))
16091619
);
16101620
}
1621+
1622+
let lambdaTable: InlineTable;
1623+
let emptyResult = false;
1624+
1625+
if (this.preAggregation.rollupLambdaId) {
1626+
if (this.lambdaQuery && loadResults.length > 0) {
1627+
const { buildRangeEnd } = loadResults[loadResults.length - 1];
1628+
lambdaTable = await this.downloadLambdaTable(buildRangeEnd);
1629+
}
1630+
const rollupLambdaResults = this.preAggregationsTablesToTempTables.filter(tempTableResult => tempTableResult[1].rollupLambdaId === this.preAggregation.rollupLambdaId);
1631+
const lastResult = rollupLambdaResults[rollupLambdaResults.length - 1];
1632+
const filteredResults = loadResults.filter(
1633+
r => (this.preAggregation.lastRollupLambda || reformatInIsoLocal(r.buildRangeEnd) === reformatInIsoLocal(r.partitionRange[1])) &&
1634+
(!lastResult || !lastResult[1].buildRangeEnd || reformatInIsoLocal(lastResult[1].buildRangeEnd) < reformatInIsoLocal(r.partitionRange[0]))
1635+
);
1636+
if (filteredResults.length === 0) {
1637+
emptyResult = true;
1638+
loadResults = [loadResults[loadResults.length - 1]];
1639+
} else {
1640+
loadResults = filteredResults;
1641+
}
1642+
}
1643+
16111644
const allTableTargetNames = loadResults.map(targetTableName => targetTableName.targetTableName);
16121645
let lastUpdatedAt = getLastUpdatedAtTimestamp(loadResults.map(r => r.lastUpdatedAt));
1613-
let lambdaTable: InlineTable;
16141646

1615-
if (this.lambdaQuery && loadResults.length > 0) {
1616-
const { buildRangeEnd } = loadResults[loadResults.length - 1];
1617-
lambdaTable = await this.downloadLambdaTable(buildRangeEnd);
1647+
if (lambdaTable) {
16181648
allTableTargetNames.push(lambdaTable.name);
16191649
lastUpdatedAt = Date.now();
16201650
}
16211651

16221652
const unionTargetTableName = allTableTargetNames
1623-
.map(targetTableName => `SELECT * FROM ${targetTableName}`)
1653+
.map(targetTableName => `SELECT * FROM ${targetTableName}${emptyResult ? ' WHERE 1 = 0' : ''}`)
16241654
.join(' UNION ALL ');
16251655
return {
1626-
targetTableName: allTableTargetNames.length === 1 ? allTableTargetNames[0] : `(${unionTargetTableName})`,
1656+
targetTableName: allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`,
16271657
refreshKeyValues: loadResults.map(t => t.refreshKeyValues),
16281658
lastUpdatedAt,
1629-
buildRangeEnd: undefined,
1659+
buildRangeEnd: !emptyResult && loadResults.length && loadResults[loadResults.length - 1].buildRangeEnd,
16301660
lambdaTable,
1661+
rollupLambdaId: this.preAggregation.rollupLambdaId,
16311662
};
16321663
} else {
16331664
return new PreAggregationLoader(
@@ -1713,9 +1744,7 @@ export class PreAggregationPartitionRangeLoader {
17131744
));
17141745
if (partitionRanges.length > this.options.maxPartitions) {
17151746
throw new Error(
1716-
`The maximum number of partitions (${
1717-
this.options.maxPartitions
1718-
}) was reached for the pre-aggregation`
1747+
`Pre-aggregation '${this.preAggregation.tableName}' requested to build ${partitionRanges.length} partitions which exceeds the maximum number of partitions per pre-aggregation of ${this.options.maxPartitions}`
17191748
);
17201749
}
17211750
return { buildRange: dateRange, partitionRanges };

packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js

Lines changed: 93 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,22 @@ import { QueryOrchestrator } from '../../src/orchestrator/QueryOrchestrator';
33

44
class MockDriver {
55
constructor({ csvImport } = {}) {
6-
this.tables = [];
6+
this.tablesObj = [];
77
this.tablesReady = [];
88
this.executedQueries = [];
99
this.cancelledQueries = [];
1010
this.csvImport = csvImport;
1111
this.now = new Date().getTime();
1212
}
1313

14+
get tables() {
15+
return this.tablesObj.map(t => t.tableName || t);
16+
}
17+
18+
resetTables() {
19+
this.tablesObj = [];
20+
}
21+
1422
query(query) {
1523
this.executedQueries.push(query);
1624
let promise = Promise.resolve([query]);
@@ -26,6 +34,10 @@ class MockDriver {
2634
promise = promise.then(() => [{ now: new Date().toJSON() }]);
2735
}
2836

37+
if (query.match(/^SELECT '(\d+-\d+-\d+)'/)) {
38+
promise = promise.then(() => [{ date: new Date(`${query.match(/^SELECT '(\d+-\d+-\d+)'/)[1]}T00:00:00.000Z`).toJSON() }]);
39+
}
40+
2941
if (query.match(/^SELECT MAX\(timestamp\)/)) {
3042
promise = promise.then(() => [{ max: new Date('2021-06-01T00:00:00.000Z').toJSON() }]);
3143
}
@@ -56,7 +68,11 @@ class MockDriver {
5668
if (this.tablesQueryDelay) {
5769
await this.delay(this.tablesQueryDelay);
5870
}
59-
return this.tables.filter(t => t.split('.')[0] === schema).map(t => ({ table_name: t.replace(`${schema}.`, '') }));
71+
return this.tablesObj.filter(t => (t.tableName || t).split('.')[0] === schema)
72+
.map(t => ({
73+
table_name: (t.tableName || t).replace(`${schema}.`, ''),
74+
build_range_end: t.buildRangeEnd
75+
}));
6076
}
6177

6278
delay(timeout) {
@@ -69,15 +85,15 @@ class MockDriver {
6985
}
7086

7187
loadPreAggregationIntoTable(preAggregationTableName, loadSql) {
72-
this.tables.push(preAggregationTableName.substring(0, 100));
88+
this.tablesObj.push({ tableName: preAggregationTableName.substring(0, 100) });
7389
const promise = this.query(loadSql);
7490
const resPromise = promise.then(() => this.tablesReady.push(preAggregationTableName.substring(0, 100)));
7591
resPromise.cancel = promise.cancel;
7692
return resPromise;
7793
}
7894

7995
async dropTable(tableName) {
80-
this.tables = this.tables.filter(t => t !== tableName);
96+
this.tablesObj = this.tablesObj.filter(t => (t.tableName || t) !== tableName);
8197
return this.query(`DROP TABLE ${tableName}`);
8298
}
8399

@@ -109,12 +125,12 @@ class ExternalMockDriver extends MockDriver {
109125
}
110126

111127
async uploadTable(table) {
112-
this.tables.push(table.substring(0, 100));
128+
this.tablesObj.push({ tableName: table.substring(0, 100) });
113129
throw new Error('uploadTable has been called instead of uploadTableWithIndexes');
114130
}
115131

116-
async uploadTableWithIndexes(table, columns, tableData, indexesSql) {
117-
this.tables.push(table.substring(0, 100));
132+
async uploadTableWithIndexes(table, columns, tableData, indexesSql, uniqueKeyColumns, queryTracingObj, externalOptions) {
133+
this.tablesObj.push({ tableName: table.substring(0, 100), buildRangeEnd: queryTracingObj?.buildRangeEnd });
118134
if (tableData.csvFile) {
119135
this.csvFiles.push(tableData.csvFile);
120136
}
@@ -209,7 +225,7 @@ describe('QueryOrchestrator', () => {
209225
}),
210226
},
211227
preAggregationsOptions: {
212-
maxPartitions: 32,
228+
maxPartitions: 100,
213229
queueOptions: () => ({
214230
executionTimeout: 2,
215231
concurrency: 2,
@@ -535,7 +551,7 @@ describe('QueryOrchestrator', () => {
535551
});
536552

537553
test('save structure versions', async () => {
538-
mockDriver.tables = [];
554+
mockDriver.resetTables();
539555
await queryOrchestrator.fetchQuery({
540556
query: 'SELECT * FROM stb_pre_aggregations.orders',
541557
values: [],
@@ -1017,7 +1033,7 @@ describe('QueryOrchestrator', () => {
10171033
await expect(async () => {
10181034
await queryOrchestrator.fetchQuery(query);
10191035
}).rejects.toThrow(
1020-
'The maximum number of partitions (32) was reached for the pre-aggregation'
1036+
'Pre-aggregation \'stb_pre_aggregations.orders_d\' requested to build 745 partitions which exceeds the maximum number of partitions per pre-aggregation of 100'
10211037
);
10221038
});
10231039

@@ -1137,6 +1153,73 @@ describe('QueryOrchestrator', () => {
11371153
expect(result.data[0]).toMatch(/orders_d20210601/);
11381154
});
11391155

1156+
// Two pre-aggregations share the same `rollupLambdaId`: a day-partitioned
// rollup whose build range end is pinned to 2021-05-31, and an hour-partitioned
// rollup starting at 2021-05-30 that is flagged as the last rollup of the lambda.
// The assertions check which partition tables end up in the resulting query.
test('lambda partitions', async () => {
  // Builds the fetchQuery payload; the optional date range is passed to both
  // pre-aggregation descriptions as `matchedTimeDimensionDateRange`.
  const query = (matchedTimeDimensionDateRange) => ({
    query: 'SELECT * FROM stb_pre_aggregations.orders_d UNION ALL SELECT * FROM stb_pre_aggregations.orders_h',
    values: [],
    cacheKeyQueries: {
      queries: []
    },
    preAggregations: [{
      preAggregationsSchema: 'stb_pre_aggregations',
      tableName: 'stb_pre_aggregations.orders_d',
      loadSql: [
        'CREATE TABLE stb_pre_aggregations.orders_d AS SELECT * FROM public.orders WHERE timestamp >= ? AND timestamp <= ?',
        ['__FROM_PARTITION_RANGE', '__TO_PARTITION_RANGE']
      ],
      invalidateKeyQueries: [['SELECT CASE WHEN NOW() > ? THEN NOW() END as now', ['__TO_PARTITION_RANGE'], {
        renewalThreshold: 1,
        updateWindowSeconds: 86400,
        renewalThresholdOutsideUpdateWindow: 86400,
        incremental: true
      }]],
      preAggregationStartEndQueries: [
        ['SELECT MIN(timestamp) FROM orders', []],
        ['SELECT \'2021-05-31\'', []],
      ],
      external: true,
      partitionGranularity: 'day',
      timezone: 'UTC',
      rollupLambdaId: 'orders.d_lambda',
      matchedTimeDimensionDateRange
    }, {
      preAggregationsSchema: 'stb_pre_aggregations',
      tableName: 'stb_pre_aggregations.orders_h',
      loadSql: [
        'CREATE TABLE stb_pre_aggregations.orders_h AS SELECT * FROM public.orders WHERE timestamp >= ? AND timestamp <= ?',
        ['__FROM_PARTITION_RANGE', '__TO_PARTITION_RANGE']
      ],
      invalidateKeyQueries: [['SELECT CASE WHEN NOW() > ? THEN NOW() END as now', ['__TO_PARTITION_RANGE'], {
        renewalThreshold: 1,
        updateWindowSeconds: 86400,
        renewalThresholdOutsideUpdateWindow: 86400,
        incremental: true
      }]],
      preAggregationStartEndQueries: [
        ['SELECT \'2021-05-30\'', []],
        ['SELECT MAX(timestamp) FROM orders', []],
      ],
      external: true,
      partitionGranularity: 'hour',
      timezone: 'UTC',
      rollupLambdaId: 'orders.d_lambda',
      lastRollupLambda: true,
      matchedTimeDimensionDateRange
    }],
    requestId: 'lambda partitions',
    external: true,
  });

  // Without a date range: a daily partition is used, the hourly partition for
  // 2021-05-30 00:00 is left out, and the one for 2021-05-31 00:00 is included.
  let result = await queryOrchestrator.fetchQuery(query());
  expect(result.data[0]).toMatch(/orders_d20210501/);
  expect(result.data[0]).not.toMatch(/orders_h2021053000/);
  expect(result.data[0]).toMatch(/orders_h2021053100/);

  // Restricted to 2021-05-31: the hourly partition for that day is still used.
  result = await queryOrchestrator.fetchQuery(query(['2021-05-31T00:00:00.000', '2021-05-31T23:59:59.999']));
  expect(result.data[0]).toMatch(/orders_h2021053100/);
});
1222+
11401223
test('loadRefreshKeys', async () => {
11411224
const preAggregationsLoadCacheByDataSource = {};
11421225
const preAggregationExternalRefreshKey = {

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ class BaseQuery {
576576
const preAggForQuery = this.preAggregations.findPreAggregationForQuery();
577577
const result = {};
578578
if (preAggForQuery && preAggForQuery.preAggregation.unionWithSourceData) {
579-
const lambdaPreAgg = preAggForQuery.referencedPreAggregations[0];
579+
const lambdaPreAgg = preAggForQuery.referencedPreAggregations[preAggForQuery.referencedPreAggregations.length - 1];
580580
// TODO(cristipp) Use source query instead of preaggregation references.
581581
const references = this.cubeEvaluator.evaluatePreAggregationReferences(lambdaPreAgg.cube, lambdaPreAgg.preAggregation);
582582
const lambdaQuery = this.newSubQuery(

0 commit comments

Comments
 (0)