Skip to content

Commit 03089ce

Browse files
authored
fix(cubestore-driver): Correct cancellation handling (#6087)
1 parent 22d0ad9 commit 03089ce

File tree

11 files changed

+119
-67
lines changed

11 files changed

+119
-67
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ export type QueryDef = unknown;
22
export type QueryKey = (string | [string, any[]]) & {
33
persistent?: true,
44
};
5+
export interface QueryKeyHash extends String {
6+
__type: 'QueryKeyHash'
7+
}
58

69
export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number];
710
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
8-
export type RetrieveForProcessingResponse = [added: any, removed: any, active: string[], toProcess: any, def: QueryDef, lockAquired: boolean] | null;
11+
export type RetrieveForProcessingResponse = [added: any, removed: any, active: QueryKeyHash[], toProcess: any, def: QueryDef, lockAquired: boolean] | null;
912

1013
export interface AddToQueueQuery {
1114
isJob: boolean,
@@ -27,37 +30,37 @@ export interface QueueDriverOptions {
2730
}
2831

2932
export interface QueueDriverConnectionInterface {
30-
redisHash(queryKey: QueryKey): string;
33+
redisHash(queryKey: QueryKey): QueryKeyHash;
3134
getResultBlocking(queryKey: QueryKey): Promise<unknown>;
3235
getResult(queryKey: QueryKey): Promise<any>;
3336
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: any, queryHandler: any, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
3437
// Return query keys which was sorted by priority and time
3538
getToProcessQueries(): Promise<string[]>;
3639
getActiveQueries(): Promise<string[]>;
37-
getQueryDef(queryKey: QueryKey): Promise<QueryDef | null>;
40+
getQueryDef(queryKey: QueryKeyHash): Promise<QueryDef | null>;
3841
// Queries which was added to queue, but was not processed and not needed
3942
getOrphanedQueries(): Promise<string[]>;
4043
// Queries which was not completed with old heartbeat
4144
getStalledQueries(): Promise<string[]>;
4245
getQueryStageState(onlyKeys: boolean): Promise<QueryStageStateResponse>;
43-
updateHeartBeat(queryKey: QueryKey): Promise<void>;
46+
updateHeartBeat(hash: QueryKeyHash): Promise<void>;
4447
getNextProcessingId(): Promise<string | number>;
4548
// Trying to acquire a lock for processing a queue item, this method can return null when
4649
// multiple nodes tries to process the same query
47-
retrieveForProcessing(queryKey: QueryKey, processingId: number | string): Promise<RetrieveForProcessingResponse>;
48-
freeProcessingLock(queryKey: QueryKey, processingId: string | number, activated: unknown): Promise<void>;
49-
optimisticQueryUpdate(queryKey: QueryKey, toUpdate, processingId): Promise<boolean>;
50+
retrieveForProcessing(hash: QueryKeyHash, processingId: number | string): Promise<RetrieveForProcessingResponse>;
51+
freeProcessingLock(hash: QueryKeyHash, processingId: string | number, activated: unknown): Promise<void>;
52+
optimisticQueryUpdate(hash: QueryKeyHash, toUpdate, processingId): Promise<boolean>;
5053
cancelQuery(queryKey: QueryKey): Promise<QueryDef | null>;
51-
getQueryAndRemove(queryKey: QueryKey): Promise<[QueryDef]>;
52-
setResultAndRemoveQuery(queryKey: QueryKey, executionResult: any, processingId: any): Promise<unknown>;
54+
getQueryAndRemove(hash: QueryKeyHash): Promise<[QueryDef]>;
55+
setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: any, processingId: any): Promise<unknown>;
5356
release(): void;
5457
//
5558
getQueriesToCancel(): Promise<string[]>
5659
getActiveAndToProcess(): Promise<[active: string[], toProcess: string[]]>;
5760
}
5861

5962
export interface QueueDriverInterface {
60-
redisHash(queryKey: QueryKey): string;
63+
redisHash(queryKey: QueryKey): QueryKeyHash;
6164
createConnection(): Promise<QueueDriverConnectionInterface>;
6265
release(connection: QueueDriverConnectionInterface): void;
6366
}

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,23 @@ import {
77
RetrieveForProcessingResponse,
88
QueueDriverOptions,
99
AddToQueueQuery,
10-
AddToQueueOptions, AddToQueueResponse, QueryKey,
10+
AddToQueueOptions,
11+
AddToQueueResponse,
12+
QueryKey,
13+
QueryKeyHash
1114
} from '@cubejs-backend/base-driver';
1215
import { getProcessUid } from '@cubejs-backend/shared';
1316

1417
import { CubeStoreDriver } from './CubeStoreDriver';
1518

16-
function hashQueryKey(queryKey: QueryKey) {
19+
function hashQueryKey(queryKey: QueryKey): QueryKeyHash {
1720
const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');
1821

1922
if (typeof queryKey === 'object' && queryKey.persistent) {
20-
return `${hash}@${getProcessUid()}`;
23+
return `${hash}@${getProcessUid()}` as any;
2124
}
2225

23-
return hash;
26+
return hash as any;
2427
}
2528

2629
class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
@@ -29,7 +32,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
2932
protected readonly options: QueueDriverOptions,
3033
) { }
3134

32-
public redisHash(queryKey: QueryKey): string {
35+
public redisHash(queryKey: QueryKey): QueryKeyHash {
3336
return hashQueryKey(queryKey);
3437
}
3538

@@ -75,13 +78,13 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
7578
}
7679

7780
// TODO: Looks useless, because we can do it in one step - getQueriesToCancel
78-
public async getQueryAndRemove(queryKey: string): Promise<[QueryDef]> {
79-
return [await this.cancelQuery(queryKey)];
81+
public async getQueryAndRemove(hash: QueryKeyHash): Promise<[QueryDef]> {
82+
return [await this.cancelQuery(hash)];
8083
}
8184

82-
public async cancelQuery(queryKey: string): Promise<QueryDef | null> {
85+
public async cancelQuery(hash: QueryKeyHash): Promise<QueryDef | null> {
8386
const rows = await this.driver.query('QUEUE CANCEL ?', [
84-
this.prefixKey(queryKey)
87+
this.prefixKey(hash)
8588
]);
8689
if (rows && rows.length) {
8790
return this.decodeQueryDefFromRow(rows[0], 'cancelQuery');
@@ -90,7 +93,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
9093
return null;
9194
}
9295

93-
public async freeProcessingLock(_queryKey: string, _processingId: string, _activated: unknown): Promise<void> {
96+
public async freeProcessingLock(_hash: QueryKeyHash, _processingId: string, _activated: unknown): Promise<void> {
9497
// nothing to do
9598
}
9699

@@ -170,7 +173,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
170173
return [active, toProcess, defs];
171174
}
172175

173-
public async getResult(queryKey: string): Promise<unknown> {
176+
public async getResult(queryKey: QueryKey): Promise<unknown> {
174177
const rows = await this.driver.query('QUEUE RESULT ?', [
175178
this.prefixKey(this.redisHash(queryKey)),
176179
]);
@@ -220,9 +223,9 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
220223
return payload;
221224
}
222225

223-
public async getQueryDef(queryKey: string): Promise<QueryDef | null> {
226+
public async getQueryDef(queryKey: QueryKeyHash): Promise<QueryDef | null> {
224227
const rows = await this.driver.query('QUEUE GET ?', [
225-
this.prefixKey(this.redisHash(queryKey))
228+
this.prefixKey(queryKey)
226229
]);
227230
if (rows && rows.length) {
228231
return this.decodeQueryDefFromRow(rows[0], 'getQueryDef');
@@ -244,7 +247,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
244247
// nothing to release
245248
}
246249

247-
public async retrieveForProcessing(queryKeyHashed: string, _processingId: string): Promise<RetrieveForProcessingResponse> {
250+
public async retrieveForProcessing(queryKeyHashed: QueryKeyHash, _processingId: string): Promise<RetrieveForProcessingResponse> {
248251
const rows = await this.driver.query('QUEUE RETRIEVE CONCURRENCY ? ?', [
249252
this.options.concurrency,
250253
this.prefixKey(queryKeyHashed),
@@ -276,18 +279,18 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
276279
return null;
277280
}
278281

279-
public async setResultAndRemoveQuery(queryKey: string, executionResult: any, _processingId: any): Promise<boolean> {
282+
public async setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: any, _processingId: any): Promise<boolean> {
280283
await this.driver.query('QUEUE ACK ? ? ', [
281-
this.prefixKey(queryKey),
284+
this.prefixKey(hash),
282285
executionResult ? JSON.stringify(executionResult) : executionResult
283286
]);
284287

285288
return true;
286289
}
287290

288-
public async updateHeartBeat(queryKey: string): Promise<void> {
291+
public async updateHeartBeat(hash: QueryKeyHash): Promise<void> {
289292
await this.driver.query('QUEUE HEARTBEAT ?', [
290-
this.prefixKey(queryKey)
293+
this.prefixKey(hash)
291294
]);
292295
}
293296
}
@@ -300,7 +303,7 @@ export class CubeStoreQueueDriver implements QueueDriverInterface {
300303

301304
protected connection: CubeStoreDriver | null = null;
302305

303-
public redisHash(queryKey: QueryKey) {
306+
public redisHash(queryKey: QueryKey): QueryKeyHash {
304307
return hashQueryKey(queryKey);
305308
}
306309

packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
import { QueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver';
1+
import {
2+
QueryKey,
3+
QueryKeyHash,
4+
QueueDriverConnectionInterface,
5+
QueueDriverInterface,
6+
} from '@cubejs-backend/base-driver';
27
import { getCacheHash } from './utils';
38

49
export abstract class BaseQueueDriver implements QueueDriverInterface {
5-
public redisHash(queryKey: string) {
10+
public redisHash(queryKey: QueryKey): QueryKeyHash {
611
return getCacheHash(queryKey);
712
}
813

packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ export class LocalQueueDriverConnection {
7878
if (this.resultPromises[resultListKey] && this.resultPromises[resultListKey].resolved) {
7979
return this.getResultBlocking(queryKey);
8080
}
81+
8182
return null;
8283
}
8384

@@ -221,7 +222,7 @@ export class LocalQueueDriverConnection {
221222
}
222223

223224
async getQueryDef(queryKey) {
224-
return this.queryDef[this.redisHash(queryKey)];
225+
return this.queryDef[queryKey];
225226
}
226227

227228
/**

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -960,7 +960,7 @@ export class QueryCache {
960960
}
961961

962962
public queryRedisKey(cacheKey): string {
963-
return this.getKey('SQL_QUERY_RESULT', getCacheHash(cacheKey));
963+
return this.getKey('SQL_QUERY_RESULT', getCacheHash(cacheKey) as any);
964964
}
965965

966966
public async cleanup() {

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ export class QueryQueue {
318318
if (priority == null) {
319319
priority = 0;
320320
}
321+
321322
if (!(priority >= -10000 && priority <= 10000)) {
322323
throw new Error('Priority should be between -10000 and 10000');
323324
}
@@ -330,8 +331,10 @@ export class QueryQueue {
330331
return this.parseResult(result);
331332
}
332333

334+
const queryKeyHash = this.redisHash(queryKey);
335+
333336
if (query.forceBuild) {
334-
const jobExists = await queueConnection.getQueryDef(queryKey);
337+
const jobExists = await queueConnection.getQueryDef(queryKeyHash);
335338
if (jobExists) return null;
336339
}
337340

@@ -363,7 +366,7 @@ export class QueryQueue {
363366

364367
await this.reconcileQueue();
365368

366-
const queryDef = await queueConnection.getQueryDef(queryKey);
369+
const queryDef = await queueConnection.getQueryDef(queryKeyHash);
367370
const [active, toProcess] = await queueConnection.getQueryStageState(true);
368371

369372
if (queryDef) {
@@ -374,8 +377,8 @@ export class QueryQueue {
374377
requestId: options.requestId,
375378
activeQueryKeys: active,
376379
toProcessQueryKeys: toProcess,
377-
active: active.indexOf(this.redisHash(queryKey)) !== -1,
378-
queueIndex: toProcess.indexOf(this.redisHash(queryKey)),
380+
active: active.indexOf(queryKeyHash) !== -1,
381+
queueIndex: toProcess.indexOf(queryKeyHash),
379382
waitingForRequestId: queryDef.requestId
380383
});
381384
}

packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ export class RedisQueueDriverConnection {
197197
}
198198

199199
async getQueryDef(queryKey) {
200-
const query = await this.redisClient.hgetAsync([this.queriesDefKey(), this.redisHash(queryKey)]);
200+
const query = await this.redisClient.hgetAsync([this.queriesDefKey(), queryKey]);
201201
return JSON.parse(query);
202202
}
203203

packages/cubejs-query-orchestrator/src/orchestrator/utils.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as querystring from 'querystring';
33
import crypto from 'crypto';
44

55
import { getProcessUid } from '@cubejs-backend/shared';
6+
import { QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver';
67

78
function parseHostPort(addr: string): { host: string, port: number } {
89
if (addr.includes(':')) {
@@ -187,17 +188,19 @@ export const processUidRE = /^[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-
187188
/**
188189
* Returns query hash by specified `queryKey`.
189190
*/
190-
export function getCacheHash(queryKey) {
191-
return typeof queryKey === 'string' && queryKey.length < 256
192-
? queryKey
193-
: `${
194-
crypto
195-
.createHash('md5')
196-
.update(JSON.stringify(queryKey))
197-
.digest('hex')
198-
}${
199-
typeof queryKey === 'object' && queryKey.persistent
200-
? `@${getProcessUid()}`
201-
: ''
202-
}`;
191+
export function getCacheHash(queryKey: QueryKey): QueryKeyHash {
192+
if (typeof queryKey === 'string' && queryKey.length < 256) {
193+
return queryKey as any;
194+
}
195+
196+
const hash = crypto
197+
.createHash('md5')
198+
.update(JSON.stringify(queryKey))
199+
.digest('hex');
200+
201+
if (typeof queryKey === 'object' && queryKey.persistent) {
202+
return `${hash}@${getProcessUid()}` as any;
203+
}
204+
205+
return hash as any;
203206
}

0 commit comments

Comments
 (0)