Skip to content

Commit 656edcf

Browse files
committed
fix: Last lambda partition is incorrectly filtered in case of middle partition is filtered out
1 parent 1d5a1f6 commit 656edcf

File tree

2 files changed

+86
-2
lines changed

2 files changed

+86
-2
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1654,10 +1654,9 @@ export class PreAggregationPartitionRangeLoader {
16541654
lambdaTable = await this.downloadLambdaTable(buildRangeEnd);
16551655
}
16561656
const rollupLambdaResults = this.preAggregationsTablesToTempTables.filter(tempTableResult => tempTableResult[1].rollupLambdaId === this.preAggregation.rollupLambdaId);
1657-
const lastResult = rollupLambdaResults[rollupLambdaResults.length - 1];
16581657
const filteredResults = loadResults.filter(
16591658
r => (this.preAggregation.lastRollupLambda || reformatInIsoLocal(r.buildRangeEnd) === reformatInIsoLocal(r.partitionRange[1])) &&
1660-
(!lastResult || !lastResult[1].buildRangeEnd || reformatInIsoLocal(lastResult[1].buildRangeEnd) < reformatInIsoLocal(r.partitionRange[0]))
1659+
rollupLambdaResults.every(result => !result[1].buildRangeEnd || reformatInIsoLocal(result[1].buildRangeEnd) < reformatInIsoLocal(r.partitionRange[0]))
16611660
);
16621661
if (filteredResults.length === 0) {
16631662
emptyResult = true;

packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,6 +1241,91 @@ describe('QueryOrchestrator', () => {
12411241
expect(result.data[0]).toMatch(/orders_h2021060100_uozkyaur_d004iq51/);
12421242
});
12431243

1244+
test('lambda partitions week', async () => {
1245+
const query = (matchedTimeDimensionDateRange) => ({
1246+
query: 'SELECT * FROM stb_pre_aggregations.orders_w UNION ALL SELECT * FROM stb_pre_aggregations.orders_d UNION ALL SELECT * FROM stb_pre_aggregations.orders_h',
1247+
values: [],
1248+
cacheKeyQueries: {
1249+
queries: []
1250+
},
1251+
preAggregations: [{
1252+
preAggregationsSchema: 'stb_pre_aggregations',
1253+
tableName: 'stb_pre_aggregations.orders_w',
1254+
loadSql: [
1255+
'CREATE TABLE stb_pre_aggregations.orders_w AS SELECT * FROM public.orders WHERE timestamp >= ? AND timestamp <= ?',
1256+
['__FROM_PARTITION_RANGE', '__TO_PARTITION_RANGE']
1257+
],
1258+
invalidateKeyQueries: [['SELECT CASE WHEN NOW() > ? THEN NOW() END as now', ['__TO_PARTITION_RANGE'], {
1259+
renewalThreshold: 1,
1260+
updateWindowSeconds: 86400,
1261+
renewalThresholdOutsideUpdateWindow: 86400,
1262+
incremental: true
1263+
}]],
1264+
preAggregationStartEndQueries: [
1265+
['SELECT MIN(timestamp) FROM orders', []],
1266+
['SELECT \'2021-05-31\'', []],
1267+
],
1268+
external: true,
1269+
partitionGranularity: 'week',
1270+
timezone: 'UTC',
1271+
rollupLambdaId: 'orders.d_lambda',
1272+
matchedTimeDimensionDateRange
1273+
}, {
1274+
preAggregationsSchema: 'stb_pre_aggregations',
1275+
tableName: 'stb_pre_aggregations.orders_d',
1276+
loadSql: [
1277+
'CREATE TABLE stb_pre_aggregations.orders_d AS SELECT * FROM public.orders WHERE timestamp >= ? AND timestamp <= ?',
1278+
['__FROM_PARTITION_RANGE', '__TO_PARTITION_RANGE']
1279+
],
1280+
invalidateKeyQueries: [['SELECT CASE WHEN NOW() > ? THEN NOW() END as now', ['__TO_PARTITION_RANGE'], {
1281+
renewalThreshold: 1,
1282+
updateWindowSeconds: 86400,
1283+
renewalThresholdOutsideUpdateWindow: 86400,
1284+
incremental: true
1285+
}]],
1286+
preAggregationStartEndQueries: [
1287+
['SELECT MIN(timestamp) FROM orders', []],
1288+
['SELECT \'2021-05-31\'', []],
1289+
],
1290+
external: true,
1291+
partitionGranularity: 'day',
1292+
timezone: 'UTC',
1293+
rollupLambdaId: 'orders.d_lambda',
1294+
matchedTimeDimensionDateRange
1295+
}, {
1296+
preAggregationsSchema: 'stb_pre_aggregations',
1297+
tableName: 'stb_pre_aggregations.orders_h',
1298+
loadSql: [
1299+
'CREATE TABLE stb_pre_aggregations.orders_h AS SELECT * FROM public.orders WHERE timestamp >= ? AND timestamp <= ?',
1300+
['__FROM_PARTITION_RANGE', '__TO_PARTITION_RANGE']
1301+
],
1302+
invalidateKeyQueries: [['SELECT CASE WHEN NOW() > ? THEN NOW() END as now', ['__TO_PARTITION_RANGE'], {
1303+
renewalThreshold: 1,
1304+
updateWindowSeconds: 86400,
1305+
renewalThresholdOutsideUpdateWindow: 86400,
1306+
incremental: true
1307+
}]],
1308+
preAggregationStartEndQueries: [
1309+
['SELECT \'2021-05-30\'', []],
1310+
['SELECT MAX(timestamp) FROM orders', []],
1311+
],
1312+
external: true,
1313+
partitionGranularity: 'hour',
1314+
timezone: 'UTC',
1315+
rollupLambdaId: 'orders.d_lambda',
1316+
lastRollupLambda: true,
1317+
matchedTimeDimensionDateRange
1318+
}],
1319+
requestId: 'lambda partitions',
1320+
external: true,
1321+
});
1322+
const result = await queryOrchestrator.fetchQuery(query());
1323+
console.log(JSON.stringify(result, null, 2));
1324+
expect(result.data[0]).not.toMatch(/orders_h2021053000/);
1325+
expect(result.data[0]).toMatch(/orders_h2021053100/);
1326+
expect(result.data[0]).toMatch(/orders_h2021060100_uozkyaur_d004iq51/);
1327+
});
1328+
12441329
test('real-time sealing partitions', async () => {
12451330
const query = (matchedTimeDimensionDateRange) => ({
12461331
query: 'SELECT * FROM stb_pre_aggregations.orders_d',

0 commit comments

Comments
 (0)