Skip to content

Commit 45fc571

Browse files
committed
chore: simplify code
1 parent ba2caed commit 45fc571

File tree

2 files changed

+48
-56
lines changed

2 files changed

+48
-56
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,15 @@ export type ProcessingId = string | number;
66
export type QueryKey = (string | [string, any[]]) & {
77
persistent?: true,
88
};
9-
export interface QueryKeyHash extends String {
10-
__type: 'QueryKeyHash'
11-
}
9+
export type QueryKeyHash = string & { __type: 'QueryKeyHash' };
1210

1311
export type QueryKeysTuple = [keyHash: QueryKeyHash, queueId: QueueId | null /** Supported by new Cube Store and Memory */];
1412
export type GetActiveAndToProcessResponse = [active: QueryKeysTuple[], toProcess: QueryKeysTuple[]];
1513
export type AddToQueueResponse = [added: number, queueId: QueueId | null, queueSize: number, addedToQueueTime: number];
1614
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
1715
export type RetrieveForProcessingSuccess = [
1816
added: unknown,
19-
// QueueId is required for Cube Store, other providers doesn't support it
17+
// QueueId is required for Cube Store, other providers don't support it
2018
queueId: QueueId | null,
2119
active: QueryKeyHash[],
2220
pending: number,
@@ -25,7 +23,7 @@ export type RetrieveForProcessingSuccess = [
2523
];
2624
export type RetrieveForProcessingFail = [
2725
added: unknown,
28-
// QueueId is required for Cube Store, other providers doesn't support it
26+
// QueueId is required for Cube Store, other providers don't support it
2927
queueId: QueueId | null,
3028
active: QueryKeyHash[],
3129
pending: number,

packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriverConnection.ts

Lines changed: 45 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ import {
2222
export interface QueueItem {
2323
order: number;
2424
key: any;
25-
queueId: any;
25+
queueId: QueueId;
2626
}
2727

28-
export interface QueryQueueObject {
29-
queueId: any;
28+
export interface QueryDefObject {
29+
queueId: QueueId;
3030
queryHandler: string;
3131
query: any;
3232
queryKey: any;
@@ -46,21 +46,21 @@ export interface ProcessingCounter {
4646
}
4747

4848
export class LocalQueueDriverConnectionState {
49-
public resultPromises: Record<string, PromiseWithResolve> = {};
49+
public resultPromises: Record<QueryKeyHash, PromiseWithResolve> = {};
5050

51-
public queryDef: Record<string, QueryQueueObject> = {};
51+
public queryDef: Record<QueryKeyHash, QueryDefObject> = {};
5252

53-
public toProcess: Record<string, QueueItem> = {};
53+
public toProcess: Record<QueryKeyHash, QueueItem> = {};
5454

55-
public recent: Record<string, QueueItem> = {};
55+
public recent: Record<QueryKeyHash, QueueItem> = {};
5656

57-
public active: Record<string, QueueItem> = {};
57+
public active: Record<QueryKeyHash, QueueItem> = {};
5858

59-
public heartBeat: Record<string, QueueItem> = {};
59+
public heartBeat: Record<QueryKeyHash, QueueItem> = {};
6060

6161
public processingCounter: ProcessingCounter = { counter: 1 };
6262

63-
public processingLocks: Record<string, any> = {};
63+
public processingLocks: Record<QueryKeyHash, any> = {};
6464
}
6565

6666
export class LocalQueueDriverConnection implements QueueDriverConnectionInterface {
@@ -118,7 +118,7 @@ export class LocalQueueDriverConnection implements QueueDriverConnectionInterfac
118118

119119
public async getResultBlocking(queryKeyHash: QueryKeyHash, _queueId?: QueueId): Promise<any> {
120120
const resultListKey = this.resultListKey(queryKeyHash);
121-
if (!this.state.queryDef[queryKeyHash as unknown as string] && !this.state.resultPromises[resultListKey]) {
121+
if (!this.state.queryDef[queryKeyHash] && !this.state.resultPromises[resultListKey]) {
122122
return null;
123123
}
124124
const timeoutPromise = (timeout: number) => new Promise((resolve) => setTimeout(() => resolve(null), timeout));
@@ -148,7 +148,7 @@ export class LocalQueueDriverConnection implements QueueDriverConnectionInterfac
148148
R.values,
149149
R.filter(orderFilterLessThan ? (q: QueueItem) => q.order < orderFilterLessThan : R.identity),
150150
R.sortBy((q: QueueItem) => q.order),
151-
R.map((q: QueueItem) => q.key as unknown as string)
151+
R.map((q: QueueItem) => q.key)
152152
)(queueObj);
153153
}
154154

@@ -162,7 +162,7 @@ export class LocalQueueDriverConnection implements QueueDriverConnectionInterfac
162162
}
163163

164164
public async addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse> {
165-
const queryQueueObj: QueryQueueObject = {
165+
const queryQueueObj: QueryDefObject = {
166166
queueId: options.queueId,
167167
queryHandler,
168168
query,
@@ -174,7 +174,7 @@ export class LocalQueueDriverConnection implements QueueDriverConnectionInterfac
174174
};
175175

176176
const key = this.redisHash(queryKey);
177-
const keyStr = key as unknown as string;
177+
const keyStr = key as string;
178178

179179
if (!this.state.queryDef[keyStr]) {
180180
this.state.queryDef[keyStr] = queryQueueObj;
@@ -215,15 +215,14 @@ export class LocalQueueDriverConnection implements QueueDriverConnectionInterfac
215215
}
216216

217217
public async getQueryAndRemove(queryKeyHash: QueryKeyHash, _queueId?: QueueId | null): Promise<[QueryDef]> {
218-
const keyStr = queryKeyHash as unknown as string;
219-
const query = this.state.queryDef[keyStr];
218+
const query = this.state.queryDef[queryKeyHash];
220219

221-
delete this.state.active[keyStr];
222-
delete this.state.heartBeat[keyStr];
223-
delete this.state.toProcess[keyStr];
224-
delete this.state.recent[keyStr];
225-
delete this.state.queryDef[keyStr];
226-
delete this.state.processingLocks[keyStr];
220+
delete this.state.active[queryKeyHash];
221+
delete this.state.heartBeat[queryKeyHash];
222+
delete this.state.toProcess[queryKeyHash];
223+
delete this.state.recent[queryKeyHash];
224+
delete this.state.queryDef[queryKeyHash];
225+
delete this.state.processingLocks[queryKeyHash];
227226

228227
return [query];
229228
}
@@ -234,19 +233,18 @@ export class LocalQueueDriverConnection implements QueueDriverConnectionInterfac
234233
}
235234

236235
public async setResultAndRemoveQuery(queryKeyHash: QueryKeyHash, executionResult: any, processingId: ProcessingId, _queueId?: QueueId | null): Promise<boolean> {
237-
const keyStr = queryKeyHash as unknown as string;
238-
if (this.state.processingLocks[keyStr] !== processingId) {
236+
if (this.state.processingLocks[queryKeyHash] !== processingId) {
239237
return false;
240238
}
241239

242240
const promise = this.getResultPromise(this.resultListKey(queryKeyHash));
243241

244-
delete this.state.active[keyStr];
245-
delete this.state.heartBeat[keyStr];
246-
delete this.state.toProcess[keyStr];
247-
delete this.state.recent[keyStr];
248-
delete this.state.queryDef[keyStr];
249-
delete this.state.processingLocks[keyStr];
242+
delete this.state.active[queryKeyHash];
243+
delete this.state.heartBeat[queryKeyHash];
244+
delete this.state.toProcess[queryKeyHash];
245+
delete this.state.recent[queryKeyHash];
246+
delete this.state.queryDef[queryKeyHash];
247+
delete this.state.processingLocks[queryKeyHash];
250248

251249
promise.resolved = true;
252250
if (promise.resolve) {
@@ -274,65 +272,61 @@ export class LocalQueueDriverConnection implements QueueDriverConnectionInterfac
274272
}
275273

276274
public async getQueryDef(queryKeyHash: QueryKeyHash, _queueId?: QueueId | null): Promise<QueryDef | null> {
277-
return this.state.queryDef[queryKeyHash as unknown as string] || null;
275+
return this.state.queryDef[queryKeyHash] || null;
278276
}
279277

280278
public async updateHeartBeat(queryKeyHash: QueryKeyHash, queueId?: QueueId | null): Promise<void> {
281-
const keyStr = queryKeyHash as unknown as string;
282-
if (this.state.heartBeat[keyStr]) {
283-
this.state.heartBeat[keyStr] = { key: queryKeyHash, order: new Date().getTime(), queueId: queueId || this.state.heartBeat[keyStr].queueId };
279+
if (this.state.heartBeat[queryKeyHash]) {
280+
this.state.heartBeat[queryKeyHash] = { key: queryKeyHash, order: new Date().getTime(), queueId: queueId || this.state.heartBeat[queryKeyHash].queueId };
284281
}
285282
}
286283

287284
public async retrieveForProcessing(queryKeyHash: QueryKeyHash, processingId: ProcessingId): Promise<RetrieveForProcessingResponse> {
288-
const keyStr = queryKeyHash as unknown as string;
289285
let lockAcquired = false;
290286

291-
if (!this.state.processingLocks[keyStr]) {
292-
this.state.processingLocks[keyStr] = processingId;
287+
if (!this.state.processingLocks[queryKeyHash]) {
288+
this.state.processingLocks[queryKeyHash] = processingId;
293289
lockAcquired = true;
294290
} else {
295291
return null;
296292
}
297293

298294
let added = 0;
299295

300-
if (Object.keys(this.state.active).length < this.concurrency && !this.state.active[keyStr]) {
301-
this.state.active[keyStr] = { key: queryKeyHash, order: Number(processingId), queueId: Number(processingId) };
302-
delete this.state.toProcess[keyStr];
296+
if (Object.keys(this.state.active).length < this.concurrency && !this.state.active[queryKeyHash]) {
297+
this.state.active[queryKeyHash] = { key: queryKeyHash, order: Number(processingId), queueId: Number(processingId) };
298+
delete this.state.toProcess[queryKeyHash];
303299

304300
added = 1;
305301
}
306302

307-
this.state.heartBeat[keyStr] = { key: queryKeyHash, order: new Date().getTime(), queueId: Number(processingId) };
303+
this.state.heartBeat[queryKeyHash] = { key: queryKeyHash, order: new Date().getTime(), queueId: Number(processingId) };
308304

309305
return [
310306
added,
311-
this.state.queryDef[keyStr]?.queueId || null,
312-
this.queueArray(this.state.active) as unknown as QueryKeyHash[],
307+
this.state.queryDef[queryKeyHash]?.queueId || null,
308+
this.queueArray(this.state.active) as QueryKeyHash[],
313309
Object.keys(this.state.toProcess).length,
314-
this.state.queryDef[keyStr],
310+
this.state.queryDef[queryKeyHash],
315311
lockAcquired
316312
];
317313
}
318314

319315
public async freeProcessingLock(queryKeyHash: QueryKeyHash, processingId: ProcessingId, activated: any): Promise<void> {
320-
const keyStr = queryKeyHash as unknown as string;
321-
if (this.state.processingLocks[keyStr] === processingId) {
322-
delete this.state.processingLocks[keyStr];
316+
if (this.state.processingLocks[queryKeyHash] === processingId) {
317+
delete this.state.processingLocks[queryKeyHash];
323318
if (activated) {
324-
delete this.state.active[keyStr];
319+
delete this.state.active[queryKeyHash];
325320
}
326321
}
327322
}
328323

329324
public async optimisticQueryUpdate(queryKeyHash: QueryKeyHash, toUpdate: any, processingId: ProcessingId, _queueId?: QueueId | null): Promise<boolean> {
330-
const keyStr = queryKeyHash as unknown as string;
331-
if (this.state.processingLocks[keyStr] !== processingId) {
325+
if (this.state.processingLocks[queryKeyHash] !== processingId) {
332326
return false;
333327
}
334328

335-
this.state.queryDef[keyStr] = { ...this.state.queryDef[keyStr], ...toUpdate };
329+
this.state.queryDef[queryKeyHash] = { ...this.state.queryDef[queryKeyHash], ...toUpdate };
336330
return true;
337331
}
338332

0 commit comments

Comments
 (0)