Skip to content

Commit 053264b

Browse files
authored
refactor(query-orchestrator): RedisQueueDriver - migrate to TypeScript (#6528)
1 parent cf1dd35 commit 053264b

File tree

6 files changed

+123
-71
lines changed

6 files changed

+123
-71
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export interface QueryKeyHash extends String {
66
__type: 'QueryKeyHash'
77
}
88

9+
export type GetActiveAndToProcessResponse = [active: string[], toProcess: string[]];
910
export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number];
1011
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
1112
export type RetrieveForProcessingSuccess = [
@@ -51,7 +52,19 @@ export interface QueueDriverConnectionInterface {
5152
redisHash(queryKey: QueryKey): QueryKeyHash;
5253
getResultBlocking(queryKey: QueryKey): Promise<unknown>;
5354
getResult(queryKey: QueryKey): Promise<any>;
54-
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: any, queryHandler: any, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
55+
/**
56+
* Adds specified by the queryKey query to the queue, returns tuple
57+
* with the operation result.
58+
*
59+
* @param keyScore Redis specific thing
60+
* @param queryKey
61+
* @param orphanedTime
62+
* @param queryHandler Our queue allow to use different handlers. For example query, cvsQuery, etc.
63+
* @param query
64+
* @param priority
65+
* @param options
66+
*/
67+
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
5568
// Return query keys which was sorted by priority and time
5669
getToProcessQueries(): Promise<string[]>;
5770
getActiveQueries(): Promise<string[]>;
@@ -74,7 +87,7 @@ export interface QueueDriverConnectionInterface {
7487
release(): void;
7588
//
7689
getQueriesToCancel(): Promise<string[]>
77-
getActiveAndToProcess(): Promise<[active: string[], toProcess: string[]]>;
90+
getActiveAndToProcess(): Promise<GetActiveAndToProcessResponse>;
7891
}
7992

8093
export interface QueueDriverInterface {

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
4444
public async addToQueue(
4545
keyScore: number,
4646
queryKey: QueryKey,
47-
orphanedTime: any,
47+
orphanedTime: number,
4848
queryHandler: string,
4949
query: AddToQueueQuery,
5050
priority: number,

packages/cubejs-query-orchestrator/src/orchestrator/AsyncRedisClient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ interface Multi extends Commands<Multi> {
1111
execAsync: <T = any>() => Promise<T>,
1212
}
1313

14-
interface AsyncRedisClient extends RedisClient {
14+
export interface AsyncRedisClient extends RedisClient {
1515
evalAsync: Commands<Promise<any>>['eval'],
1616
brpopAsync: Commands<Promise<any>>['brpop'],
1717
delAsync: Commands<Promise<any>>['del'],

packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ export abstract class BaseQueueDriver implements QueueDriverInterface {
1616

1717
abstract createConnection(): Promise<QueueDriverConnectionInterface>;
1818

19-
abstract release(connection: QueueDriverConnectionInterface): Promise<void>;
19+
abstract release(connection: QueueDriverConnectionInterface): void;
2020
}

packages/cubejs-query-orchestrator/src/orchestrator/RedisPool.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ export class RedisPool {
8181

8282
public release(client: AsyncRedisClient) {
8383
if (this.pool) {
84-
this.pool.release(client);
84+
this.pool.release(client).catch(() => {
85+
// nothing to do
86+
});
8587
} else if (client) {
8688
client.quit();
8789
}

0 commit comments

Comments
 (0)