Skip to content

Commit b5701e3

Browse files
feat(server-core,api-gateway): Allow manual rebuilding pre-aggregation partitions within date Range (#9342)
* spelling * add dateRange to PreAggsJobsRequest * pass dateRange to preAggregations(filter) * a bit of code polishment * add preAggsJobsRequestSchema validator * fix PreAggsJobsRequest types * specify type for parseLocalDate() * refresh only matched partitions * add html reporters * some code polishment * fix/add some tests to dateParser() * code polishment * add another check * add tests for api-gw preagg jobs endpoint * align refactor some types * code polish * refresh only matched partitions fix * code polish * add tests for pre-agg jobs * jest reporters * add tests for refreshScheduler.getCachedBuildJobs() * improve caching in pre-agg-load-cache * increase delay in tests to pass * use asyncDebounce for caching in pre-agg-load-cache * Revert "use asyncDebounce for caching in pre-agg-load-cache" This reverts commit a353cf1. * Revert "improve caching in pre-agg-load-cache" This reverts commit f8f640c. * set coverageReporters * add docs * fix regexp * fix dateParser + tests * fix preAggsJobsRequestSchema validator * fix error message Co-authored-by: Igor Lukanin <[email protected]> * simplify regexp in dateParser * tests polish * little fix --------- Co-authored-by: Igor Lukanin <[email protected]>
1 parent f354570 commit b5701e3

File tree

23 files changed

+541
-159
lines changed

23 files changed

+541
-159
lines changed

docs/pages/product/apis-integrations/rest-api/reference.mdx

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ Trigger pre-aggregation build jobs or retrieve statuses of such jobs.
273273
| `selector.datasources` | Array of data source names which have pre-aggregations defined ||
274274
| `selector.cubes` | Array of cube names which contain pre-aggregations ||
275275
| `selector.preAggregations` | Array of pre-aggregation names ||
276+
| `selector.dateRange` | Date range tuple `['range-date-start', 'range-date-end']`; only partitions within this range are built ||
276277

277278
To trigger pre-aggregation builds, send a `POST` request with a payload
278279
including `post` as the `action` and `selector` properties. The response will
@@ -334,6 +335,27 @@ curl \
334335
https://localhost:4000/cubejs-api/v1/pre-aggregations/jobs
335336
```
336337

338+
Example request triggering builds of the `main` pre-aggregation defined in the
339+
`orders` cube within a date range, with some security context data
340+
and an `America/Los_Angeles` timezone:
341+
342+
```bash{outputLines: 2-14}
343+
curl \
344+
-d '{
345+
"action": "post",
346+
"selector": {
347+
"contexts": [{ "securityContext": { "tenantId": "tenant1" } }],
348+
"timezones": ["America/Los_Angeles"],
349+
"preAggregations": ["orders.main"],
350+
"dateRange": ["2020-01-01", "2020-02-01"]
351+
}
352+
}' \
353+
-H "Authorization: EXAMPLE-API-TOKEN" \
354+
-H "Content-Type: application/json" \
355+
-X POST \
356+
https://localhost:4000/cubejs-api/v1/pre-aggregations/jobs
357+
```
358+
337359
Example response:
338360

339361
```json

packages/cubejs-api-gateway/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
"jest": {
7474
"testEnvironment": "node",
7575
"collectCoverage": false,
76+
"coverageReporters": ["text", "html"],
7677
"coverageDirectory": "coverage/",
7778
"collectCoverageFrom": [
7879
"dist/src/**/*.js",

packages/cubejs-api-gateway/src/dateParser.js

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,19 @@ export function dateParser(dateString, timezone, now = new Date()) {
6565
moment.tz(timezone).endOf('day').add(1, 'day')
6666
];
6767
} else if (dateString.match(/^from (.*) to (.*)$/)) {
68-
// eslint-disable-next-line no-unused-vars,@typescript-eslint/no-unused-vars
69-
const [all, from, to] = dateString.match(/^from (.*) to (.*)$/);
68+
let [, from, to] = dateString.match(/^from(.{0,50})to(.{0,50})$/);
69+
from = from.trim();
70+
to = to.trim();
7071

7172
const current = moment(now).tz(timezone);
72-
const fromResults = parse(from, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
73-
const toResults = parse(to, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
73+
const fromResults = parse(from.trim(), new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
74+
const toResults = parse(to.trim(), new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
7475

7576
if (!Array.isArray(fromResults) || !fromResults.length) {
7677
throw new UserError(`Can't parse date: '${from}'`);
7778
}
7879

79-
if (!Array.isArray(fromResults) || !fromResults.length) {
80+
if (!Array.isArray(toResults) || !toResults.length) {
8081
throw new UserError(`Can't parse date: '${to}'`);
8182
}
8283

@@ -88,8 +89,10 @@ export function dateParser(dateString, timezone, now = new Date()) {
8889

8990
momentRange = [momentRange[0].startOf(exactGranularity), momentRange[1].endOf(exactGranularity)];
9091
} else {
91-
const results = parse(dateString, new Date(moment().tz(timezone).format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
92-
if (!results || !results.length) {
92+
const current = moment(now).tz(timezone);
93+
const results = parse(dateString, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
94+
95+
if (!results?.length) {
9396
throw new UserError(`Can't parse date: '${dateString}'`);
9497
}
9598

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 52 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import structuredClone from '@ungap/structured-clone';
88
import {
99
getEnv,
1010
getRealType,
11+
parseLocalDate,
1112
QueryAlias,
1213
} from '@cubejs-backend/shared';
1314
import {
@@ -81,7 +82,9 @@ import {
8182
normalizeQuery,
8283
normalizeQueryCancelPreAggregations,
8384
normalizeQueryPreAggregationPreview,
84-
normalizeQueryPreAggregations, remapToQueryAdapterFormat,
85+
normalizeQueryPreAggregations,
86+
preAggsJobsRequestSchema,
87+
remapToQueryAdapterFormat,
8588
} from './query';
8689
import { cachedHandler } from './cached-handler';
8790
import { createJWKsFetcher } from './jwk';
@@ -770,8 +773,8 @@ class ApiGateway {
770773
preAggregations: [{ id: preAggregationId }]
771774
}
772775
);
773-
const { partitions } = (preAggregationPartitions && preAggregationPartitions[0] || {});
774-
const preAggregationPartition = partitions && partitions.find(p => p?.tableName === versionEntry.table_name);
776+
const { partitions } = (preAggregationPartitions?.[0] || {});
777+
const preAggregationPartition = partitions?.find(p => p?.tableName === versionEntry.table_name);
775778

776779
res({
777780
preview: preAggregationPartition && await orchestratorApi.getPreAggregationPreview(
@@ -845,7 +848,6 @@ class ApiGateway {
845848
* ]
846849
* }
847850
* ```
848-
* TODO (buntarb): selector object validator.
849851
*/
850852
private async preAggregationsJobs(req: Request, res: ExpressResponse) {
851853
const response = this.resToResultFn(res);
@@ -855,49 +857,26 @@ class ApiGateway {
855857
let result;
856858
try {
857859
await this.assertApiScope('jobs', req?.context?.securityContext);
860+
861+
if (!query || Object.keys(query).length === 0) {
862+
throw new UserError('No job description provided');
863+
}
864+
865+
const { error } = preAggsJobsRequestSchema.validate(query);
866+
if (error) {
867+
throw new UserError(`Invalid Job query format: ${error.message || error.toString()}`);
868+
}
869+
858870
switch (query.action) {
859871
case 'post':
860-
if (
861-
!(<PreAggsSelector>query.selector).timezones ||
862-
(<PreAggsSelector>query.selector).timezones.length === 0
863-
) {
864-
throw new UserError(
865-
'A user\'s selector must contain at least one time zone.'
866-
);
867-
}
868-
if (
869-
!(<PreAggsSelector>query.selector).contexts ||
870-
(
871-
<{securityContext: any}[]>(
872-
<PreAggsSelector>query.selector
873-
).contexts
874-
).length === 0
875-
) {
876-
throw new UserError(
877-
'A user\'s selector must contain at least one context element.'
878-
);
879-
} else {
880-
let e = false;
881-
(<{securityContext: any}[]>(
882-
<PreAggsSelector>query.selector
883-
).contexts).forEach((c) => {
884-
if (!c.securityContext) e = true;
885-
});
886-
if (e) {
887-
throw new UserError(
888-
'Every context element must contain the ' +
889-
'\'securityContext\' property.'
890-
);
891-
}
892-
}
893872
result = await this.preAggregationsJobsPOST(
894873
context,
895874
<PreAggsSelector>query.selector
896875
);
897876
if (result.length === 0) {
898877
throw new UserError(
899878
'A user\'s selector doesn\'t match any of the ' +
900-
'pre-aggregations described by the Cube schemas.'
879+
'pre-aggregations defined in the data model.'
901880
);
902881
}
903882
break;
@@ -928,30 +907,38 @@ class ApiGateway {
928907
selector: PreAggsSelector,
929908
): Promise<string[]> {
930909
let jobs: string[] = [];
931-
if (!selector.contexts?.length) {
932-
jobs = await this.postPreAggregationsBuildJobs(
933-
context,
934-
selector,
935-
);
936-
} else {
937-
const promise = Promise.all(
938-
selector.contexts.map(async (config) => {
939-
const ctx = <RequestContext>{
940-
...context,
941-
...config,
942-
};
943-
const _jobs = await this.postPreAggregationsBuildJobs(
944-
ctx,
945-
selector,
946-
);
947-
return _jobs;
948-
})
949-
);
950-
const resolve = await promise;
951-
resolve.forEach((_jobs) => {
952-
jobs = jobs.concat(_jobs);
953-
});
910+
911+
// There might be a few contexts but dateRange if present is still the same
912+
// so let's normalize it only once.
913+
// It's expected that selector.dateRange is provided in local time (without timezone)
914+
// At the same time it is ok to get timestamps with `Z` (in UTC).
915+
if (selector.dateRange) {
916+
const start = parseLocalDate([{ val: selector.dateRange[0] }], 'UTC');
917+
const end = parseLocalDate([{ val: selector.dateRange[1] }], 'UTC');
918+
if (!start || !end) {
919+
throw new UserError(`Cannot parse selector date range ${selector.dateRange}`);
920+
}
921+
selector.dateRange = [start, end];
954922
}
923+
924+
const promise = Promise.all(
925+
selector.contexts.map(async (config) => {
926+
const ctx = <RequestContext>{
927+
...context,
928+
...config,
929+
};
930+
const _jobs = await this.postPreAggregationsBuildJobs(
931+
ctx,
932+
selector,
933+
);
934+
return _jobs;
935+
})
936+
);
937+
const resolve = await promise;
938+
resolve.forEach((_jobs) => {
939+
jobs = jobs.concat(_jobs);
940+
});
941+
955942
return jobs;
956943
}
957944

@@ -963,7 +950,7 @@ class ApiGateway {
963950
selector: PreAggsSelector
964951
): Promise<string[]> {
965952
const compiler = await this.getCompilerApi(context);
966-
const { timezones } = selector;
953+
const { timezones, dateRange } = selector;
967954
const preaggs = await compiler.preAggregations({
968955
dataSources: selector.dataSources,
969956
cubes: selector.cubes,
@@ -979,12 +966,13 @@ class ApiGateway {
979966
{
980967
metadata: undefined,
981968
timezones,
969+
dateRange,
982970
preAggregations: preaggs.map(p => ({
983971
id: p.id,
984-
cacheOnly: undefined, // boolean
972+
cacheOnly: false,
985973
partitions: undefined, // string[]
986974
})),
987-
forceBuildPreAggregations: undefined,
975+
forceBuildPreAggregations: false,
988976
throwErrors: false,
989977
}
990978
);

packages/cubejs-api-gateway/src/query.js

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ import { getEnv } from '@cubejs-backend/shared';
66
import { UserError } from './UserError';
77
import { dateParser } from './dateParser';
88
import { QueryType } from './types/enums';
9+
import { PreAggsJobsRequest } from "./types/request";
910

1011
const getQueryGranularity = (queries) => R.pipe(
11-
R.map(({ timeDimensions }) => timeDimensions[0] && timeDimensions[0].granularity || null),
12+
R.map(({ timeDimensions }) => timeDimensions[0]?.granularity),
1213
R.filter(Boolean),
1314
R.uniq
1415
)(queries);
@@ -145,6 +146,36 @@ const normalizeQueryOrder = order => {
145146
return result;
146147
};
147148

149+
export const preAggsJobsRequestSchema = Joi.object({
150+
action: Joi.string().valid('post', 'get').required(),
151+
selector: Joi.when('action', {
152+
is: 'post',
153+
then: Joi.object({
154+
contexts: Joi.array().items(
155+
Joi.object({
156+
securityContext: Joi.required(),
157+
})
158+
).min(1).required(),
159+
timezones: Joi.array().items(Joi.string()).min(1).required(),
160+
dataSources: Joi.array().items(Joi.string()),
161+
cubes: Joi.array().items(Joi.string()),
162+
preAggregations: Joi.array().items(Joi.string()),
163+
dateRange: Joi.array().length(2).items(Joi.string()),
164+
}).optional(),
165+
otherwise: Joi.forbidden(),
166+
}),
167+
tokens: Joi.when('action', {
168+
is: 'get',
169+
then: Joi.array().items(Joi.string()).min(1).required(),
170+
otherwise: Joi.forbidden(),
171+
}),
172+
resType: Joi.when('action', {
173+
is: 'get',
174+
then: Joi.string().valid('object').optional(),
175+
otherwise: Joi.forbidden(),
176+
}),
177+
});
178+
148179
const DateRegex = /^\d\d\d\d-\d\d-\d\d$/;
149180

150181
const normalizeQueryFilters = (filter) => (
@@ -196,9 +227,9 @@ const normalizeQuery = (query, persistent) => {
196227
if (error) {
197228
throw new UserError(`Invalid query format: ${error.message || error.toString()}`);
198229
}
199-
const validQuery = query.measures && query.measures.length ||
200-
query.dimensions && query.dimensions.length ||
201-
query.timeDimensions && query.timeDimensions.filter(td => !!td.granularity).length;
230+
const validQuery = query.measures?.length ||
231+
query.dimensions?.length ||
232+
query.timeDimensions?.filter(td => !!td.granularity).length;
202233
if (!validQuery) {
203234
throw new UserError(
204235
'Query should contain either measures, dimensions or timeDimensions with granularities in order to be valid'

packages/cubejs-api-gateway/src/types/request.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ type ResponseResultFn =
108108
(
109109
message: (Record<string, any> | Record<string, any>[]) | DataResult | ErrorResponse,
110110
extra?: { status: number }
111-
) => void;
111+
) => void | Promise<void>;
112112

113113
/**
114114
* Base HTTP request parameters map data type.
@@ -148,11 +148,12 @@ type SqlApiRequest = BaseRequest & {
148148
* Pre-aggregations selector object.
149149
*/
150150
type PreAggsSelector = {
151-
contexts?: {securityContext: any}[],
151+
contexts: {securityContext: any}[],
152152
timezones: string[],
153153
dataSources?: string[],
154154
cubes?: string[],
155155
preAggregations?: string[],
156+
dateRange?: [string, string], // We expect only single date Range for rebuilding
156157
};
157158

158159
/**
@@ -177,7 +178,7 @@ type PreAggJob = {
177178
* The `/cubejs-system/v1/pre-aggregations/jobs` endpoint object type.
178179
*/
179180
type PreAggsJobsRequest = {
180-
action: 'post' | 'get' | 'delete',
181+
action: 'post' | 'get',
181182
selector?: PreAggsSelector,
182183
tokens?: string[]
183184
resType?: 'object' | 'array'

packages/cubejs-api-gateway/test/dateParser.test.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,4 +168,28 @@ describe('dateParser', () => {
168168

169169
Date.now.mockRestore();
170170
});
171+
172+
test('throws error on from invalid date to date', () => {
173+
expect(() => dateParser('from invalid to 2020-02-02', 'UTC')).toThrow(
174+
'Can\'t parse date: \'invalid\''
175+
);
176+
});
177+
178+
test('throws error on from date to invalid date', () => {
179+
expect(() => dateParser('from 2020-02-02 to invalid', 'UTC')).toThrow(
180+
'Can\'t parse date: \'invalid\''
181+
);
182+
});
183+
184+
test('from 12AM till now by hour', () => {
185+
Date.now = jest.fn().mockReturnValue(new Date(2021, 2, 5, 13, 0, 0, 0));
186+
expect(dateParser('2 weeks ago by hour', 'UTC', new Date(Date.UTC(2021, 2, 5, 13, 0, 0, 0)))).toStrictEqual(
187+
[
188+
'2021-02-19T13:00:00.000',
189+
'2021-02-19T13:59:59.999'
190+
]
191+
);
192+
193+
Date.now.mockRestore();
194+
});
171195
});

0 commit comments

Comments
 (0)