Skip to content

Commit 6520279

Browse files
committed
fix: Incorrect partition filter for real time partitions
1 parent 1356aa7 commit 6520279

File tree

2 files changed

+46
-4
lines changed

2 files changed

+46
-4
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ export type PreAggregationDescription = {
181181
timezone: string;
182182
indexesSql: IndexDescription[];
183183
invalidateKeyQueries: QueryWithParams[];
184+
partitionInvalidateKeyQueries: QueryWithParams[];
184185
structureVersionLoadSql: QueryWithParams;
185186
sql: QueryWithParams;
186187
loadSql: QueryWithParams;
@@ -1571,12 +1572,14 @@ export class PreAggregationPartitionRangeLoader {
15711572
);
15721573
const [_, buildRangeEnd] = buildRange;
15731574
const loadRange: [string, string] = [...range];
1575+
const partitionInvalidateKeyQueries = this.preAggregation.partitionInvalidateKeyQueries || this.preAggregation.invalidateKeyQueries;
1576+
// `partitionInvalidateKeyQueries = []` in case of real time
1577+
if ((!partitionInvalidateKeyQueries || partitionInvalidateKeyQueries.length > 0) && buildRangeEnd < range[1]) {
1578+
loadRange[1] = buildRangeEnd;
1579+
}
15741580
const sealAt = addSecondsToLocalTimestamp(
15751581
loadRange[1], this.preAggregation.timezone, this.preAggregation.updateWindowSeconds || 0
15761582
).toISOString();
1577-
if (buildRangeEnd < range[1]) {
1578-
loadRange[1] = buildRangeEnd;
1579-
}
15801583
return {
15811584
...this.preAggregation,
15821585
tableName: partitionTableName,
@@ -1588,6 +1591,8 @@ export class PreAggregationPartitionRangeLoader {
15881591
this.replacePartitionSqlAndParams(this.preAggregation.sql, loadRange, partitionTableName),
15891592
invalidateKeyQueries: (this.preAggregation.invalidateKeyQueries || [])
15901593
.map(q => this.replacePartitionSqlAndParams(q, range, partitionTableName)),
1594+
partitionInvalidateKeyQueries: this.preAggregation.partitionInvalidateKeyQueries &&
1595+
this.preAggregation.partitionInvalidateKeyQueries.map(q => this.replacePartitionSqlAndParams(q, range, partitionTableName)),
15911596
indexesSql: (this.preAggregation.indexesSql || [])
15921597
.map(q => ({ ...q, sql: this.replacePartitionSqlAndParams(q.sql, range, partitionTableName) })),
15931598
previewSql: this.preAggregation.previewSql &&

packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1228,7 +1228,6 @@ describe('QueryOrchestrator', () => {
12281228
expect(result.data[0]).not.toMatch(/orders_h2021053000/);
12291229
expect(result.data[0]).toMatch(/orders_h2021053100/);
12301230
expect(result.data[0]).toMatch(/orders_h2021060100_uozkyaur_d004iq51/);
1231-
expect(externalMockDriver.tablesObj.find(t => t.tableName.indexOf('stb_pre_aggregations.orders_h2021060100') !== -1).sealAt).toBe('2021-06-01T00:59:59.999Z');
12321231

12331232
result = await queryOrchestrator.fetchQuery(query(['2021-05-31T00:00:00.000', '2021-05-31T23:59:59.999']));
12341233
console.log(JSON.stringify(result, null, 2));
@@ -1242,6 +1241,44 @@ describe('QueryOrchestrator', () => {
12421241
expect(result.data[0]).toMatch(/orders_h2021060100_uozkyaur_d004iq51/);
12431242
});
12441243

1244+
test('real-time sealing partitions', async () => {
1245+
const query = (matchedTimeDimensionDateRange) => ({
1246+
query: 'SELECT * FROM stb_pre_aggregations.orders_d',
1247+
values: [],
1248+
cacheKeyQueries: {
1249+
queries: []
1250+
},
1251+
preAggregations: [{
1252+
preAggregationsSchema: 'stb_pre_aggregations',
1253+
tableName: 'stb_pre_aggregations.orders_d',
1254+
loadSql: [
1255+
'CREATE TABLE stb_pre_aggregations.orders_d AS SELECT * FROM public.orders WHERE timestamp >= ? AND timestamp <= ?',
1256+
['__FROM_PARTITION_RANGE', '__TO_PARTITION_RANGE']
1257+
],
1258+
invalidateKeyQueries: [['SELECT CASE WHEN NOW() > ? THEN NOW() END as now', ['__TO_PARTITION_RANGE'], {
1259+
renewalThreshold: 1,
1260+
updateWindowSeconds: 86400,
1261+
renewalThresholdOutsideUpdateWindow: 86400,
1262+
incremental: true
1263+
}]],
1264+
partitionInvalidateKeyQueries: [],
1265+
preAggregationStartEndQueries: [
1266+
['SELECT MIN(timestamp) FROM orders', []],
1267+
['SELECT \'2021-05-31\'', []],
1268+
],
1269+
external: true,
1270+
partitionGranularity: 'day',
1271+
timezone: 'UTC',
1272+
matchedTimeDimensionDateRange
1273+
}],
1274+
requestId: 'real-time sealing partitions',
1275+
external: true,
1276+
});
1277+
const result = await queryOrchestrator.fetchQuery(query());
1278+
console.log(JSON.stringify(result, null, 2));
1279+
expect(externalMockDriver.tablesObj.find(t => t.tableName.indexOf('stb_pre_aggregations.orders_d20210531') !== -1).sealAt).toBe('2021-05-31T23:59:59.999Z');
1280+
});
1281+
12451282
test('loadRefreshKeys', async () => {
12461283
const preAggregationsLoadCacheByDataSource = {};
12471284
const preAggregationExternalRefreshKey = {

0 commit comments

Comments
 (0)