Skip to content

Commit 9eb4922

Browse files
committed
chore: fixes
1 parent b6f83d5 commit 9eb4922

File tree

5 files changed

+25
-17
lines changed

5 files changed

+25
-17
lines changed

packages/cubejs-backend-shared/src/type-helpers.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* This module export only type helpers for using it across Cube.js project
2+
* This module exports only type helpers for using it across the Cube project
33
*/
44

55
export type ResolveAwait<T> = T extends {
@@ -16,3 +16,6 @@ export type Required<T, K extends keyof T> = {
1616
};
1717

1818
export type Optional<T, K extends keyof T> = Pick<Partial<T>, K> & Omit<T, K>;
19+
20+
// <M extends Method<Class/Interface, M>>
21+
export type MethodName<T> = { [K in keyof T]: T[K] extends (...args: any[]) => any ? K : never }[keyof T];

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
DriverInterface, QueryKey,
1313
} from '@cubejs-backend/base-driver';
1414

15-
import {QueryQueue, QueryQueueOptions} from './QueryQueue';
15+
import { QueryQueue, QueryQueueOptions } from './QueryQueue';
1616
import { ContinueWaitError } from './ContinueWaitError';
1717
import { LocalCacheDriver } from './LocalCacheDriver';
1818
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import {
66
QueryKeyHash,
77
QueueId,
88
QueryDef,
9-
QueryStageStateResponse, QueueDriverOptions, QueryKeysTuple
9+
QueryStageStateResponse
1010
} from '@cubejs-backend/base-driver';
11-
import { CubeStoreDriver, CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
11+
import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
1212

1313
import { TimeoutError } from './TimeoutError';
1414
import { ContinueWaitError } from './ContinueWaitError';
@@ -20,11 +20,14 @@ export type QueryHandlerFn = (query: QueryDef, cancelHandler: CancelHandlerFn) =
2020
export type StreamHandlerFn = (query: QueryDef, stream: QueryStream) => Promise<unknown>;
2121
export type QueryHandlersMap = Record<string, QueryHandlerFn>;
2222

23+
export type SendProcessMessageFn = (queryKeyHash: QueryKeyHash, queueId: QueueId | null) => Promise<void> | void;
24+
export type SendCancelMessageFn = (query: QueryDef, queueId: QueueId | null) => Promise<void> | void;
25+
2326
export type QueryQueueOptions = {
2427
cacheAndQueueDriver: string;
2528
logger: (message, event) => void;
26-
sendCancelMessageFn?: (query, queueId) => Promise<void>;
27-
sendProcessMessageFn?: (queryKey, queryId) => Promise<void>;
29+
sendCancelMessageFn?: SendCancelMessageFn;
30+
sendProcessMessageFn?: SendProcessMessageFn;
2831
cancelHandlers: Record<string, CancelHandlerFn>;
2932
queryHandlers: QueryHandlersMap;
3033
streamHandler?: StreamHandlerFn;
@@ -69,9 +72,9 @@ export class QueryQueue {
6972

7073
protected heartBeatInterval: number;
7174

72-
protected readonly sendProcessMessageFn: any;
75+
protected readonly sendProcessMessageFn: SendProcessMessageFn;
7376

74-
protected readonly sendCancelMessageFn: any;
77+
protected readonly sendCancelMessageFn: SendCancelMessageFn;
7578

7679
protected readonly queryHandlers: QueryHandlersMap;
7780

@@ -485,7 +488,8 @@ export class QueryQueue {
485488
preAggregation: query.query?.preAggregation,
486489
addedToQueueTime: query.addedToQueueTime,
487490
});
488-
await this.sendCancelMessageFn(query);
491+
492+
await this.sendCancelMessageFn(query, queueId);
489493
}
490494

491495
return true;
@@ -501,7 +505,7 @@ export class QueryQueue {
501505
* @private
502506
* @returns {Promise<void>}
503507
*/
504-
async reconcileQueueImpl() {
508+
protected async reconcileQueueImpl() {
505509
const queueConnection = await this.queueDriver.createConnection();
506510
try {
507511
const toCancel = await queueConnection.getQueriesToCancel();

packages/cubejs-query-orchestrator/test/benchmarks/QueueBench.abstract.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
import { CubeStoreDriver, CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
1+
import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
22
import crypto from 'crypto';
3-
import { createPromiseLock, pausePromise } from '@cubejs-backend/shared';
3+
import { createPromiseLock, MethodName, pausePromise } from '@cubejs-backend/shared';
44
import { QueueDriverConnectionInterface, QueueDriverInterface, } from '@cubejs-backend/base-driver';
5-
import {LocalQueueDriver, QueryQueue, QueryQueueOptions} from '../../src';
5+
import { LocalQueueDriver, QueryQueue, QueryQueueOptions } from '../../src';
66

77
export type QueryQueueTestOptions = Pick<QueryQueueOptions, 'cacheAndQueueDriver' | 'cubeStoreDriverFactory'> & {
88
beforeAll?: () => Promise<void>,
99
afterAll?: () => Promise<void>,
1010
};
1111

1212
function patchQueueDriverConnectionForTrack(connection: QueueDriverConnectionInterface, counters: any): QueueDriverConnectionInterface {
13-
function wrapAsyncMethod(methodName: string): any {
14-
return async function (...args) {
13+
function wrapAsyncMethod<M extends MethodName<QueueDriverConnectionInterface>>(methodName: M): any {
14+
return async (...args: Parameters<QueueDriverConnectionInterface[M]>) => {
1515
if (!(methodName in counters.methods)) {
1616
counters.methods[methodName] = {
1717
started: 1,
@@ -21,7 +21,7 @@ function patchQueueDriverConnectionForTrack(connection: QueueDriverConnectionInt
2121
counters.methods[methodName].started++;
2222
}
2323

24-
const result = await connection[methodName](...args);
24+
const result = await (connection[methodName] as any)(...args);
2525
counters.methods[methodName].finished++;
2626

2727
return result;
@@ -122,6 +122,7 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions
122122
},
123123
cancelHandlers: {
124124
query: async (_query) => {
125+
console.error('Cancel handler was called for query');
125126
},
126127
},
127128
continueWaitTimeout: 60 * 2,

packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { QueryKey, QueueDriverInterface } from '@cubejs-backend/base-driver
44
import { pausePromise } from '@cubejs-backend/shared';
55
import crypto from 'crypto';
66

7-
import {QueryQueue, QueryQueueOptions} from '../../src';
7+
import { QueryQueue, QueryQueueOptions } from '../../src';
88
import { processUidRE } from '../../src/orchestrator/utils';
99

1010
export type QueryQueueTestOptions = Pick<QueryQueueOptions, 'cacheAndQueueDriver' | 'cubeStoreDriverFactory'> & {

0 commit comments

Comments
 (0)