Skip to content

Commit 36180b7

Browse files
KSDaemonmarianore-muttdata
authored andcommitted
fix(schema-compiler): Make RefreshKeySql timezone-aware (cube-js#9479)
* code polish * a bit of types * code polish * grammar fix * code polish * fix(schema-compiler): Make RefreshKeySql timezone-aware * update moment-timezone * fix docs * fix tests
1 parent 635d44a commit 36180b7

File tree

11 files changed

+36
-36
lines changed

11 files changed

+36
-36
lines changed

docs/pages/reference/data-model/cube.mdx

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ When applicable, it will be displayed in [Playground][ref-playground] and expose
358358
to data consumers via [APIs and integrations][ref-apis].
359359

360360
A description can give a hint both to your team and end users, making sure they
361-
interpret the data correctly.
361+
interpret the data correctly.
362362

363363
<CodeTabs>
364364

@@ -478,7 +478,8 @@ cubes:
478478
479479
`every` - can be set as an interval with granularities `second`, `minute`,
480480
`hour`, `day`, and `week` or accept CRON string with some limitations. If you
481-
set `every` as CRON string, you can use the `timezone` parameter.
481+
set `every` as CRON string, you can use the `timezone` parameter. It takes
482+
precedence over the query timezone.
482483

483484
For example:
484485

@@ -643,4 +644,4 @@ The `access_policy` parameter is used to configure [data access policies][ref-re
643644
[ref-ref-joins]: /reference/data-model/joins
644645
[ref-ref-pre-aggs]: /reference/data-model/pre-aggregations
645646
[ref-ref-dap]: /reference/data-model/data-access-policies
646-
[ref-syntax-cube-sql]: /product/data-modeling/syntax#cubesql-function
647+
[ref-syntax-cube-sql]: /product/data-modeling/syntax#cubesql-function
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
export class BaseQueueEventsBus {
22
protected readonly subscribers: Record<string, any> = {};
33

4-
public subscribe(id, callback) {
4+
public subscribe(id: string, callback) {
55
this.subscribers[id] = { id, callback };
66
}
77

8-
public unsubscribe(id) {
8+
public unsubscribe(id: string) {
99
delete this.subscribers[id];
1010
}
1111
}

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ export class PreAggregations {
350350
}
351351

352352
/**
353-
* Determines whether the partition table is already exists or not.
353+
* Determines whether the partition table already exists or not.
354354
*/
355355
public async isPartitionExist(
356356
request: string,
@@ -512,7 +512,7 @@ export class PreAggregations {
512512
*/
513513
public async checkPartitionsBuildRangeCache(queryBody) {
514514
const preAggregations = queryBody.preAggregations || [];
515-
const result = await Promise.all(
515+
return Promise.all(
516516
preAggregations.map(async (preAggregation) => {
517517
const { preAggregationStartEndQueries } = preAggregation;
518518
const invalidate =
@@ -538,7 +538,6 @@ export class PreAggregations {
538538
};
539539
})
540540
);
541-
return result;
542541
}
543542

544543
public async expandPartitionsInPreAggregations(queryBody: Query): Promise<Query> {

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
InlineTables,
1010
CacheDriverInterface,
1111
TableStructure,
12-
DriverInterface, QueryKey,
12+
DriverInterface,
1313
} from '@cubejs-backend/base-driver';
1414

1515
import { QueryQueue } from './QueryQueue';
@@ -208,9 +208,7 @@ export class QueryCache {
208208
.cacheKeyQueriesFrom(queryBody)
209209
.map(replacePreAggregationTableNames);
210210

211-
const renewalThreshold =
212-
queryBody.cacheKeyQueries &&
213-
queryBody.cacheKeyQueries.renewalThreshold;
211+
const renewalThreshold = queryBody.cacheKeyQueries?.renewalThreshold;
214212

215213
const expireSecs = this.getExpireSecs(queryBody);
216214

@@ -350,7 +348,7 @@ export class QueryCache {
350348
}
351349

352350
private cacheKeyQueriesFrom(queryBody: QueryBody): QueryWithParams[] {
353-
return queryBody.cacheKeyQueries && queryBody.cacheKeyQueries.queries ||
351+
return queryBody.cacheKeyQueries?.queries ||
354352
queryBody.cacheKeyQueries ||
355353
[];
356354
}

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,11 +448,11 @@ export class QueryOrchestrator {
448448
return this.preAggregations.cancelQueriesFromQueue(queryKeys, dataSource);
449449
}
450450

451-
public async subscribeQueueEvents(id, callback) {
451+
public async subscribeQueueEvents(id: string, callback) {
452452
return this.getQueueEventsBus().subscribe(id, callback);
453453
}
454454

455-
public async unSubscribeQueueEvents(id) {
455+
public async unSubscribeQueueEvents(id: string) {
456456
return this.getQueueEventsBus().unsubscribe(id);
457457
}
458458

packages/cubejs-schema-compiler/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
"joi": "^17.13.3",
5151
"js-yaml": "^4.1.0",
5252
"lru-cache": "^11.1.0",
53-
"moment-timezone": "^0.5.46",
53+
"moment-timezone": "^0.5.48",
5454
"node-dijkstra": "^2.5.0",
5555
"ramda": "^0.27.2",
5656
"syntax-error": "^1.3.0",

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3714,7 +3714,7 @@ export class BaseQuery {
37143714

37153715
let utcOffset = 0;
37163716

3717-
if (refreshKey.timezone) {
3717+
if (refreshKey.timezone || this.timezone) {
37183718
utcOffset = moment.tz(refreshKey.timezone).utcOffset() * 60;
37193719
}
37203720

@@ -3733,7 +3733,9 @@ export class BaseQuery {
37333733
const every = refreshKey.every || '1 hour';
37343734

37353735
if (/^(\d+) (second|minute|hour|day|week)s?$/.test(every)) {
3736-
return [this.floorSql(`(${this.unixTimestampSql()}) / ${this.parseSecondDuration(every)}`), external, this];
3736+
const utcOffset = this.timezone ? moment.tz(this.timezone).utcOffset() * 60 : 0;
3737+
const utcOffsetPrefix = utcOffset ? `${utcOffset} + ` : '';
3738+
return [this.floorSql(`(${utcOffsetPrefix}${this.unixTimestampSql()}) / ${this.parseSecondDuration(every)}`), external, this];
37373739
}
37383740

37393741
const { dayOffset, utcOffset, interval } = this.calcIntervalForCronString(refreshKey);

packages/cubejs-schema-compiler/test/integration/mssql/mssql-pre-aggregations.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ describe('MSSqlPreAggregations', () => {
276276

277277
expect(preAggregationsDescription[0].invalidateKeyQueries[0][0].replace(/(\r\n|\n|\r)/gm, '')
278278
.replace(/\s+/g, ' '))
279-
.toMatch('SELECT CASE WHEN CURRENT_TIMESTAMP < DATEADD(day, 7, CAST(@_1 AS DATETIMEOFFSET)) THEN FLOOR((DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 3600) END');
279+
.toMatch('SELECT CASE WHEN CURRENT_TIMESTAMP < DATEADD(day, 7, CAST(@_1 AS DATETIMEOFFSET)) THEN FLOOR((-25200 + DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 3600) END');
280280

281281
return dbRunner
282282
.evaluateQueryWithPreAggregations(query)

packages/cubejs-schema-compiler/test/unit/base-query.test.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,7 +1183,7 @@ describe('SQL Generation', () => {
11831183
const utcOffset = moment.tz('America/Los_Angeles').utcOffset() * 60;
11841184
expect(query.everyRefreshKeySql({
11851185
every: '1 hour'
1186-
})).toEqual(['FLOOR((EXTRACT(EPOCH FROM NOW())) / 3600)', false, expect.any(BaseQuery)]);
1186+
})).toEqual(['FLOOR((-25200 + EXTRACT(EPOCH FROM NOW())) / 3600)', false, expect.any(BaseQuery)]);
11871187

11881188
// Standard syntax (minutes hours day month dow)
11891189
expect(query.everyRefreshKeySql({ every: '0 * * * *', timezone }))
@@ -1290,7 +1290,7 @@ describe('SQL Generation', () => {
12901290
expect(query.cacheKeyQueries()).toEqual([
12911291
[
12921292
// Postgres dialect
1293-
'SELECT FLOOR((EXTRACT(EPOCH FROM NOW())) / 600) as refresh_key',
1293+
'SELECT FLOOR((-25200 + EXTRACT(EPOCH FROM NOW())) / 600) as refresh_key',
12941294
[],
12951295
{
12961296
// false, because there is no externalQueryClass
@@ -1311,15 +1311,15 @@ describe('SQL Generation', () => {
13111311
],
13121312
timeDimensions: [],
13131313
filters: [],
1314-
timezone: 'America/Los_Angeles',
1314+
timezone: 'Europe/London',
13151315
externalQueryClass: MssqlQuery
13161316
});
13171317

13181318
// Query should not match any pre-aggregation!
13191319
expect(query.cacheKeyQueries()).toEqual([
13201320
[
13211321
// MSSQL dialect, because externalQueryClass
1322-
'SELECT FLOOR((DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 600) as refresh_key',
1322+
'SELECT FLOOR((3600 + DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 600) as refresh_key',
13231323
[],
13241324
{
13251325
// true, because externalQueryClass
@@ -1352,7 +1352,7 @@ describe('SQL Generation', () => {
13521352
expect(preAggregations[0].invalidateKeyQueries).toEqual([
13531353
[
13541354
// MSSQL dialect
1355-
'SELECT FLOOR((DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 3600) as refresh_key',
1355+
'SELECT FLOOR((-25200 + DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 3600) as refresh_key',
13561356
[],
13571357
{
13581358
external: true,
@@ -1405,15 +1405,15 @@ describe('SQL Generation', () => {
14051405
dateRange: ['2016-12-30', '2017-01-05']
14061406
}],
14071407
filters: [],
1408-
timezone: 'America/Los_Angeles',
1408+
timezone: 'Asia/Tokyo',
14091409
externalQueryClass: MssqlQuery
14101410
});
14111411

14121412
const preAggregations: any = query.newPreAggregations().preAggregationsDescription();
14131413
expect(preAggregations.length).toEqual(1);
14141414
expect(preAggregations[0].invalidateKeyQueries).toEqual([
14151415
[
1416-
'SELECT CASE\n WHEN CURRENT_TIMESTAMP < CAST(@_1 AS DATETIMEOFFSET) THEN FLOOR((DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 3600) END as refresh_key',
1416+
'SELECT CASE\n WHEN CURRENT_TIMESTAMP < CAST(@_1 AS DATETIMEOFFSET) THEN FLOOR((32400 + DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 3600) END as refresh_key',
14171417
[
14181418
'__TO_PARTITION_RANGE',
14191419
],
@@ -1496,7 +1496,7 @@ describe('SQL Generation', () => {
14961496
expect(preAggregations.length).toEqual(1);
14971497
expect(preAggregations[0].invalidateKeyQueries).toEqual([
14981498
[
1499-
'SELECT FLOOR((EXTRACT(EPOCH FROM NOW())) / 600) as refresh_key',
1499+
'SELECT FLOOR((-25200 + EXTRACT(EPOCH FROM NOW())) / 600) as refresh_key',
15001500
[],
15011501
{
15021502
external: false,
@@ -1527,7 +1527,7 @@ describe('SQL Generation', () => {
15271527
expect(preAggregations.length).toEqual(1);
15281528
expect(preAggregations[0].invalidateKeyQueries).toEqual([
15291529
[
1530-
'SELECT FLOOR((DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 600) as refresh_key',
1530+
'SELECT FLOOR((-25200 + DATEDIFF(SECOND,\'1970-01-01\', GETUTCDATE())) / 600) as refresh_key',
15311531
[],
15321532
{
15331533
external: true,

packages/cubejs-server-core/src/core/OrchestratorApi.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ export class OrchestratorApi {
7171
* error otherwise.
7272
*/
7373
public async executeQuery(query: QueryBody) {
74-
const queryForLog = query.query && query.query.replace(/\s+/g, ' ');
74+
const queryForLog = query.query?.replace(/\s+/g, ' ');
7575
const startQueryTime = (new Date()).getTime();
7676

7777
try {
@@ -174,7 +174,7 @@ export class OrchestratorApi {
174174
}
175175

176176
/**
177-
* Tests worker's connections to the Cubstore and, if not in the rollup only
177+
* Tests worker's connections to the Cubestore and, if not in the rollup only
178178
* mode, to the datasources.
179179
*/
180180
public async testConnection() {
@@ -297,11 +297,11 @@ export class OrchestratorApi {
297297
return this.orchestrator.cancelPreAggregationQueriesFromQueue(queryKeys, dataSource);
298298
}
299299

300-
public async subscribeQueueEvents(id, callback) {
300+
public async subscribeQueueEvents(id: string, callback) {
301301
return this.orchestrator.subscribeQueueEvents(id, callback);
302302
}
303303

304-
public async unSubscribeQueueEvents(id) {
304+
public async unSubscribeQueueEvents(id: string) {
305305
return this.orchestrator.unSubscribeQueueEvents(id);
306306
}
307307

0 commit comments

Comments
 (0)