Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/cubejs-backend-shared/src/type-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* This module export only type helpers for using it across Cube.js project
* This module exports only type helpers for using it across the Cube project
*/

export type ResolveAwait<T> = T extends {
Expand All @@ -16,3 +16,6 @@ export type Required<T, K extends keyof T> = {
};

export type Optional<T, K extends keyof T> = Pick<Partial<T>, K> & Omit<T, K>;

// <M extends Method<Class/Interface, M>>
export type MethodName<T> = { [K in keyof T]: T[K] extends (...args: any[]) => any ? K : never }[keyof T];
16 changes: 9 additions & 7 deletions packages/cubejs-base-driver/src/queue-driver.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export type QueryDef = unknown;
export type QueryDef = any;
// Primary key of Queue item
export type QueueId = string | number | bigint;
// This was used as a lock for Redis, deprecated.
Expand Down Expand Up @@ -38,10 +38,12 @@ export interface AddToQueueQuery {
}

export interface AddToQueueOptions {
stageQueryKey: string,
// It's an ugly workaround for skip queue tasks
queueId?: QueueId,
stageQueryKey?: any,
requestId: string,
spanId?: string,
orphanedTimeout?: number,
queueId: QueueId,
}

export interface QueueDriverOptions {
Expand Down Expand Up @@ -70,19 +72,19 @@ export interface QueueDriverConnectionInterface {
* @param options
*/
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
// Return query keys that were sorted by priority and time
// Return query keys which was sorted by priority and time
getToProcessQueries(): Promise<QueryKeysTuple[]>;
getActiveQueries(): Promise<QueryKeysTuple[]>;
getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise<QueryDef | null>;
// Queries that were added to queue, but was not processed and not needed
// Queries which was added to queue, but was not processed and not needed
getOrphanedQueries(): Promise<QueryKeysTuple[]>;
// Queries that were not completed with old heartbeat
// Queries which was not completed with old heartbeat
getStalledQueries(): Promise<QueryKeysTuple[]>;
getQueryStageState(onlyKeys: boolean): Promise<QueryStageStateResponse>;
updateHeartBeat(hash: QueryKeyHash, queueId: QueueId | null): Promise<void>;
getNextProcessingId(): Promise<ProcessingId>;
// Trying to acquire a lock for processing a queue item, this method can return null when
// multiple nodes try to process the same query
// multiple nodes tries to process the same query
retrieveForProcessing(hash: QueryKeyHash, processingId: ProcessingId): Promise<RetrieveForProcessingResponse>;
freeProcessingLock(hash: QueryKeyHash, processingId: ProcessingId, activated: unknown): Promise<void>;
optimisticQueryUpdate(hash: QueryKeyHash, toUpdate: unknown, processingId: ProcessingId, queueId: QueueId | null): Promise<boolean>;
Expand Down
10 changes: 5 additions & 5 deletions packages/cubejs-query-orchestrator/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ yarn test
# Run only unit tests
yarn unit

# Run only integration tests
# Run only integration tests
yarn integration

# Run CubeStore integration tests specifically
Expand All @@ -46,16 +46,16 @@ yarn lint:fix
The Query Orchestrator consists of several interconnected components:

1. **QueryOrchestrator** (`src/orchestrator/QueryOrchestrator.ts`): Main orchestration class that coordinates query execution and manages drivers
2. **QueryCache** (`src/orchestrator/QueryCache.ts`): Handles query result caching with configurable cache drivers
3. **QueryQueue** (`src/orchestrator/QueryQueue.js`): Manages query queuing and background processing
2. **QueryCache** (`src/orchestrator/QueryCache.ts`): Handles query result caching with configurable cache drivers
3. **QueryQueue** (`src/orchestrator/QueryQueue.ts`): Manages query queuing and background processing
4. **PreAggregations** (`src/orchestrator/PreAggregations.ts`): Manages pre-aggregation building and loading
5. **DriverFactory** (`src/orchestrator/DriverFactory.ts`): Creates and manages database driver instances

### Cache and Queue Driver Architecture

The orchestrator supports multiple backend drivers:
- **Memory**: In-memory caching and queuing (development)
- **CubeStore**: Distributed storage engine (production)
- **CubeStore**: Distributed storage engine (production)
- **Redis**: External Redis-based caching (legacy, being phased out)

Driver selection logic in `QueryOrchestrator.ts:detectQueueAndCacheDriver()`:
Expand Down Expand Up @@ -128,4 +128,4 @@ Key configuration options in `QueryOrchestratorOptions`:
- Inherits linting rules from `@cubejs-backend/linter`
- Jest configuration extends base repository config
- Docker Compose setup for integration testing
- Coverage reports generated in `coverage/` directory
- Coverage reports generated in `coverage/` directory
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ export class PreAggregationLoader {
const queue = await this.preAggregations.getQueue(this.preAggregation.dataSource);
return queue.executeInQueue(
'query',
this.preAggregationQueryKey(invalidationKeys),
// TODO: Sync types
this.preAggregationQueryKey(invalidationKeys) as any,
{
preAggregation: this.preAggregation,
preAggregationsTablesToTempTables: this.preAggregationsTablesToTempTables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,9 @@ export class PreAggregations {
tables = tables.filter(row => `${schema}.${row.table_name}` === table);

// fetching query result
const { queueDriver } = this.queue[dataSource];
const conn = await queueDriver.createConnection();
const conn = await this.queue[dataSource].getQueueDriver().createConnection();
const result = await conn.getResult(key);
queueDriver.release(conn);
this.queue[dataSource].getQueueDriver().release(conn);

// calculating status
let status: string;
Expand Down
114 changes: 57 additions & 57 deletions packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import {
InlineTables,
CacheDriverInterface,
TableStructure,
DriverInterface,
DriverInterface, QueryKey,
} from '@cubejs-backend/base-driver';

import { QueryQueue } from './QueryQueue';
import { QueryQueue, QueryQueueOptions } from './QueryQueue';
import { ContinueWaitError } from './ContinueWaitError';
import { LocalCacheDriver } from './LocalCacheDriver';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
Expand Down Expand Up @@ -115,7 +115,7 @@ export interface QueryCacheOptions {
}>;
cubeStoreDriverFactory?: () => Promise<CubeStoreDriver>,
continueWaitTimeout?: number;
cacheAndQueueDriver?: CacheAndQueryDriverType;
cacheAndQueueDriver: CacheAndQueryDriverType;
maxInMemoryCacheEntries?: number;
skipExternalCacheAndQueue?: boolean;
}
Expand All @@ -133,7 +133,7 @@ export class QueryCache {
protected readonly redisPrefix: string,
protected readonly driverFactory: DriverFactoryByDataSource,
protected readonly logger: any,
public readonly options: QueryCacheOptions = {}
public readonly options: QueryCacheOptions
) {
switch (options.cacheAndQueueDriver || 'memory') {
case 'memory':
Expand Down Expand Up @@ -455,9 +455,9 @@ export class QueryCache {
};

if (!persistent) {
return queue.executeInQueue('query', cacheKey, _query, priority, opt);
return queue.executeInQueue('query', cacheKey as QueryKey, _query, priority, opt);
} else {
return queue.executeInQueue('stream', cacheKey, {
return queue.executeInQueue('stream', cacheKey as QueryKey, {
..._query,
aliasNameToMember,
}, priority, opt);
Expand Down Expand Up @@ -563,7 +563,7 @@ export class QueryCache {
redisPrefix: string,
clientFactory: DriverFactory,
executeFn: (client: BaseDriver, req: any) => any,
options: Record<string, any> = {}
options: Omit<QueryQueueOptions, 'queryHandlers' | 'cancelHandlers'>
): QueryQueue {
const queue: any = new QueryQueue(redisPrefix, {
queryHandlers: {
Expand All @@ -583,57 +583,57 @@ export class QueryCache {
}
return result;
},
stream: async (req, target) => {
queue.logger('Streaming SQL', { ...req });
await (new Promise((resolve, reject) => {
let logged = false;
Promise
.all([clientFactory()])
.then(([client]) => (<DriverInterface>client).stream(req.query, req.values, { highWaterMark: getEnv('dbQueryStreamHighWaterMark') }))
.then((source) => {
const cleanup = async (error) => {
if (source.release) {
const toRelease = source.release;
delete source.release;
await toRelease();
}
if (error && !target.destroyed) {
target.destroy(error);
}
if (!logged && target.destroyed) {
logged = true;
if (error) {
queue.logger('Streaming done with error', {
query: req.query,
query_values: req.values,
error,
});
reject(error);
} else {
queue.logger('Streaming successfully completed', {
requestId: req.requestId,
});
resolve(req.requestId);
}
},
streamHandler: async (req, target) => {
queue.logger('Streaming SQL', { ...req });
await (new Promise((resolve, reject) => {
let logged = false;
Promise
.all([clientFactory()])
.then(([client]) => (<DriverInterface>client).stream(req.query, req.values, { highWaterMark: getEnv('dbQueryStreamHighWaterMark') }))
.then((source) => {
const cleanup = async (error) => {
if (source.release) {
const toRelease = source.release;
delete source.release;
await toRelease();
}
if (error && !target.destroyed) {
target.destroy(error);
}
if (!logged && target.destroyed) {
logged = true;
if (error) {
queue.logger('Streaming done with error', {
query: req.query,
query_values: req.values,
error,
});
reject(error);
} else {
queue.logger('Streaming successfully completed', {
requestId: req.requestId,
});
resolve(req.requestId);
}
};

source.rowStream.once('end', () => cleanup(undefined));
source.rowStream.once('error', cleanup);
source.rowStream.once('close', () => cleanup(undefined));

target.once('end', () => cleanup(undefined));
target.once('error', cleanup);
target.once('close', () => cleanup(undefined));

source.rowStream.pipe(target);
})
.catch((reason) => {
target.emit('error', reason);
resolve(reason);
});
}));
},
}
};

source.rowStream.once('end', () => cleanup(undefined));
source.rowStream.once('error', cleanup);
source.rowStream.once('close', () => cleanup(undefined));

target.once('end', () => cleanup(undefined));
target.once('error', cleanup);
target.once('close', () => cleanup(undefined));

source.rowStream.pipe(target);
})
.catch((reason) => {
target.emit('error', reason);
resolve(reason);
});
}));
},
cancelHandlers: {
query: async (req) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import R from 'ramda';
import { getEnv } from '@cubejs-backend/shared';
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';

import { QueryKey } from '@cubejs-backend/base-driver';
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable } from './QueryCache';
import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
Expand Down Expand Up @@ -306,7 +307,7 @@ export class QueryOrchestrator {

if (pendingPreAggregationIndex === -1) {
const qcQueue = await this.queryCache.getQueue(queryBody.dataSource);
return qcQueue.getQueryStage(QueryCache.queryCacheKey(queryBody));
return qcQueue.getQueryStage(QueryCache.queryCacheKey(queryBody) as QueryKey);
}

const preAggregation = queryBody.preAggregations[pendingPreAggregationIndex];
Expand Down
Loading
Loading