Skip to content

Commit 02a8e02

Browse files
authored
feat: persistent queue (#5793)
1 parent ad6c1e8 commit 02a8e02

File tree

12 files changed

+369
-35
lines changed

12 files changed

+369
-35
lines changed

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1208,7 +1208,7 @@ class ApiGateway {
12081208
renewQuery: normalizedQuery.renewQuery,
12091209
requestId: context.requestId,
12101210
context,
1211-
persistent: apiType === 'sql',
1211+
persistent: false, // apiType === 'sql',
12121212
}];
12131213
if (normalizedQuery.total) {
12141214
const normalizedTotal = structuredClone(normalizedQuery);

packages/cubejs-query-orchestrator/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
"moment-range": "^4.0.2",
4242
"moment-timezone": "^0.5.33",
4343
"ramda": "^0.27.2",
44-
"redis": "^3.0.2"
44+
"redis": "^3.0.2",
45+
"uuid": "^8.3.2"
4546
},
4647
"devDependencies": {
4748
"@cubejs-backend/linter": "^0.31.0",
@@ -51,6 +52,7 @@
5152
"@types/node": "^12",
5253
"@types/ramda": "^0.27.32",
5354
"@types/redis": "^2.8.28",
55+
"@types/uuid": "^8.3.0",
5456
"jest": "^26.6.3",
5557
"typescript": "~4.1.5"
5658
},
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
import crypto from 'crypto';
1+
import { getCacheHash } from './utils';
22

33
export abstract class BaseQueueDriver {
44
public redisHash(queryKey) {
5-
return typeof queryKey === 'string' && queryKey.length < 256 ?
6-
queryKey :
7-
crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');
5+
return getCacheHash(queryKey);
86
}
97
}

packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ export class LocalQueueDriverConnection {
4949
return res;
5050
}
5151

52+
/**
53+
* Returns promise wich will be resolved with the specified by the
54+
* queryKey query result or null if query was not added to the
55+
* processing.
56+
* @param {*} queryKey
57+
* @returns {Promise<null | *>}
58+
*/
5259
async getResult(queryKey) {
5360
const resultListKey = this.resultListKey(queryKey);
5461
if (this.resultPromises[resultListKey] && this.resultPromises[resultListKey].resolved) {
@@ -66,6 +73,21 @@ export class LocalQueueDriverConnection {
6673
)(queueObj);
6774
}
6875

76+
/**
77+
* Adds specified by the queryKey query to the queue, returns tuple
78+
* with the operation result.
79+
*
80+
* @typedef {[added: number, _b: null, _c: null, toProcessLength: number, addedTime: number]} AddedTuple
81+
*
82+
* @param {number} keyScore
83+
* @param {*} queryKey
84+
* @param {number} orphanedTime
85+
* @param {string} queryHandler (for the regular query is eq to 'query')
86+
* @param {*} query
87+
* @param {number} priority
88+
* @param {*} options
89+
* @returns {AddedTuple}
90+
*/
6991
addToQueue(keyScore, queryKey, orphanedTime, queryHandler, query, priority, options) {
7092
const queryQueueObj = {
7193
queryHandler,
@@ -185,6 +207,11 @@ export class LocalQueueDriverConnection {
185207
return this.queryDef[this.redisHash(queryKey)];
186208
}
187209

210+
/**
211+
* Updates heart beat for the processing query by its `queryKey`.
212+
*
213+
* @param {string} queryKey
214+
*/
188215
updateHeartBeat(queryKey) {
189216
const key = this.redisHash(queryKey);
190217
if (this.heartBeat[key]) {
@@ -245,14 +272,31 @@ export class LocalQueueDriverConnection {
245272
release() {
246273
}
247274

275+
/**
276+
* Returns cache key to the specified by the queryKey query and the
277+
* specified by the suffix query state.
278+
* @param {*} queryKey
279+
* @param {string} suffix
280+
* @returns {string}
281+
*/
248282
queryRedisKey(queryKey, suffix) {
249283
return `${this.redisQueuePrefix}_${this.redisHash(queryKey)}_${suffix}`;
250284
}
251285

286+
/**
287+
* Returns cache key to the cached query result.
288+
* @param {*} queryKey
289+
* @returns {string}
290+
*/
252291
resultListKey(queryKey) {
253292
return this.queryRedisKey(queryKey, 'RESULT');
254293
}
255294

295+
/**
296+
* Returns hash sum of the query specified by the queryKey.
297+
* @param {*} queryKey
298+
* @returns {string}
299+
*/
256300
redisHash(queryKey) {
257301
return this.driver.redisHash(queryKey);
258302
}

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,16 @@ export class PreAggregationLoader {
704704
if (this.isJob) {
705705
// We don't want to wait for the jobed build query result. So we run the
706706
// executeInQueue method and immediately return the LoadPreAggregationResult object.
707-
this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry);
707+
this
708+
.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry)
709+
.catch((e: any) => {
710+
this.logger('Pre-aggregations build job error', {
711+
preAggregation: this.preAggregation,
712+
requestId: this.requestId,
713+
newVersionEntry,
714+
error: (e.stack || e),
715+
});
716+
});
708717
const targetTableName = this.targetTableName(newVersionEntry);
709718
this.updateLastTouch(targetTableName);
710719
return {

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import crypto from 'crypto';
21
import csvWriter from 'csv-write-stream';
32
import LRUCache from 'lru-cache';
43
import { MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared';
@@ -11,6 +10,7 @@ import { LocalCacheDriver } from './LocalCacheDriver';
1110
import { CacheDriverInterface } from './cache-driver.interface';
1211
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
1312
import { PreAggregationDescription } from './PreAggregations';
13+
import { getCacheHash } from './utils';
1414

1515
type QueryOptions = {
1616
external?: boolean;
@@ -335,6 +335,8 @@ export class QueryCache {
335335
if (queryBody.invalidate) {
336336
key.push(queryBody.invalidate);
337337
}
338+
// @ts-ignore
339+
key.persistent = queryBody.persistent;
338340
return <CacheKey>key;
339341
}
340342

@@ -387,7 +389,6 @@ export class QueryCache {
387389
useCsvQuery?: boolean,
388390
}
389391
) {
390-
persistent = persistent || false;
391392
const queue = external
392393
? this.getExternalQueue()
393394
: await this.getQueue(dataSource);
@@ -399,14 +400,14 @@ export class QueryCache {
399400
query,
400401
values,
401402
requestId,
402-
persistent,
403403
inlineTables,
404404
useCsvQuery,
405405
},
406406
priority,
407407
{
408408
stageQueryKey: cacheKey,
409409
requestId,
410+
persistent,
410411
}
411412
);
412413
}
@@ -840,7 +841,7 @@ export class QueryCache {
840841
}
841842

842843
public queryRedisKey(cacheKey) {
843-
return `SQL_QUERY_RESULT_${this.redisPrefix}_${crypto.createHash('md5').update(JSON.stringify(cacheKey)).digest('hex')}`;
844+
return `SQL_QUERY_RESULT_${this.redisPrefix}_${getCacheHash(cacheKey)}`;
844845
}
845846

846847
public async cleanup() {

0 commit comments

Comments
 (0)