Skip to content

Commit f602fee

Browse files
authored
fix: Pre-aggregation table is not found after it was successfully created for CUBEJS_DROP_PRE_AGG_WITHOUT_TOUCH strategy (#5858)
1 parent 8bd19fa commit f602fee

File tree

2 files changed

+60
-13
lines changed

2 files changed

+60
-13
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,30 @@ import crypto from 'crypto';
22
import R from 'ramda';
33
import {
44
addSecondsToLocalTimestamp,
5+
BUILD_RANGE_END_LOCAL,
6+
BUILD_RANGE_START_LOCAL,
57
extractDate,
68
FROM_PARTITION_RANGE,
79
getEnv,
810
inDbTimeZone,
9-
timeSeries,
1011
MAX_SOURCE_ROW_LIMIT,
12+
timeSeries,
1113
TO_PARTITION_RANGE,
12-
BUILD_RANGE_START_LOCAL,
13-
BUILD_RANGE_END_LOCAL,
1414
utcToLocalTimeZone,
1515
} from '@cubejs-backend/shared';
1616

17-
import { cancelCombinator, SaveCancelFn, DriverInterface, BaseDriver,
17+
import {
18+
BaseDriver,
19+
cancelCombinator,
1820
DownloadTableData,
21+
DriverCapabilities,
22+
DriverInterface,
1923
InlineTable,
24+
SaveCancelFn,
2025
StreamOptions,
2126
UnloadOptions,
22-
DriverCapabilities } from '@cubejs-backend/base-driver';
23-
import {
24-
Query,
25-
QueryCache,
26-
QueryTuple,
27-
QueryWithParams,
28-
QueryBody,
29-
PreAggTableToTempTable,
30-
} from './QueryCache';
27+
} from '@cubejs-backend/base-driver';
28+
import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryTuple, QueryWithParams } from './QueryCache';
3129
import { ContinueWaitError } from './ContinueWaitError';
3230
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
3331
import { QueryQueue } from './QueryQueue';
@@ -866,6 +864,7 @@ export class PreAggregationLoader {
866864
}
867865

868866
public refresh(newVersionEntry: VersionEntry, invalidationKeys: InvalidationKeys, client) {
867+
this.updateLastTouch(this.targetTableName(newVersionEntry));
869868
let refreshStrategy = this.refreshStoreInSourceStrategy;
870869
if (this.preAggregation.external) {
871870
const readOnly =

packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ describe('QueryOrchestrator', () => {
171171
let externalMockDriver = null;
172172
let queryOrchestrator = null;
173173
let queryOrchestratorExternalRefresh = null;
174+
let queryOrchestratorDropWithoutTouch = null;
174175
let testCount = 1;
175176

176177
beforeEach(() => {
@@ -227,6 +228,14 @@ describe('QueryOrchestrator', () => {
227228
externalRefresh: true,
228229
},
229230
});
231+
queryOrchestratorDropWithoutTouch =
232+
new QueryOrchestrator(redisPrefix, driverFactory, logger, {
233+
...options,
234+
preAggregationsOptions: {
235+
...options.preAggregationsOptions,
236+
dropPreAggregationsWithoutTouch: true,
237+
},
238+
});
230239
mockDriver = mockDriverLocal;
231240
fooMockDriver = fooMockDriverLocal;
232241
barMockDriver = barMockDriverLocal;
@@ -238,6 +247,7 @@ describe('QueryOrchestrator', () => {
238247
afterEach(async () => {
239248
await queryOrchestrator.cleanup();
240249
await queryOrchestratorExternalRefresh.cleanup();
250+
await queryOrchestratorDropWithoutTouch.cleanup();
241251
});
242252

243253
test('basic', async () => {
@@ -1300,4 +1310,42 @@ describe('QueryOrchestrator', () => {
13001310

13011311
expect(streamingSourceMockDriver.downloadTableStreamOffset).toBe('earliest');
13021312
});
1313+
1314+
test('drop without touch does not affect tables in progress', async () => {
1315+
const firstQuery = queryOrchestratorDropWithoutTouch.fetchQuery({
1316+
query: 'SELECT * FROM stb_pre_aggregations.orders_delay_d20181102',
1317+
values: [],
1318+
cacheKeyQueries: {
1319+
renewalThreshold: 21600,
1320+
queries: []
1321+
},
1322+
preAggregations: [{
1323+
preAggregationsSchema: 'stb_pre_aggregations',
1324+
tableName: 'stb_pre_aggregations.orders_delay_d20181102',
1325+
loadSql: ['CREATE TABLE stb_pre_aggregations.orders_d20181102 AS SELECT * FROM public.orders_delay', []],
1326+
invalidateKeyQueries: [['SELECT 2', []]]
1327+
}],
1328+
requestId: 'drop without touch does not affect tables in progress'
1329+
});
1330+
const promises = [firstQuery];
1331+
for (let i = 0; i < 10; i++) {
1332+
promises.push(queryOrchestratorDropWithoutTouch.fetchQuery({
1333+
query: `SELECT * FROM stb_pre_aggregations.orders_d201811${i}`,
1334+
values: [],
1335+
cacheKeyQueries: {
1336+
renewalThreshold: 21600,
1337+
queries: []
1338+
},
1339+
preAggregations: [{
1340+
preAggregationsSchema: 'stb_pre_aggregations',
1341+
tableName: `stb_pre_aggregations.orders_d201811${i}`,
1342+
loadSql: [`CREATE TABLE stb_pre_aggregations.orders_d201811${i} AS SELECT * FROM public.orders`, []],
1343+
invalidateKeyQueries: [['SELECT 2', []]]
1344+
}],
1345+
requestId: 'drop without touch does not affect tables in progress'
1346+
}));
1347+
}
1348+
await Promise.all(promises);
1349+
expect(mockDriver.tables).toContainEqual(expect.stringMatching(/orders_delay/));
1350+
});
13031351
});

0 commit comments

Comments
 (0)