
Commit fa284a4

feat: Support rollupLambda across different cubes and data sources (#6245)
* feat: Support rollupLambda across different cubes and data sources
* Fix linter
1 parent 3bba803 commit fa284a4
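
This commit lets a rollupLambda pre-aggregation reference rollups defined in other cubes, which may in turn sit on top of different data sources. A minimal sketch of the resulting data model, condensed from the lambda fixture updated below (Orders, RealTimeOrders, and the rollup names come from that fixture):

cube(`Orders`, {
  // ...
  preAggregations: {
    ordersByCompletedAtLambda: {
      type: `rollupLambda`,
      // Rollups from this cube and from another cube can now be mixed;
      // the cross-cube rollup is referenced by its full path.
      rollups: [ordersByCompletedAt, ordersByCompletedByDay, RealTimeOrders.AOrdersByCompletedByHour],
    },
    // ...
  },
});

Note that the fixture also drops unionWithSourceData from this pre-aggregation: the new validation in PreAggregations.js only allows that flag when the last referenced rollup is defined in the same cube as the rollupLambda itself.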

6 files changed: +141 -127 lines changed


packages/cubejs-schema-compiler/src/adapter/PostgresQuery.js

Lines changed: 0 additions & 8 deletions
@@ -43,12 +43,4 @@ export class PostgresQuery extends BaseQuery {
   countDistinctApprox(sql) {
     return `round(hll_cardinality(hll_add_agg(hll_hash_any(${sql}))))`;
   }
-
-  preAggregationTableName(cube, preAggregationName, skipSchema) {
-    const name = super.preAggregationTableName(cube, preAggregationName, skipSchema);
-    if (name.length > 64) {
-      throw new UserError(`Postgres can not work with table names that longer than 64 symbols. Consider using the 'sqlAlias' attribute in your cube and in your pre-aggregation definition for ${name}.`);
-    }
-    return name;
-  }
 }

packages/cubejs-schema-compiler/src/adapter/PreAggregations.js

Lines changed: 47 additions & 12 deletions
@@ -867,6 +867,9 @@ export class PreAggregations {
       throw new UserError(`rollupLambda '${cube}.${preAggregationName}' should reference at least on rollup`);
     }
     referencedPreAggregations.forEach((referencedPreAggregation, i) => {
+      if (i === referencedPreAggregations.length - 1 && preAggObj.preAggregation.unionWithSourceData && preAggObj.cube !== referencedPreAggregations[i].cube) {
+        throw new UserError(`unionWithSourceData can be enabled only for pre-aggregation within '${preAggObj.cube}' cube but '${referencedPreAggregations[i].preAggregationName}' pre-aggregation is defined within '${referencedPreAggregations[i].cube}' cube`);
+      }
       referencedPreAggregations[i] = {
         ...referencedPreAggregations[i],
         preAggregation: {
@@ -1068,33 +1071,55 @@ export class PreAggregations {
     );
   }
 
-  rollupLambdaUnion(preAggregationForQuery) {
+  rollupLambdaUnion(preAggregationForQuery, rollupGranularity) {
     if (!preAggregationForQuery.referencedPreAggregations) {
       return this.preAggregationTableName(
         preAggregationForQuery.cube,
         preAggregationForQuery.preAggregationName,
         preAggregationForQuery.preAggregation
       );
     }
-    const tables = preAggregationForQuery.referencedPreAggregations.map(preAggregation => this.preAggregationTableName(
-      preAggregation.cube,
-      preAggregation.preAggregationName,
-      preAggregation.preAggregation
-    ));
+
+    const targetDimensionsReferences = this.dimensionsRenderedReference(preAggregationForQuery);
+    const targetTimeDimensionsReferences = this.timeDimensionsRenderedReference(rollupGranularity, preAggregationForQuery);
+    const targetMeasuresReferences = this.measureAliasesRenderedReference(preAggregationForQuery);
+
+    const columnsFor = (targetReferences, references, preAggregation) => Object.keys(targetReferences).map(
+      member => `${references[this.query.cubeEvaluator.pathFromArray([preAggregation.cube, member.split('.')[1]])]} ${targetReferences[member]}`
+    );
+
+    const tables = preAggregationForQuery.referencedPreAggregations.map(preAggregation => {
+      const dimensionsReferences = this.dimensionsRenderedReference(preAggregation);
+      const timeDimensionsReferences = this.timeDimensionsRenderedReference(rollupGranularity, preAggregation);
+      const measuresReferences = this.measureAliasesRenderedReference(preAggregation);
+
+      return {
+        tableName: this.preAggregationTableName(
+          preAggregation.cube,
+          preAggregation.preAggregationName,
+          preAggregation.preAggregation
+        ),
+        columns: columnsFor(targetDimensionsReferences, dimensionsReferences, preAggregation)
+          .concat(columnsFor(targetTimeDimensionsReferences, timeDimensionsReferences, preAggregation))
+          .concat(columnsFor(targetMeasuresReferences, measuresReferences, preAggregation))
+      };
+    });
     if (tables.length === 1) {
-      return tables[0];
+      return tables[0].tableName;
     }
-    const union = tables.map(table => `SELECT * FROM ${table}`).join(' UNION ALL ');
+    const union = tables.map(table => `SELECT ${table.columns.join(', ')} FROM ${table.tableName}`).join(' UNION ALL ');
     return `(${union})`;
   }
 
   rollupPreAggregation(preAggregationForQuery, measures, isFullSimpleQuery, filters) {
     let toJoin;
+    // TODO granularity shouldn't be null?
+    const rollupGranularity = preAggregationForQuery.references.timeDimensions[0]?.granularity || 'day';
 
     const sqlAndAlias = (preAgg) => ({
       preAggregation: preAgg,
       alias: this.query.cubeAlias(this.query.cubeEvaluator.pathFromArray([preAgg.cube, preAgg.preAggregationName])),
-      sql: this.rollupLambdaUnion(preAgg)
+      sql: this.rollupLambdaUnion(preAgg, rollupGranularity)
     });
 
     if (preAggregationForQuery.preAggregation.type === 'rollupJoin') {
@@ -1141,9 +1166,6 @@ export class PreAggregations {
       }))
     ).filter(f => !!f);
 
-    // TODO granularity shouldn't be null?
-    const rollupGranularity = preAggregationForQuery.references.timeDimensions[0]?.granularity || 'day';
-
     const renderedReference = {
       ...(this.measuresRenderedReference(preAggregationForQuery)),
       ...(this.dimensionsRenderedReference(preAggregationForQuery)),
@@ -1186,6 +1208,19 @@ export class PreAggregations {
     )(this.rollupMeasures(preAggregationForQuery));
   }
 
+  measureAliasesRenderedReference(preAggregationForQuery) {
+    return R.pipe(
+      R.map(path => {
+        const measure = this.query.newMeasure(path);
+        return [
+          path,
+          measure.aliasName(),
+        ];
+      }),
+      R.fromPairs,
+    )(this.rollupMeasures(preAggregationForQuery));
+  }
+
   dimensionsRenderedReference(preAggregationForQuery) {
     return R.pipe(
       R.map(path => {
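
Because each referenced rollup may come from a different cube, its pre-aggregation table exposes columns under that cube's aliases, so the lambda union can no longer be a plain SELECT *. The columnsFor helper above re-aliases every branch onto the target rollup's column names before the UNION ALL. A small standalone sketch of that realignment (the reference maps and aliases below are hypothetical stand-ins for what dimensionsRenderedReference and measureAliasesRenderedReference return, and pathFromArray is a local stand-in for the cube evaluator's helper):

// Hypothetical rendered-reference maps: member path -> SQL column alias.
const targetReferences = { 'Orders.status': 'orders__status' };             // the target rollup's aliases
const references = { 'RealTimeOrders.status': 'real_time_orders__status' }; // a referenced rollup's aliases
const pathFromArray = (parts) => parts.join('.');

// Mirrors columnsFor: look the member up under the referenced rollup's cube,
// then re-alias it to the target column name so the UNION ALL branches line up.
const columnsFor = (targetRefs, refs, preAggregation) => Object.keys(targetRefs).map(
  member => `${refs[pathFromArray([preAggregation.cube, member.split('.')[1]])]} ${targetRefs[member]}`
);

console.log(columnsFor(targetReferences, references, { cube: 'RealTimeOrders' }));
// => [ 'real_time_orders__status orders__status' ]

Each branch of the union then becomes SELECT <re-aliased columns> FROM <pre-aggregation table>, instead of the previous SELECT * FROM <table>.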

packages/cubejs-testing/birdbox-fixtures/lambda/schema/Orders.js

Lines changed: 53 additions & 24 deletions
@@ -10,8 +10,7 @@ cube(`Orders`, {
 
     ordersByCompletedAtLambda: {
       type: `rollupLambda`,
-      rollups: [ordersByCompletedAt, ordersByCompletedByDay, AOrdersByCompletedByHour],
-      unionWithSourceData: true,
+      rollups: [ordersByCompletedAt, ordersByCompletedByDay, RealTimeOrders.AOrdersByCompletedByHour],
     },
 
     ordersByCompletedAtAndUserIdLambda: {
@@ -47,10 +46,10 @@ cube(`Orders`, {
       granularity: `day`,
       partitionGranularity: `month`,
       buildRangeStart: {
-        sql: `SELECT DATE('2020-02-7')`,
+        sql: `SELECT DATE('2021-01-1')`,
       },
       buildRangeEnd: {
-        sql: `SELECT DATE('2020-05-1')`,
+        sql: `SELECT DATE('2021-12-1')`,
       },
       refreshKey: {
         every: '1 day'
@@ -64,27 +63,10 @@ cube(`Orders`, {
       granularity: `day`,
       partitionGranularity: `day`,
      buildRangeStart: {
-        sql: `SELECT DATE('2020-05-1')`,
+        sql: `SELECT DATE('2021-12-1')`,
       },
       buildRangeEnd: {
-        sql: `SELECT DATE('2020-05-5')`,
-      },
-      refreshKey: {
-        every: '1 day'
-      },
-    },
-
-    AOrdersByCompletedByHour: {
-      measures: [count],
-      dimensions: [status],
-      timeDimension: completedAt,
-      granularity: `day`,
-      partitionGranularity: `hour`,
-      buildRangeStart: {
-        sql: `SELECT DATE('2020-05-5')`,
-      },
-      buildRangeEnd: {
-        sql: `SELECT DATE('2020-05-7')`,
+        sql: `SELECT DATE('2021-12-31')`,
       },
       refreshKey: {
         every: '1 day'
@@ -101,7 +83,7 @@ cube(`Orders`, {
        sql: `SELECT DATE('2020-02-7')`,
       },
       buildRangeEnd: {
-        sql: `SELECT DATE('2020-05-7')`,
+        sql: `SELECT DATE('2020-12-1')`,
       },
       refreshKey: {
         every: '1 day'
@@ -146,3 +128,50 @@ cube(`Orders`, {
     },
   },
 });
+
+cube(`RealTimeOrders`, {
+  sql: `SELECT * FROM public.orders`,
+
+  measures: {
+    count: {
+      type: `count`,
+    },
+  },
+
+  dimensions: {
+    id: {
+      sql: `id`,
+      type: `number`,
+      primaryKey: true,
+    },
+
+    status: {
+      sql: `status`,
+      type: `string`,
+    },
+
+    completedAt: {
+      sql: `completed_at`,
+      type: `time`,
+    },
+  },
+
+  preAggregations: {
+    AOrdersByCompletedByHour: {
+      measures: [count],
+      dimensions: [status],
+      timeDimension: completedAt,
+      granularity: `day`,
+      partitionGranularity: `hour`,
+      buildRangeStart: {
+        sql: `SELECT DATE('2021-12-29')`,
+      },
+      buildRangeEnd: {
+        sql: `SELECT DATE('2022-01-01')`,
+      },
+      refreshKey: {
+        every: '1 day'
+      },
+    },
+  }
+});

packages/cubejs-testing/test/smoke-lambda.test.ts

Lines changed: 39 additions & 81 deletions
@@ -21,40 +21,6 @@ async function runScheduledRefresh(client: any) {
   );
 }
 
-async function checkCubestoreState(cubestore: any) {
-  let rows = await cubestore.query('SELECT table_schema, table_name, build_range_end FROM information_schema.tables ORDER BY table_name', []);
-  const table = rows[3];
-  rows = R.map(
-    // eslint-disable-next-line camelcase
-    ({ table_schema, table_name, build_range_end }) => ({ table_schema, table_name: table_name.split('_').slice(0, -3).join('_'), build_range_end }),
-    rows
-  );
-  expect(rows.slice(0, 4)).toEqual([
-    {
-      table_schema: 'dev_pre_aggregations',
-      table_name: 'orders__a_orders_by_completed_by_hour2020050500',
-      build_range_end: '2020-05-05T00:59:59.999Z',
-    },
-    {
-      table_schema: 'dev_pre_aggregations',
-      table_name: 'orders__a_orders_by_completed_by_hour2020050501',
-      build_range_end: '2020-05-05T01:59:59.999Z',
-    },
-    {
-      table_schema: 'dev_pre_aggregations',
-      table_name: 'orders__a_orders_by_completed_by_hour2020050502',
-      build_range_end: '2020-05-05T02:59:59.999Z',
-    },
-    {
-      table_schema: 'dev_pre_aggregations',
-      table_name: 'orders__a_orders_by_completed_by_hour2020050503',
-      build_range_end: '2020-05-05T03:59:59.999Z',
-    },
-  ]);
-  rows = await cubestore.query(`SELECT * FROM ${table.table_schema}.${table.table_name}`, []);
-  expect(rows.length).toEqual(1);
-}
-
 describe('lambda', () => {
   jest.setTimeout(60 * 5 * 1000);
 
@@ -140,7 +106,7 @@ describe('lambda', () => {
     expect(Object.keys(response.loadResponse.results[0].usedPreAggregations)).toEqual([
       'dev_pre_aggregations.orders_orders_by_completed_at',
       'dev_pre_aggregations.orders_orders_by_completed_by_day',
-      'dev_pre_aggregations.orders__a_orders_by_completed_by_hour'
+      'dev_pre_aggregations.real_time_orders__a_orders_by_completed_by_hour'
     ]);
 
     // With lambda-view we observe all 'fresh' data, with no partition/buildRange limit.
@@ -166,49 +132,6 @@ describe('lambda', () => {
        },
      ]
    );
-
-    await checkCubestoreState(cubestore);
-
-    // add a row to (2021-01-06T00:00:00.000, shipped)
-    // add a row to (2021-12-30T00:00:00.000, shipped)
-    // add 2 rows to (_, completed), should not be visible in the results
-    await postgres.query(`
-      INSERT INTO public.Orders
-      (id, user_id, number, status, completed_at, created_at, product_id)
-      VALUES
-      (1000000, 123, 321, 'shipped', '2021-01-06T09:00:00.000', '2021-01-05T09:00:00.000', 25),
-      (1000001, 123, 321, 'completed', '2021-01-06T09:00:00.000', '2021-01-05T09:00:00.000', 25),
-      (1000002, 123, 321, 'shipped', '2021-12-30T09:00:00.000', '2021-12-20T09:00:00.000', 25),
-      (1000003, 123, 321, 'completed', '2021-12-30T09:00:00.000', '2021-12-20T09:00:00.000', 25);
-    `);
-
-    // wait past refreshKey: { every: '1 second' } to invalidate the cached lambda query
-    await pausePromise(2000);
-
-    const response2 = await client.load(query);
-
-    expect(response2.rawData()).toEqual(
-      [
-        {
-          'Orders.completedAt': '2021-12-30T00:00:00.000',
-          'Orders.completedAt.day': '2021-12-30T00:00:00.000',
-          'Orders.count': '1',
-          'Orders.status': 'shipped',
-        },
-        {
-          'Orders.completedAt': '2021-01-07T00:00:00.000',
-          'Orders.completedAt.day': '2021-01-07T00:00:00.000',
-          'Orders.count': '1',
-          'Orders.status': 'shipped',
-        },
-        {
-          'Orders.completedAt': '2021-01-06T00:00:00.000',
-          'Orders.completedAt.day': '2021-01-06T00:00:00.000',
-          'Orders.count': '3',
-          'Orders.status': 'shipped',
-        },
-      ]
-    );
   });
 
   test('query month', async () => {
@@ -234,6 +157,44 @@ describe('lambda', () => {
 
     // With lambda-view we observe all 'fresh' data, with no partition/buildRange limit.
     expect(response.rawData()).toEqual(
+      [
+        {
+          'Orders.completedAt': '2021-01-01T00:00:00.000',
+          'Orders.completedAt.month': '2021-01-01T00:00:00.000',
+          'Orders.count': '125',
+        },
+        {
+          'Orders.completedAt': '2020-12-01T00:00:00.000',
+          'Orders.completedAt.month': '2020-12-01T00:00:00.000',
+          'Orders.count': '808',
+        },
+        {
+          'Orders.completedAt': '2020-11-01T00:00:00.000',
+          'Orders.completedAt.month': '2020-11-01T00:00:00.000',
+          'Orders.count': '730',
+        },
+      ]
+    );
+
+    // add a row to (2021-01-06T00:00:00.000, shipped)
+    // add a row to (2021-12-30T00:00:00.000, shipped)
+    // add 2 rows to (_, completed), should not be visible in the results
+    await postgres.query(`
+      INSERT INTO public.Orders
+      (id, user_id, number, status, completed_at, created_at, product_id)
+      VALUES
+      (1000000, 123, 321, 'shipped', '2021-01-06T09:00:00.000Z', '2021-01-05T09:00:00.000Z', 25),
+      (1000001, 123, 321, 'completed', '2021-01-06T09:00:00.000Z', '2021-01-05T09:00:00.000Z', 25),
+      (1000002, 123, 321, 'shipped', '2021-12-30T09:00:00.000Z', '2021-12-20T09:00:00.000Z', 25),
+      (1000003, 123, 321, 'completed', '2021-12-30T09:00:00.000Z', '2021-12-20T09:00:00.000Z', 25);
+    `);
+
+    // wait past refreshKey: { every: '1 second' } to invalidate the cached lambda query
+    await pausePromise(2000);
+
+    const response2 = await client.load(query);
+
+    expect(response2.rawData()).toEqual(
       [
        {
          'Orders.completedAt': '2021-12-01T00:00:00.000',
@@ -311,13 +272,10 @@ describe('lambda', () => {
        },
      ]
    );
-
-    await checkCubestoreState(cubestore);
  });
 
  test('refresh', async () => {
    await runScheduledRefresh(client);
-    await checkCubestoreState(cubestore);
  });
 
  it('Pre-aggregations API', async () => {

rust/cubestore/cubestore/src/remotefs/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -326,7 +326,7 @@ impl LocalDirRemoteFs {
     pub async fn remove_empty_paths(root: PathBuf, path: PathBuf) -> Result<(), CubeError> {
         if let Some(parent_path) = path.parent() {
             let mut dir = fs::read_dir(parent_path).await?;
-            if dir.next_entry().await?.is_none() {
+            if !parent_path.starts_with("temp-uploads") && dir.next_entry().await?.is_none() {
                 fs::remove_dir(parent_path).await?;
             }
             if root != parent_path.to_path_buf() {
