Skip to content

Commit 3c10b05

Browse files
committed
refactor(query-orchestrator): Migrate QueryQueue to TS
1 parent ed2ca79 commit 3c10b05

File tree

6 files changed

+155
-164
lines changed

6 files changed

+155
-164
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1-
export type QueryDef = unknown;
2-
export type QueryKey = (string | [string, any[]]) & {
1+
export type QueryDef = any;
2+
export type QueryKeyOptions = any;
3+
export type QueryKeyTuple = [
4+
sql: string,
5+
params: unknown[],
6+
options?: QueryKeyOptions
7+
];
8+
export type QueryKey = (string | QueryKeyTuple) & {
39
persistent?: true,
410
};
11+
export type QueryMessageId = string;
512

613
export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number];
714
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
@@ -27,7 +34,7 @@ export interface QueueDriverOptions {
2734
}
2835

2936
export interface QueueDriverConnectionInterface {
30-
redisHash(queryKey: QueryKey): string;
37+
redisHash(queryKey: QueryKey): QueryMessageId;
3138
getResultBlocking(queryKey: QueryKey): Promise<unknown>;
3239
getResult(queryKey: QueryKey): Promise<any>;
3340
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: any, queryHandler: any, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
@@ -45,10 +52,10 @@ export interface QueueDriverConnectionInterface {
4552
// Trying to acquire a lock for processing a queue item, this method can return null when
4653
// multiple nodes tries to process the same query
4754
retrieveForProcessing(queryKey: QueryKey, processingId: number | string): Promise<RetrieveForProcessingResponse>;
48-
freeProcessingLock(queryKey: QueryKey, processingId: string | number, activated: unknown): Promise<void>;
55+
freeProcessingLock(queryKey: QueryKey, processingId: string | number, activated: unknown): Promise<void | string>;
4956
optimisticQueryUpdate(queryKey: QueryKey, toUpdate, processingId): Promise<boolean>;
5057
cancelQuery(queryKey: QueryKey): Promise<QueryDef | null>;
51-
getQueryAndRemove(queryKey: QueryKey): Promise<[QueryDef]>;
58+
getQueryAndRemove(queryKey: QueryMessageId): Promise<[QueryDef]>;
5259
setResultAndRemoveQuery(queryKey: QueryKey, executionResult: any, processingId: any): Promise<unknown>;
5360
release(): void;
5461
//

packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
import { QueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver';
1+
import {
2+
QueryKey,
3+
QueryMessageId,
4+
QueueDriverConnectionInterface,
5+
QueueDriverInterface,
6+
} from '@cubejs-backend/base-driver';
27
import { getCacheHash } from './utils';
38

49
export abstract class BaseQueueDriver implements QueueDriverInterface {
5-
public redisHash(queryKey: string) {
10+
public redisHash(queryKey: QueryKey): QueryMessageId {
611
return getCacheHash(queryKey);
712
}
813

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ export class PreAggregationLoader {
845845
const queue = await this.preAggregations.getQueue(this.preAggregation.dataSource);
846846
return queue.executeInQueue(
847847
'query',
848-
this.preAggregationQueryKey(invalidationKeys),
848+
<any> this.preAggregationQueryKey(invalidationKeys),
849849
{
850850
preAggregation: this.preAggregation,
851851
preAggregationsTablesToTempTables: this.preAggregationsTablesToTempTables,
@@ -2015,7 +2015,7 @@ export class PreAggregations {
20152015
tables = tables.filter(row => `${schema}.${row.table_name}` === table);
20162016

20172017
// fetching query result
2018-
const { queueDriver } = this.queue[dataSource];
2018+
const queueDriver = this.queue[dataSource].getQueueDriver();
20192019
const conn = await queueDriver.createConnection();
20202020
const result = await conn.getResult(key);
20212021
queueDriver.release(conn);

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -475,12 +475,12 @@ export class QueryCache {
475475
};
476476

477477
if (!persistent) {
478-
return queue.executeInQueue('query', cacheKey, _query, priority, opt);
478+
return queue.executeInQueue('query', cacheKey as any, _query, priority, opt);
479479
} else {
480-
const stream = queue.setQueryStream(cacheKey, aliasNameToMember);
480+
const stream = queue.setQueryStream(cacheKey as any, aliasNameToMember);
481481
// we don't want to handle error here as we want it to bubble up
482482
// to the api gateway
483-
queue.executeInQueue('stream', cacheKey, _query, priority, opt);
483+
queue.executeInQueue('stream', cacheKey as any, _query, priority, opt);
484484
return stream;
485485
}
486486
}

0 commit comments

Comments
 (0)