Skip to content

Commit 5b12ec8

Browse files
authored
feat(cubestore-driver): Queue - support persistent flag/stream handling (#6046)
1 parent d62079e commit 5b12ec8

File tree

18 files changed

+231
-150
lines changed

18 files changed

+231
-150
lines changed

package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@
7373
},
7474
"resolutions": {
7575
"@types/node": "^12",
76-
"@types/ramda": "0.27.40",
77-
"rc-tree": "4.1.5"
76+
"@types/ramda": "0.27.40"
7877
},
7978
"license": "MIT",
8079
"packageManager": "[email protected]"

packages/cubejs-backend-shared/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@
4747
"moment-timezone": "^0.5.33",
4848
"node-fetch": "^2.6.1",
4949
"shelljs": "^0.8.5",
50-
"throttle-debounce": "^3.0.1"
50+
"throttle-debounce": "^3.0.1",
51+
"uuid": "^8.3.2"
5152
},
5253
"publishConfig": {
5354
"access": "public"

packages/cubejs-backend-shared/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ export * from './http-utils';
1717
export * from './cli';
1818
export * from './proxy';
1919
export * from './time';
20+
export * from './process';
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { v1, v5 } from 'uuid';
2+
3+
/**
4+
* Unique process ID (a UUID string, e.g. 00000000-0000-0000-0000-000000000000).
5+
*/
6+
const processUid = v5(v1(), v1()).toString();
7+
8+
/**
9+
* Returns unique process ID.
10+
*/
11+
export function getProcessUid(): string {
12+
return processUid;
13+
}

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
export type QueryDef = unknown;
2-
export type QueryKey = string | [string, any[]];
2+
export type QueryKey = (string | [string, any[]]) & {
3+
persistent?: true,
4+
};
35

46
export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number];
57
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import crypto from 'crypto';
12
import {
23
QueueDriverInterface,
34
QueueDriverConnectionInterface,
@@ -8,12 +9,18 @@ import {
89
AddToQueueQuery,
910
AddToQueueOptions, AddToQueueResponse, QueryKey,
1011
} from '@cubejs-backend/base-driver';
12+
import { getProcessUid } from '@cubejs-backend/shared';
1113

12-
import crypto from 'crypto';
1314
import { CubeStoreDriver } from './CubeStoreDriver';
1415

1516
function hashQueryKey(queryKey: QueryKey) {
16-
return crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');
17+
const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');
18+
19+
if (typeof queryKey === 'object' && queryKey.persistent) {
20+
return `${hash}@${getProcessUid()}`;
21+
}
22+
23+
return hash;
1724
}
1825

1926
class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
@@ -237,14 +244,14 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
237244
// nothing to release
238245
}
239246

240-
public async retrieveForProcessing(queryKey: string, _processingId: string): Promise<RetrieveForProcessingResponse> {
247+
public async retrieveForProcessing(queryKeyHashed: string, _processingId: string): Promise<RetrieveForProcessingResponse> {
241248
const rows = await this.driver.query('QUEUE RETRIEVE CONCURRENCY ? ?', [
242249
this.options.concurrency,
243-
this.prefixKey(queryKey),
250+
this.prefixKey(queryKeyHashed),
244251
]);
245252
if (rows && rows.length) {
246253
const addedCount = 1;
247-
const active = [this.redisHash(queryKey)];
254+
const active = [queryKeyHashed];
248255
const toProcess = 0;
249256
const lockAcquired = true;
250257
const def = this.decodeQueryDefFromRow(rows[0], 'retrieveForProcessing');
@@ -272,7 +279,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
272279
public async setResultAndRemoveQuery(queryKey: string, executionResult: any, _processingId: any): Promise<boolean> {
273280
await this.driver.query('QUEUE ACK ? ? ', [
274281
this.prefixKey(queryKey),
275-
JSON.stringify(executionResult)
282+
executionResult ? JSON.stringify(executionResult) : executionResult
276283
]);
277284

278285
return true;

packages/cubejs-query-orchestrator/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@
4343
"moment-range": "^4.0.2",
4444
"moment-timezone": "^0.5.33",
4545
"ramda": "^0.27.2",
46-
"redis": "^3.0.2",
47-
"uuid": "^8.3.2"
46+
"redis": "^3.0.2"
4847
},
4948
"devDependencies": {
5049
"@cubejs-backend/linter": "^0.31.0",

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,11 +477,11 @@ export class QueryCache {
477477
if (!persistent) {
478478
return queue.executeInQueue('query', cacheKey, _query, priority, opt);
479479
} else {
480-
const _stream = queue.getQueryStream(cacheKey, aliasNameToMember);
480+
const stream = queue.setQueryStream(cacheKey, aliasNameToMember);
481481
// we don't want to handle error here as we want it to bubble up
482482
// to the api gateway
483483
queue.executeInQueue('stream', cacheKey, _query, priority, opt);
484-
return _stream;
484+
return stream;
485485
}
486486
}
487487

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import R from 'ramda';
2-
import { getEnv } from '@cubejs-backend/shared';
2+
import { getEnv, getProcessUid } from '@cubejs-backend/shared';
33
import { QueueDriverInterface } from '@cubejs-backend/base-driver';
4-
import { CubeStoreDriver, CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
4+
import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
55

66
import { TimeoutError } from './TimeoutError';
77
import { ContinueWaitError } from './ContinueWaitError';
88
import { RedisQueueDriver } from './RedisQueueDriver';
99
import { LocalQueueDriver } from './LocalQueueDriver';
10-
import { getProcessUid } from './utils';
1110
import { QueryStream } from './QueryStream';
1211

1312
/**
@@ -140,20 +139,30 @@ export class QueryQueue {
140139
/**
141140
* Returns stream object which will be used to pipe data from data source.
142141
*
142+
* @param {*} queryKeyHash
143+
*/
144+
getQueryStream(queryKeyHash) {
145+
if (!this.streams.queued.has(queryKeyHash)) {
146+
throw new Error(`Unable to find stream for persisted query with id: ${queryKeyHash}`);
147+
}
148+
149+
return this.streams.queued.get(queryKeyHash);
150+
}
151+
152+
/**
143153
* @param {*} queryKey
144154
* @param {{ [alias: string]: string }} aliasNameToMember
145155
*/
146-
getQueryStream(queryKey, aliasNameToMember) {
156+
setQueryStream(queryKey, aliasNameToMember) {
147157
const key = this.redisHash(queryKey);
148-
if (!this.streams.queued.has(key)) {
149-
const _stream = new QueryStream({
150-
key,
151-
maps: this.streams,
152-
aliasNameToMember,
153-
});
154-
this.streams.queued.set(key, _stream);
155-
}
156-
return this.streams.queued.get(key);
158+
const stream = new QueryStream({
159+
key,
160+
maps: this.streams,
161+
aliasNameToMember,
162+
});
163+
this.streams.queued.set(key, stream);
164+
165+
return stream;
157166
}
158167

159168
/**
@@ -559,7 +568,7 @@ export class QueryQueue {
559568
R.pipe(
560569
R.filter(p => {
561570
if (active.indexOf(p) === -1) {
562-
const subKeys = p.split('::');
571+
const subKeys = p.split('@');
563572
if (subKeys.length === 1) {
564573
// common queries
565574
return true;
@@ -729,10 +738,10 @@ export class QueryQueue {
729738
* Processing query specified by the `queryKey`. This method encapsulates most
730739
* of the logic related with the queues updates, heartbeating, etc.
731740
*
732-
* @param {string} queryKey
741+
* @param {string} queryKeyHashed
733742
* @return {Promise<{ result: undefined | Object, error: string | undefined }>}
734743
*/
735-
async processQuery(queryKey) {
744+
async processQuery(queryKeyHashed) {
736745
const queueConnection = await this.queueDriver.createConnection();
737746

738747
let insertedCount;
@@ -743,15 +752,15 @@ export class QueryQueue {
743752
let processingLockAcquired;
744753
try {
745754
const processingId = await queueConnection.getNextProcessingId();
746-
const retrieveResult = await queueConnection.retrieveForProcessing(queryKey, processingId);
755+
const retrieveResult = await queueConnection.retrieveForProcessing(queryKeyHashed, processingId);
747756

748757
if (retrieveResult) {
749758
[insertedCount, _removedCount, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult;
750759
}
751760

752-
const activated = activeKeys && activeKeys.indexOf(this.redisHash(queryKey)) !== -1;
761+
const activated = activeKeys && activeKeys.indexOf(queryKeyHashed) !== -1;
753762
if (!query) {
754-
query = await queueConnection.getQueryDef(this.redisHash(queryKey));
763+
query = await queueConnection.getQueryDef(queryKeyHashed);
755764
}
756765

757766
if (query && insertedCount && activated && processingLockAcquired) {
@@ -771,19 +780,22 @@ export class QueryQueue {
771780
preAggregation: query.query?.preAggregation,
772781
addedToQueueTime: query.addedToQueueTime,
773782
});
774-
await queueConnection.optimisticQueryUpdate(queryKey, { startQueryTime }, processingId);
783+
await queueConnection.optimisticQueryUpdate(queryKeyHashed, { startQueryTime }, processingId);
775784

776785
const heartBeatTimer = setInterval(
777-
() => queueConnection.updateHeartBeat(queryKey),
786+
() => queueConnection.updateHeartBeat(queryKeyHashed),
778787
this.heartBeatInterval * 1000
779788
);
780789
try {
781790
const handler = query?.queryHandler;
782-
let target;
783791
switch (handler) {
784792
case 'stream':
785-
target = this.getQueryStream(this.redisHash(queryKey));
786-
await this.queryTimeout(this.queryHandlers.stream(query.query, target));
793+
await this.queryTimeout(
794+
this.queryHandlers.stream(query.query, this.getQueryStream(queryKeyHashed))
795+
);
796+
797+
// CubeStore has special handling for null
798+
executionResult = null;
787799
break;
788800
default:
789801
executionResult = {
@@ -792,7 +804,7 @@ export class QueryQueue {
792804
query.query,
793805
async (cancelHandler) => {
794806
try {
795-
return queueConnection.optimisticQueryUpdate(queryKey, { cancelHandler }, processingId);
807+
return queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId);
796808
} catch (e) {
797809
this.logger('Error while query update', {
798810
queryKey: query.queryKey,
@@ -848,7 +860,7 @@ export class QueryQueue {
848860
error: (e.stack || e).toString()
849861
});
850862
if (e instanceof TimeoutError) {
851-
const queryWithCancelHandle = await queueConnection.getQueryDef(queryKey);
863+
const queryWithCancelHandle = await queueConnection.getQueryDef(queryKeyHashed);
852864
if (queryWithCancelHandle) {
853865
this.logger('Cancelling query due to timeout', {
854866
processingId,
@@ -868,7 +880,7 @@ export class QueryQueue {
868880

869881
clearInterval(heartBeatTimer);
870882

871-
if (!(await queueConnection.setResultAndRemoveQuery(queryKey, executionResult, processingId))) {
883+
if (!(await queueConnection.setResultAndRemoveQuery(queryKeyHashed, executionResult, processingId))) {
872884
this.logger('Orphaned execution result', {
873885
processingId,
874886
warn: 'Result for query was not set due to processing lock wasn\'t acquired',
@@ -887,7 +899,7 @@ export class QueryQueue {
887899
} else {
888900
this.logger('Skip processing', {
889901
processingId,
890-
queryKey: query && query.queryKey || queryKey,
902+
queryKey: query && query.queryKey || queryKeyHashed,
891903
requestId: query && query.requestId,
892904
queuePrefix: this.redisQueuePrefix,
893905
processingLockAcquired,
@@ -899,15 +911,15 @@ export class QueryQueue {
899911
});
900912
// closing stream
901913
if (query?.queryHandler === 'stream') {
902-
const stream = this.getQueryStream(this.redisHash(queryKey));
914+
const stream = this.getQueryStream(queryKeyHashed);
903915
stream.destroy();
904916
}
905-
const currentProcessingId = await queueConnection.freeProcessingLock(queryKey, processingId, activated);
917+
const currentProcessingId = await queueConnection.freeProcessingLock(queryKeyHashed, processingId, activated);
906918
if (currentProcessingId) {
907919
this.logger('Skipping free processing lock', {
908920
processingId,
909921
currentProcessingId,
910-
queryKey: query && query.queryKey || queryKey,
922+
queryKey: query && query.queryKey || queryKeyHashed,
911923
requestId: query && query.requestId,
912924
queuePrefix: this.redisQueuePrefix,
913925
processingLockAcquired,
@@ -921,7 +933,7 @@ export class QueryQueue {
921933
}
922934
} catch (e) {
923935
this.logger('Queue storage error', {
924-
queryKey: query && query.queryKey || queryKey,
936+
queryKey: query && query.queryKey || queryKeyHashed,
925937
requestId: query && query.requestId,
926938
error: (e.stack || e).toString(),
927939
queuePrefix: this.redisQueuePrefix

packages/cubejs-query-orchestrator/src/orchestrator/utils.ts

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
/* eslint-disable no-restricted-syntax */
22
import * as querystring from 'querystring';
3-
import { v1, v5 } from 'uuid';
43
import crypto from 'crypto';
54

5+
import { getProcessUid } from '@cubejs-backend/shared';
6+
67
function parseHostPort(addr: string): { host: string, port: number } {
78
if (addr.includes(':')) {
89
const parts = addr.split(':');
@@ -178,18 +179,6 @@ export function parseRedisUrl(url: Readonly<string>): RedisParsedResult {
178179
return parseUrl(url, result, parseHostPartBasic);
179180
}
180181

181-
/**
182-
* Unique process ID (aka 00000000-0000-0000-0000-000000000000).
183-
*/
184-
const processUid = v5(v1(), v1()).toString();
185-
186-
/**
187-
* Returns unique process ID.
188-
*/
189-
export function getProcessUid(): string {
190-
return processUid;
191-
}
192-
193182
/**
194183
* Unique process ID regexp.
195184
*/
@@ -208,7 +197,7 @@ export function getCacheHash(queryKey) {
208197
.digest('hex')
209198
}${
210199
typeof queryKey === 'object' && queryKey.persistent
211-
? `::${getProcessUid()}`
200+
? `@${getProcessUid()}`
212201
: ''
213202
}`;
214203
}

0 commit comments

Comments
 (0)