diff --git a/packages/cubejs-backend-shared/src/type-helpers.ts b/packages/cubejs-backend-shared/src/type-helpers.ts index a51151acffddd..4b677d82de251 100644 --- a/packages/cubejs-backend-shared/src/type-helpers.ts +++ b/packages/cubejs-backend-shared/src/type-helpers.ts @@ -1,5 +1,5 @@ /** - * This module export only type helpers for using it across Cube.js project + * This module exports only type helpers for using it across the Cube project */ export type ResolveAwait = T extends { @@ -16,3 +16,6 @@ export type Required = { }; export type Optional = Pick, K> & Omit; + +// > +export type MethodName = { [K in keyof T]: T[K] extends (...args: any[]) => any ? K : never }[keyof T]; diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index 29674f8c2077b..fe192cb41d9ff 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -1,4 +1,4 @@ -export type QueryDef = unknown; +export type QueryDef = any; // Primary key of Queue item export type QueueId = string | number | bigint; // This was used as a lock for Redis, deprecated. @@ -38,10 +38,12 @@ export interface AddToQueueQuery { } export interface AddToQueueOptions { - stageQueryKey: string, + // It's an ugly workaround for skip queue tasks + queueId?: QueueId, + stageQueryKey?: any, requestId: string, + spanId?: string, orphanedTimeout?: number, - queueId: QueueId, } export interface QueueDriverOptions { @@ -70,19 +72,19 @@ export interface QueueDriverConnectionInterface { * @param options */ addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise; - // Return query keys that were sorted by priority and time + // Return query keys which was sorted by priority and time getToProcessQueries(): Promise; getActiveQueries(): Promise; getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise; - // Queries that were added to queue, but was not processed and not needed + // Queries which was added to queue, but was not processed and not needed getOrphanedQueries(): Promise; - // Queries that were not completed with old heartbeat + // Queries which was not completed with old heartbeat getStalledQueries(): Promise; getQueryStageState(onlyKeys: boolean): Promise; updateHeartBeat(hash: QueryKeyHash, queueId: QueueId | null): Promise; getNextProcessingId(): Promise; // Trying to acquire a lock for processing a queue item, this method can return null when - // multiple nodes try to process the same query + // multiple nodes tries to process the same query retrieveForProcessing(hash: QueryKeyHash, processingId: ProcessingId): Promise; freeProcessingLock(hash: QueryKeyHash, processingId: ProcessingId, activated: unknown): Promise; optimisticQueryUpdate(hash: QueryKeyHash, toUpdate: unknown, processingId: ProcessingId, queueId: QueueId | null): Promise; diff --git a/packages/cubejs-query-orchestrator/CLAUDE.md b/packages/cubejs-query-orchestrator/CLAUDE.md index a568c49647b91..2867ba7b7f237 100644 --- a/packages/cubejs-query-orchestrator/CLAUDE.md +++ b/packages/cubejs-query-orchestrator/CLAUDE.md @@ -26,7 +26,7 @@ yarn test # Run only unit tests yarn unit -# Run only integration tests +# Run only integration tests yarn integration # Run CubeStore integration tests specifically @@ -46,8 +46,8 @@ yarn lint:fix The Query Orchestrator consists of several interconnected components: 1. **QueryOrchestrator** (`src/orchestrator/QueryOrchestrator.ts`): Main orchestration class that coordinates query execution and manages drivers -2. **QueryCache** (`src/orchestrator/QueryCache.ts`): Handles query result caching with configurable cache drivers -3. **QueryQueue** (`src/orchestrator/QueryQueue.js`): Manages query queuing and background processing +2. **QueryCache** (`src/orchestrator/QueryCache.ts`): Handles query result caching with configurable cache drivers +3. **QueryQueue** (`src/orchestrator/QueryQueue.ts`): Manages query queuing and background processing 4. **PreAggregations** (`src/orchestrator/PreAggregations.ts`): Manages pre-aggregation building and loading 5. **DriverFactory** (`src/orchestrator/DriverFactory.ts`): Creates and manages database driver instances @@ -55,7 +55,7 @@ The Query Orchestrator consists of several interconnected components: The orchestrator supports multiple backend drivers: - **Memory**: In-memory caching and queuing (development) -- **CubeStore**: Distributed storage engine (production) +- **CubeStore**: Distributed storage engine (production) - **Redis**: External Redis-based caching (legacy, being phased out) Driver selection logic in `QueryOrchestrator.ts:detectQueueAndCacheDriver()`: @@ -128,4 +128,4 @@ Key configuration options in `QueryOrchestratorOptions`: - Inherits linting rules from `@cubejs-backend/linter` - Jest configuration extends base repository config - Docker Compose setup for integration testing -- Coverage reports generated in `coverage/` directory \ No newline at end of file +- Coverage reports generated in `coverage/` directory diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts index 16703906b76e1..784d66aa4534f 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts @@ -418,7 +418,8 @@ export class PreAggregationLoader { const queue = await this.preAggregations.getQueue(this.preAggregation.dataSource); return queue.executeInQueue( 'query', - this.preAggregationQueryKey(invalidationKeys), + // TODO: Sync types + this.preAggregationQueryKey(invalidationKeys) as any, { preAggregation: this.preAggregation, preAggregationsTablesToTempTables: this.preAggregationsTablesToTempTables, diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index 3fe3a1ab7305b..30a45ff1da9d9 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -376,10 +376,9 @@ export class PreAggregations { tables = tables.filter(row => `${schema}.${row.table_name}` === table); // fetching query result - const { queueDriver } = this.queue[dataSource]; - const conn = await queueDriver.createConnection(); + const conn = await this.queue[dataSource].getQueueDriver().createConnection(); const result = await conn.getResult(key); - queueDriver.release(conn); + this.queue[dataSource].getQueueDriver().release(conn); // calculating status let status: string; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index cf9659686c237..3c07ac545ed5d 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -9,10 +9,10 @@ import { InlineTables, CacheDriverInterface, TableStructure, - DriverInterface, + DriverInterface, QueryKey, } from '@cubejs-backend/base-driver'; -import { QueryQueue } from './QueryQueue'; +import { QueryQueue, QueryQueueOptions } from './QueryQueue'; import { ContinueWaitError } from './ContinueWaitError'; import { LocalCacheDriver } from './LocalCacheDriver'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; @@ -115,7 +115,7 @@ export interface QueryCacheOptions { }>; cubeStoreDriverFactory?: () => Promise, continueWaitTimeout?: number; - cacheAndQueueDriver?: CacheAndQueryDriverType; + cacheAndQueueDriver: CacheAndQueryDriverType; maxInMemoryCacheEntries?: number; skipExternalCacheAndQueue?: boolean; } @@ -133,7 +133,7 @@ export class QueryCache { protected readonly redisPrefix: string, protected readonly driverFactory: DriverFactoryByDataSource, protected readonly logger: any, - public readonly options: QueryCacheOptions = {} + public readonly options: QueryCacheOptions ) { switch (options.cacheAndQueueDriver || 'memory') { case 'memory': @@ -455,9 +455,9 @@ export class QueryCache { }; if (!persistent) { - return queue.executeInQueue('query', cacheKey, _query, priority, opt); + return queue.executeInQueue('query', cacheKey as QueryKey, _query, priority, opt); } else { - return queue.executeInQueue('stream', cacheKey, { + return queue.executeInQueue('stream', cacheKey as QueryKey, { ..._query, aliasNameToMember, }, priority, opt); @@ -563,7 +563,7 @@ export class QueryCache { redisPrefix: string, clientFactory: DriverFactory, executeFn: (client: BaseDriver, req: any) => any, - options: Record = {} + options: Omit ): QueryQueue { const queue: any = new QueryQueue(redisPrefix, { queryHandlers: { @@ -583,57 +583,57 @@ export class QueryCache { } return result; }, - stream: async (req, target) => { - queue.logger('Streaming SQL', { ...req }); - await (new Promise((resolve, reject) => { - let logged = false; - Promise - .all([clientFactory()]) - .then(([client]) => (client).stream(req.query, req.values, { highWaterMark: getEnv('dbQueryStreamHighWaterMark') })) - .then((source) => { - const cleanup = async (error) => { - if (source.release) { - const toRelease = source.release; - delete source.release; - await toRelease(); - } - if (error && !target.destroyed) { - target.destroy(error); - } - if (!logged && target.destroyed) { - logged = true; - if (error) { - queue.logger('Streaming done with error', { - query: req.query, - query_values: req.values, - error, - }); - reject(error); - } else { - queue.logger('Streaming successfully completed', { - requestId: req.requestId, - }); - resolve(req.requestId); - } + }, + streamHandler: async (req, target) => { + queue.logger('Streaming SQL', { ...req }); + await (new Promise((resolve, reject) => { + let logged = false; + Promise + .all([clientFactory()]) + .then(([client]) => (client).stream(req.query, req.values, { highWaterMark: getEnv('dbQueryStreamHighWaterMark') })) + .then((source) => { + const cleanup = async (error) => { + if (source.release) { + const toRelease = source.release; + delete source.release; + await toRelease(); + } + if (error && !target.destroyed) { + target.destroy(error); + } + if (!logged && target.destroyed) { + logged = true; + if (error) { + queue.logger('Streaming done with error', { + query: req.query, + query_values: req.values, + error, + }); + reject(error); + } else { + queue.logger('Streaming successfully completed', { + requestId: req.requestId, + }); + resolve(req.requestId); } - }; - - source.rowStream.once('end', () => cleanup(undefined)); - source.rowStream.once('error', cleanup); - source.rowStream.once('close', () => cleanup(undefined)); - - target.once('end', () => cleanup(undefined)); - target.once('error', cleanup); - target.once('close', () => cleanup(undefined)); - - source.rowStream.pipe(target); - }) - .catch((reason) => { - target.emit('error', reason); - resolve(reason); - }); - })); - }, + } + }; + + source.rowStream.once('end', () => cleanup(undefined)); + source.rowStream.once('error', cleanup); + source.rowStream.once('close', () => cleanup(undefined)); + + target.once('end', () => cleanup(undefined)); + target.once('error', cleanup); + target.once('close', () => cleanup(undefined)); + + source.rowStream.pipe(target); + }) + .catch((reason) => { + target.emit('error', reason); + resolve(reason); + }); + })); }, cancelHandlers: { query: async (req) => { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index cb326efc8d2ea..0e60da3dc32e9 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -3,6 +3,7 @@ import R from 'ramda'; import { getEnv } from '@cubejs-backend/shared'; import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; +import { QueryKey } from '@cubejs-backend/base-driver'; import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable } from './QueryCache'; import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; @@ -306,7 +307,7 @@ export class QueryOrchestrator { if (pendingPreAggregationIndex === -1) { const qcQueue = await this.queryCache.getQueue(queryBody.dataSource); - return qcQueue.getQueryStage(QueryCache.queryCacheKey(queryBody)); + return qcQueue.getQueryStage(QueryCache.queryCacheKey(queryBody) as QueryKey); } const preAggregation = queryBody.preAggregations[pendingPreAggregationIndex]; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts similarity index 83% rename from packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js rename to packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts index 923f2f3be1503..629888aa83ef4 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts @@ -1,4 +1,3 @@ -import R from 'ramda'; import { EventEmitter } from 'events'; import { getEnv, getProcessUid } from '@cubejs-backend/shared'; import { @@ -7,7 +6,8 @@ import { QueryKeyHash, QueueId, QueryDef, - QueryStageStateResponse + QueryStageStateResponse, + AddToQueueOptions } from '@cubejs-backend/base-driver'; import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver'; @@ -15,13 +15,41 @@ import { TimeoutError } from './TimeoutError'; import { ContinueWaitError } from './ContinueWaitError'; import { LocalQueueDriver } from './LocalQueueDriver'; import { QueryStream } from './QueryStream'; - -/** - * @param cacheAndQueueDriver - * @param queueDriverOptions - * @returns {QueueDriverInterface} - */ -function factoryQueueDriver(cacheAndQueueDriver, queueDriverOptions) { +import { CacheAndQueryDriverType } from './QueryOrchestrator'; + +export type CancelHandlerFn = (query: QueryDef) => Promise; +export type QueryHandlerFn = (query: QueryDef, cancelHandler: CancelHandlerFn) => Promise; +export type StreamHandlerFn = (query: QueryDef, stream: QueryStream) => Promise; +export type QueryHandlersMap = Record; + +export type SendProcessMessageFn = (queryKeyHash: QueryKeyHash, queueId: QueueId | null) => Promise | void; +export type SendCancelMessageFn = (query: QueryDef, queueId: QueueId | null) => Promise | void; + +export type ExecuteInQueueOptions = Omit & { + spanId?: string +}; + +export type QueryQueueOptions = { + cacheAndQueueDriver: CacheAndQueryDriverType; + logger: (message, event) => void; + sendCancelMessageFn?: SendCancelMessageFn; + sendProcessMessageFn?: SendProcessMessageFn; + cancelHandlers: Record; + queryHandlers: QueryHandlersMap; + streamHandler?: StreamHandlerFn; + processUid?: string; + concurrency?: number, + continueWaitTimeout?: number, + executionTimeout?: number, + orphanedTimeout?: number, + heartBeatInterval?: number, + redisPool?: any, + cubeStoreDriverFactory?: any, + queueDriverFactory?: (cacheAndQueueDriver: string, queueDriverOptions: any) => QueueDriverInterface, + skipQueue?: boolean, +}; + +function factoryQueueDriver(cacheAndQueueDriver: string, queueDriverOptions): QueueDriverInterface { switch (cacheAndQueueDriver || 'memory') { case 'memory': return new LocalQueueDriver(queueDriverOptions); @@ -40,79 +68,60 @@ function factoryQueueDriver(cacheAndQueueDriver, queueDriverOptions) { } export class QueryQueue { + protected concurrency: number; + + protected continueWaitTimeout: number; + + protected executionTimeout: number; + + protected orphanedTimeout: number; + + protected heartBeatInterval: number; + + protected readonly sendProcessMessageFn: SendProcessMessageFn; + + protected readonly sendCancelMessageFn: SendCancelMessageFn; + + protected readonly queryHandlers: QueryHandlersMap; + + protected readonly streamHandler: StreamHandlerFn | undefined; + + protected cancelHandlers: Record; + + protected logger: any; + + protected processUid: string; + + protected queueDriver: QueueDriverInterface; + + protected skipQueue: boolean; + /** - * Class constructor. - * - * @param {*} redisQueuePrefix - * @param {*} options + * Persistent queries streams maps. */ - constructor(redisQueuePrefix, options) { - /** - * @type {string} - */ - this.redisQueuePrefix = redisQueuePrefix; - - /** - * @type {number} - */ - this.concurrency = options.concurrency || 2; + protected readonly streams = new Map(); - /** - * @protected - * @type {number} - */ - this.continueWaitTimeout = options.continueWaitTimeout || 5; + /** + * Notify streaming queries when streaming has been started and stream is available. + */ + protected readonly streamEvents = new EventEmitter(); - /** - * @protected - * @type {number} - */ + public constructor( + protected readonly redisQueuePrefix: string, + options: QueryQueueOptions + ) { + this.concurrency = options.concurrency || 2; + this.continueWaitTimeout = options.continueWaitTimeout || 5; this.executionTimeout = options.executionTimeout || getEnv('dbQueryTimeout'); - - /** - * @protected - * @type {number} - */ this.orphanedTimeout = options.orphanedTimeout || 120; - - /** - * @protected - * @type {number} - */ this.heartBeatInterval = options.heartBeatInterval || 30; - /** - * @protected - * @type {function(QueryKeyHash, QueueId | null): Promise} - */ this.sendProcessMessageFn = options.sendProcessMessageFn || ((queryKey, queryId) => { this.processQuery(queryKey, queryId); }); - - /** - * @protected - * @param {QueryDef} query - * @param {QueueId | null} queueId - * @type {function(*): Promise} - */ this.sendCancelMessageFn = options.sendCancelMessageFn || ((query, queueId) => { this.processCancel(query, queueId); }); - - /** - * @protected - * @type {*} - */ this.queryHandlers = options.queryHandlers; - - /** - * @protected - * @type {*} - */ + this.streamHandler = options.streamHandler; this.cancelHandlers = options.cancelHandlers; - - /** - * @protected - * @type {function(string, *): void} - */ this.logger = options.logger || ((message, event) => console.log(`${message} ${JSON.stringify(event)}`)); - this.processUid = options.processUid || getProcessUid(); const queueDriverOptions = { @@ -128,43 +137,26 @@ export class QueryQueue { const queueDriverFactory = options.queueDriverFactory || factoryQueueDriver; - /** - * @type {QueueDriverInterface} - */ this.queueDriver = queueDriverFactory(options.cacheAndQueueDriver, queueDriverOptions); - /** - * @protected - * @type {boolean} - */ this.skipQueue = options.skipQueue; + } - /** - * Persistent queries streams maps. - */ - this.streams = new Map(); + public getQueueDriver(): QueueDriverInterface { + return this.queueDriver; + } - /** - * Notify streaming queries when streaming has been started and stream is available. - */ - this.streamEvents = new EventEmitter(); + public getConcurrency(): number { + return this.concurrency; } /** * Returns stream object which will be used to pipe data from data source. - * - * @param {QueryKeyHash} queryKeyHash - * @return {QueryStream | undefined} */ - getQueryStream(queryKeyHash) { + public getQueryStream(queryKeyHash: QueryKeyHash): QueryStream | undefined { return this.streams.get(queryKeyHash); } - /** - * @param {QueryKeyHash} key - * @param {{ [alias: string]: string }} aliasNameToMember - * @return {QueryStream} - */ - createQueryStream(key, aliasNameToMember) { + public createQueryStream(key: QueryKeyHash, aliasNameToMember: Record): QueryStream { const stream = new QueryStream({ key, streams: this.streams, @@ -176,9 +168,9 @@ export class QueryQueue { return stream; } - counter = 0; + protected counter = 0; - generateQueueId() { + public generateQueueId() { return this.counter++; } @@ -187,24 +179,20 @@ export class QueryQueue { * `options.skipQueue` is set to `false`, execute query skipping queue * otherwise. * - * @param {string} queryHandler - * @param {*} queryKey - * @param {*} query - * @param {number=} priority - * @param {*=} options - * @returns {*} - * * @throw {ContinueWaitError} */ - async executeInQueue( - queryHandler, - queryKey, - query, - priority, - options, + public async executeInQueue( + queryHandler: string, + queryKey: QueryKey, + query: QueryDef, + priority?: number, + executeOptions?: ExecuteInQueueOptions, ) { - options = options || {}; - options.queueId = this.generateQueueId(); + const options: AddToQueueOptions = { + queueId: this.generateQueueId(), + ...executeOptions, + }; + if (this.skipQueue) { const queryDef = { queryHandler, @@ -261,6 +249,7 @@ export class QueryQueue { const keyScore = time + (10000 - priority) * 1E14; options.orphanedTimeout = query.orphanedTimeout; + const orphanedTimeout = 'orphanedTimeout' in query ? query.orphanedTimeout : this.orphanedTimeout; const orphanedTime = time + (orphanedTimeout * 1000); @@ -381,14 +370,9 @@ export class QueryQueue { } /** - * Parse query result. - * - * @param {*} result - * @returns {*} - * * @throw {Error} */ - parseResult(result) { + protected parseResult(result: any): any { if (!result) { return; } @@ -404,14 +388,16 @@ export class QueryQueue { } } + protected reconcileAgain: boolean = false; + + protected reconcilePromise: Promise | null = null; + /** * Run query queue reconciliation flow by calling internal `reconcileQueueImpl` * method. Returns promise which will be resolved with the reconciliation * result. - * - * @returns {Promise} */ - async reconcileQueue() { + public async reconcileQueue(): Promise { if (!this.reconcilePromise) { this.reconcileAgain = false; this.reconcilePromise = this.reconcileQueueImpl() @@ -436,7 +422,7 @@ export class QueryQueue { return this.reconcilePromise; } - async shutdown() { + public async shutdown(): Promise { if (this.reconcilePromise) { await this.reconcilePromise; @@ -452,7 +438,7 @@ export class QueryQueue { * * @returns {Promise} */ - async getQueries() { + public async getQueries() { const queueConnection = await this.queueDriver.createConnection(); try { const [stalledQueries, orphanedQueries, activeQueries, toProcessQueries] = await Promise.all([ @@ -499,14 +485,7 @@ export class QueryQueue { } } - /** - * Cancel query by its `queryKey`. - * - * @param {QueryKeyHash} queryKey - * @param {QueueId | null} queueId - * @returns {void} - */ - async cancelQuery(queryKey, queueId) { + public async cancelQuery(queryKey: QueryKeyHash, queueId: QueueId | null): Promise { const queueConnection = await this.queueDriver.createConnection(); try { const query = await queueConnection.cancelQuery(queryKey, queueId); @@ -523,7 +502,8 @@ export class QueryQueue { preAggregation: query.query?.preAggregation, addedToQueueTime: query.addedToQueueTime, }); - await this.sendCancelMessageFn(query); + + await this.sendCancelMessageFn(query, queueId); } return true; @@ -539,7 +519,7 @@ export class QueryQueue { * @private * @returns {Promise} */ - async reconcileQueueImpl() { + protected async reconcileQueueImpl() { const queueConnection = await this.queueDriver.createConnection(); try { const toCancel = await queueConnection.getQueriesToCancel(); @@ -565,25 +545,24 @@ export class QueryQueue { const [_active, toProcess] = await queueConnection.getActiveAndToProcess(); - await Promise.all( - R.pipe( - R.filter(([queryKey, _queueId]) => { - const subKeys = queryKey.split('@'); - if (subKeys.length === 1) { - // common queries - return true; - } else if (subKeys[1] === this.processUid) { - // current process persistent queries - return true; - } else { - // other processes persistent queries - return false; - } - }), - R.take(this.concurrency), - R.map((([queryKey, queueId]) => this.sendProcessMessageFn(queryKey, queueId))) - )(toProcess) - ); + const tasks = toProcess + .filter(([queryKey, _queueId]) => { + const subKeys = queryKey.split('@'); + if (subKeys.length === 1) { + // common queries + return true; + } else if (subKeys[1] === this.processUid) { + // current process persistent queries + return true; + } else { + // other processes persistent queries + return false; + } + }) + .slice(0, this.concurrency) + .map(([queryKey, queueId]) => this.sendProcessMessageFn(queryKey, queueId)); + + await Promise.all(tasks); } finally { this.queueDriver.release(queueConnection); } @@ -591,20 +570,17 @@ export class QueryQueue { /** * Apply query timeout to the query. Throw if query execution time takes more - * than specified timeout. Returns resolved `promise` value. - * - * @param {Promise<*>} promise - * @returns {Promise<*>} + * than the specified timeout. Returns resolved `promise` value. * - * @throw + * @throw {TimeoutError} */ - queryTimeout(promise) { + protected queryTimeout(promise: Promise): Promise { let timeout; const { executionTimeout } = this; - return Promise.race([ + return Promise.race([ promise, - new Promise((resolve, reject) => { + new Promise((_resolve, reject) => { timeout = setTimeout(() => { reject(new TimeoutError(`Query execution timeout after ${executionTimeout / 60} min of waiting`)); }, executionTimeout * 1000); @@ -621,10 +597,8 @@ export class QueryQueue { /** * Returns the list of queries planned to be processed and the list of active * queries. - * - * @returns {Promise} */ - async fetchQueryStageState() { + public async fetchQueryStageState(): Promise { const queueConnection = await this.queueDriver.createConnection(); try { return queueConnection.getQueryStageState(false); @@ -637,12 +611,9 @@ export class QueryQueue { * Returns current state of the specified by the `stageQueryKey` query if it * exists. * - * @param {*} stageQueryKey - * @param {number=} priorityFilter - * @param {QueryStageStateResponse=} queryStageState * @returns {Promise | Promise<{ stage: string, timeElapsed: number }>} */ - async getQueryStage(stageQueryKey, priorityFilter, queryStageState) { + public async getQueryStage(stageQueryKey: QueryKey, priorityFilter?: number, queryStageState?: QueryStageStateResponse) { const [active, toProcess, allQueryDefs] = queryStageState || await this.fetchQueryStageState(); const queryInQueue = Object.values(allQueryDefs).find( @@ -670,12 +641,8 @@ export class QueryQueue { /** * Execute query without adding it to the queue. - * - * @param {*} query - * @param {QueueId} queueId - * @returns {Promise<{ result: undefined | Object, error: string | undefined }>} */ - async processQuerySkipQueue(query, queueId) { + protected async processQuerySkipQueue(query: QueryDef, queueId: QueueId) { const startQueryTime = (new Date()).getTime(); this.logger('Performing query', { queueId, @@ -709,7 +676,7 @@ export class QueryQueue { requestId: query.requestId, timeInQueue: 0 }); - } catch (e) { + } catch (e: any) { executionResult = { error: (e.message || e).toString() // TODO error handling }; @@ -739,14 +706,10 @@ export class QueryQueue { } /** - * Processing query specified by the `queryKey`. This method encapsulate most - * of the logic related with the queues updates, heartbeat, etc. - * - * @param {QueryKeyHash} queryKeyHashed - * @param {QueueId | null} queueId Supported by new Cube Store and Memory - * @return {Promise<{ result: undefined | Object, error: string | undefined }>} + * Processing query specified by the `queryKey`. This method encapsulates most + * of the logic related to the queue updates, heartbeat, etc. */ - async processQuery(queryKeyHashed, queueId) { + protected async processQuery(queryKeyHashed: QueryKeyHash, queueId: QueueId | null): Promise { const queueConnection = await this.queueDriver.createConnection(); let insertedCount; @@ -819,7 +782,7 @@ export class QueryQueue { const queryStream = this.createQueryStream(queryKeyHashed, query.query?.aliasNameToMember); try { - await this.queryHandlers.stream(query.query, queryStream); + await this.streamHandler(query.query, queryStream); // CubeStore has special handling for null executionResult = { streamResult: true @@ -837,8 +800,8 @@ export class QueryQueue { query.query, async (cancelHandler) => { try { - return queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId, queueId); - } catch (e) { + await queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId, queueId); + } catch (e: any) { this.logger('Error while query update', { queueId, queryKey: query.queryKey, @@ -852,7 +815,6 @@ export class QueryQueue { addedToQueueTime: query.addedToQueueTime, }); } - return null; }, ) ) @@ -875,7 +837,7 @@ export class QueryQueue { preAggregation: query.query?.preAggregation, addedToQueueTime: query.addedToQueueTime, }); - } catch (e) { + } catch (e: any) { executionResult = { error: (e.message || e).toString() // TODO error handling }; @@ -959,25 +921,9 @@ export class QueryQueue { activated, queryExists: !!query }); - const currentProcessingId = await queueConnection.freeProcessingLock(queryKeyHashed, processingId, activated); - if (currentProcessingId) { - this.logger('Skipping free processing lock', { - queueId, - processingId, - currentProcessingId, - queryKey: query && query.queryKey || queryKeyHashed, - requestId: query && query.requestId, - queuePrefix: this.redisQueuePrefix, - processingLockAcquired, - query, - insertedCount, - activeKeys, - activated, - queryExists: !!query - }); - } + await queueConnection.freeProcessingLock(queryKeyHashed, processingId, activated); } - } catch (e) { + } catch (e: any) { this.logger('Queue storage error', { queueId, queryKey: query && query.queryKey || queryKeyHashed, @@ -992,18 +938,17 @@ export class QueryQueue { /** * Processing cancel query flow. - * - * @param {QueryDef} query - * @param {QueueId | null} queueId */ - async processCancel(query, queueId) { + protected async processCancel(query: QueryDef, queueId: QueueId | null) { const { queryHandler } = query; + try { if (!this.cancelHandlers[queryHandler]) { throw new Error(`No cancel handler for ${queryHandler}`); } + await this.cancelHandlers[queryHandler](query); - } catch (e) { + } catch (e: any) { this.logger('Error while cancel', { queueId, queryKey: query.queryKey, @@ -1014,13 +959,7 @@ export class QueryQueue { } } - /** - * Returns hash sum of the specified `queryKey`. - * - * @param {QueryKey} queryKey - * @returns {QueryKeyHash} - */ - redisHash(queryKey) { + protected redisHash(queryKey: QueryKey): QueryKeyHash { return this.queueDriver.redisHash(queryKey); } } diff --git a/packages/cubejs-query-orchestrator/test/benchmarks/QueueBench.abstract.ts b/packages/cubejs-query-orchestrator/test/benchmarks/QueueBench.abstract.ts index f3b32872b81e6..1485142d8a753 100644 --- a/packages/cubejs-query-orchestrator/test/benchmarks/QueueBench.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/benchmarks/QueueBench.abstract.ts @@ -1,20 +1,17 @@ -import { CubeStoreDriver, CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver'; +import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver'; import crypto from 'crypto'; -import { createPromiseLock, pausePromise } from '@cubejs-backend/shared'; +import { createPromiseLock, MethodName, pausePromise } from '@cubejs-backend/shared'; import { QueueDriverConnectionInterface, QueueDriverInterface, } from '@cubejs-backend/base-driver'; -import { LocalQueueDriver, QueryQueue } from '../../src'; +import { LocalQueueDriver, QueryQueue, QueryQueueOptions } from '../../src'; -export type QueryQueueTestOptions = { - cacheAndQueueDriver?: string, - redisPool?: any, - cubeStoreDriverFactory?: () => Promise, +export type QueryQueueTestOptions = Pick & { beforeAll?: () => Promise, afterAll?: () => Promise, }; function patchQueueDriverConnectionForTrack(connection: QueueDriverConnectionInterface, counters: any): QueueDriverConnectionInterface { - function wrapAsyncMethod(methodName: string): any { - return async function (...args) { + function wrapAsyncMethod>(methodName: M): any { + return async (...args: Parameters) => { if (!(methodName in counters.methods)) { counters.methods[methodName] = { started: 1, @@ -24,7 +21,7 @@ function patchQueueDriverConnectionForTrack(connection: QueueDriverConnectionInt counters.methods[methodName].started++; } - const result = await connection[methodName](...args); + const result = await (connection[methodName] as any)(...args); counters.methods[methodName].finished++; return result; @@ -67,7 +64,7 @@ function patchQueueDriverForTrack(driver: QueueDriverInterface, counters: any): }; } -export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions = {}) { +export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions) { (async () => { if (options.beforeAll) { await options.beforeAll(); @@ -119,6 +116,14 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions payload: 'a'.repeat(benchSettings.queueResponseSize), }; }, + stream: async (_query, _stream) => { + throw new Error('streaming handler is not supported for testing'); + } + }, + cancelHandlers: { + query: async (_query) => { + console.error('Cancel handler was called for query'); + }, }, continueWaitTimeout: 60 * 2, executionTimeout: 20, @@ -176,7 +181,9 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions // eslint-disable-next-line no-bitwise payload: 'a'.repeat(benchSettings.queuePayloadSize) }, 1, { - + stageQueryKey: 1, + requestId: 'request-id', + spanId: 'span-id' }); counters.queueResolved++; diff --git a/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.js b/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.js index 9f7c5ff8cc9f0..d72d7e5492ad5 100644 --- a/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.js +++ b/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.js @@ -166,6 +166,7 @@ describe('PreAggregations', () => { }; jest.resetModules(); + const { QueryCache } = require('../../src/orchestrator/QueryCache'); queryCache = new QueryCache( 'TEST', @@ -173,6 +174,7 @@ describe('PreAggregations', () => { // eslint-disable-next-line @typescript-eslint/no-empty-function () => {}, { + cacheAndQueueDriver: 'memory', queueOptions: () => ({ executionTimeout: 1, concurrency: 2, diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts index ad40e446461cd..58403e93f6504 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts @@ -8,7 +8,7 @@ export type QueryCacheTestOptions = QueryCacheOptions & { afterAll?: () => Promise, }; -export const QueryCacheTest = (name: string, options?: QueryCacheTestOptions) => { +export const QueryCacheTest = (name: string, options: QueryCacheTestOptions) => { describe(`QueryQueue${name}`, () => { const cache = new QueryCache( crypto.randomBytes(16).toString('hex'), diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryCache.test.ts b/packages/cubejs-query-orchestrator/test/unit/QueryCache.test.ts index c3d7067adfa34..6cdbdb684da2d 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryCache.test.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryCache.test.ts @@ -1,3 +1,5 @@ import { QueryCacheTest } from './QueryCache.abstract'; -QueryCacheTest('Local'); +QueryCacheTest('Local', { + cacheAndQueueDriver: 'memory', +}); diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 57466ce9034d4..52b0d8b47908b 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -1,21 +1,29 @@ import { Readable } from 'stream'; -import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; -import type { QueryKey } from '@cubejs-backend/base-driver'; +import type { QueryKey, QueueDriverInterface } from '@cubejs-backend/base-driver'; import { pausePromise } from '@cubejs-backend/shared'; import crypto from 'crypto'; -import { QueryQueue } from '../../src'; +import { QueryQueue, QueryQueueOptions } from '../../src'; import { processUidRE } from '../../src/orchestrator/utils'; -export type QueryQueueTestOptions = { - cacheAndQueueDriver?: string, - redisPool?: any, - cubeStoreDriverFactory?: () => Promise, +export type QueryQueueTestOptions = Pick & { beforeAll?: () => Promise, afterAll?: () => Promise, }; -export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}) => { +class QueryQueueExtended extends QueryQueue { + declare public queueDriver: QueueDriverInterface; + + public reconcileQueue = super.reconcileQueue; + + public processQuery = super.processQuery; + + public processCancel = super.processCancel; + + public redisHash = super.redisHash; +} + +export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) => { describe(`QueryQueue${name}`, () => { jest.setTimeout(10 * 1000); @@ -30,7 +38,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} const tenantPrefix = crypto.randomBytes(6).toString('hex'); - const queue = new QueryQueue(`${tenantPrefix}#test_query_queue`, { + const queue = new QueryQueueExtended(`${tenantPrefix}#test_query_queue`, { queryHandlers: { foo: async (query) => `${query[0]} bar`, delay: async (query, setCancelHandler) => { @@ -39,21 +47,21 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} await setCancelHandler(result); return delayFn(result, query.delay); }, - stream: async (query, stream) => { - streamCount++; - - // TODO: Fix an issue with a fast execution of stream handler which caused by removal of QueryStream from streams, - // while EventListener doesnt start to listen for started stream event - await pausePromise(250); - - return new Promise((resolve, reject) => { - const readable = Readable.from([]); - readable.once('end', () => resolve(null)); - readable.once('close', () => resolve(null)); - readable.once('error', (err) => reject(err)); - readable.pipe(stream); - }); - }, + }, + streamHandler: async (query, stream) => { + streamCount++; + + // TODO: Fix an issue with a fast execution of stream handler which caused by removal of QueryStream from streams, + // while EventListener doesnt start to listen for started stream event + await pausePromise(250); + + return new Promise((resolve, reject) => { + const readable = Readable.from([]); + readable.once('end', () => resolve(null)); + readable.once('close', () => resolve(null)); + readable.once('error', (err) => reject(err)); + readable.pipe(stream); + }); }, sendProcessMessageFn: async (queryKeyHashed, queueId) => { processMessagePromises.push(queue.processQuery.bind(queue)(queryKeyHashed, queueId)); @@ -62,7 +70,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} processCancelPromises.push(queue.processCancel.bind(queue)(query)); }, cancelHandlers: { - delay: (query) => { + delay: async (query) => { console.log(`cancel call: ${JSON.stringify(query)}`); cancelledQuery = query.queryKey; } @@ -100,10 +108,6 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} // TODO: find out why awaitProcessing doesnt work await pausePromise(1 * 1000); - if (options.redisPool) { - await options.redisPool.cleanup(); - } - if (options.afterAll) { await options.afterAll(); } @@ -116,7 +120,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} } test('gutter', async () => { - const query = ['select * from']; + const query: QueryKey = ['select * from', []]; const result = await queue.executeInQueue('foo', query, query); expect(result).toBe('select * from bar'); }); @@ -139,7 +143,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} }); test('timeout - continue wait', async () => { - const query = ['select * from 2']; + const query: QueryKey = ['select * from 2', []]; let errorString = ''; for (let i = 0; i < 5; i++) { @@ -160,7 +164,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} }); test('timeout', async () => { - const query = ['select * from 3']; + const query: QueryKey = ['select * from 3', []]; // executionTimeout is 2s, 5s is enough await queue.executeInQueue('delay', query, { delay: 5 * 1000, result: '1', isJob: true }); @@ -173,7 +177,11 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} }); test('stage reporting', async () => { - const resultPromise = queue.executeInQueue('delay', '1', { delay: 200, result: '1' }, 0, { stageQueryKey: '1' }); + const resultPromise = queue.executeInQueue('delay', '1', { delay: 200, result: '1' }, 0, { + stageQueryKey: '1', + requestId: 'request-id', + spanId: 'span-id' + }); await delayFn(null, 50); expect((await queue.getQueryStage('1')).stage).toBe('Executing query'); await resultPromise; @@ -181,9 +189,17 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} }); test('priority stage reporting', async () => { - const resultPromise1 = queue.executeInQueue('delay', '31', { delay: 200, result: '1' }, 20, { stageQueryKey: '12' }); + const resultPromise1 = queue.executeInQueue('delay', '31', { delay: 200, result: '1' }, 20, { + stageQueryKey: '12', + requestId: 'request-id', + spanId: 'span-id' + }); await delayFn(null, 50); - const resultPromise2 = queue.executeInQueue('delay', '32', { delay: 200, result: '1' }, 10, { stageQueryKey: '12' }); + const resultPromise2 = queue.executeInQueue('delay', '32', { delay: 200, result: '1' }, 10, { + stageQueryKey: '12', + requestId: 'request-id', + spanId: 'span-id' + }); await delayFn(null, 50); expect((await queue.getQueryStage('12', 10)).stage).toBe('#1 in queue'); @@ -292,8 +308,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} expect(await connection.getOrphanedQueries()).toEqual([ [ connection.redisHash(['1', []]), - // Redis doesnt support queueId, it will return Null - name.includes('Redis') ? null : expect.any(Number) + expect.any(Number) ] ]); } finally { @@ -418,11 +433,11 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} await queue.reconcileQueue(); await redisClient.addToQueue( - keyScore, 'activated1', time, 'handler', ['select'], priority, { stageQueryKey: 'race', requestId: '1', queueId: 1 } + keyScore, 'activated1', time, 'handler', ['select'], priority, { stageQueryKey: 'race', requestId: '1' } ); await redisClient.addToQueue( - keyScore + 100, 'activated2', time + 100, 'handler2', ['select2'], priority, { stageQueryKey: 'race2', requestId: '1', queueId: 2 } + keyScore + 100, 'activated2', time + 100, 'handler2', ['select2'], priority, { stageQueryKey: 'race2', requestId: '1' } ); const processingId1 = await redisClient.getNextProcessingId(); diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.test.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.test.ts index 5f34a6d25df6b..853e5a08ab2bb 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.test.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.test.ts @@ -1,3 +1,5 @@ import { QueryQueueTest } from './QueryQueue.abstract'; -QueryQueueTest('Local'); +QueryQueueTest('Local', { + cacheAndQueueDriver: 'memory', +}); diff --git a/packages/cubejs-server-core/src/core/RefreshScheduler.ts b/packages/cubejs-server-core/src/core/RefreshScheduler.ts index 777ed886899a4..d9d947ec5b1b5 100644 --- a/packages/cubejs-server-core/src/core/RefreshScheduler.ts +++ b/packages/cubejs-server-core/src/core/RefreshScheduler.ts @@ -247,7 +247,7 @@ export class RefreshScheduler { // further executions - queues ready const concurrencies: number[] = []; Object.keys(preaggsQueues).forEach((name) => { - concurrencies.push(preaggsQueues[name].concurrency); + concurrencies.push(preaggsQueues[name].getConcurrency()); }); concurrency = Math.min(...concurrencies); }