diff --git a/.gitignore b/.gitignore index 20be737ddce44..2b7750d4bfe28 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ rust/cubesql/profile.json .cubestore .env .vimspector.json +.claude/settings.local.json diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000000000..9a4f718d7bd33 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,137 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Repository Overview + +Cube is a semantic layer for building data applications. This is a monorepo containing the complete Cube ecosystem including: +- Cube backend server and core components +- Client libraries for JavaScript/React/Vue/Angular +- Database drivers for various data sources +- Documentation site +- Rust components (CubeSQL, CubeStore) + +## Development Commands + +**Note: This project uses Yarn as the package manager.** + +### Core Build Commands +```bash +# Build all packages +yarn build + +# Run TypeScript compilation across all packages +yarn tsc + +# Watch mode for TypeScript compilation +yarn tsc:watch + +# Clean build artifacts +yarn clean + +# Run linting across all packages +yarn lint + +# Fix linting issues +yarn lint:fix + +# Lint package.json files +yarn lint:npm +``` + +### Testing Commands +```bash +# Run tests (most packages have individual test commands) +yarn test + +# Test individual packages +cd packages/cubejs-[package-name] +yarn test +``` + +### Documentation Development +The documentation is in `/docs` directory: +```bash +cd docs +yarn dev # Start development server +yarn build # Build for production +``` + +## Architecture Overview + +### Monorepo Structure +- **`/packages`**: All JavaScript/TypeScript packages managed by Lerna + - Core packages: `cubejs-server-core`, `cubejs-schema-compiler`, `cubejs-query-orchestrator` + - Client libraries: `cubejs-client-core`, `cubejs-client-react`, etc. + - Database drivers: `cubejs-postgres-driver`, `cubejs-bigquery-driver`, etc. + - API layer: `cubejs-api-gateway` +- **`/rust`**: Rust components including CubeSQL (SQL interface) and CubeStore (distributed storage) +- **`/docs`**: Next.js documentation site +- **`/examples`**: Example implementations and recipes + +### Key Components +1. **Schema Compiler**: Compiles data models into executable queries +2. **Query Orchestrator**: Manages query execution, caching, and pre-aggregations +3. **API Gateway**: Provides REST, GraphQL, and SQL APIs +4. **CubeSQL**: Postgres-compatible SQL interface (Rust) +5. **CubeStore**: Distributed OLAP storage engine (Rust) + +### Package Management +- Uses Yarn workspaces with Lerna for package management +- TypeScript compilation is coordinated across packages +- Jest for unit testing with package-specific configurations + +## Testing Approach + +### Unit Tests +- Most packages have Jest-based unit tests in `/test` directories +- TypeScript packages use `jest.config.js` with TypeScript compilation +- Snapshot testing for SQL compilation and query planning + +### Integration Tests +- Driver-specific integration tests in `/packages/cubejs-testing-drivers` +- End-to-end tests in `/packages/cubejs-testing` +- Docker-based testing environments for database drivers + +### Test Commands +```bash +# Individual package testing +cd packages/[package-name] +yarn test + +# Driver integration tests (requires Docker) +cd packages/cubejs-testing-drivers +yarn test +``` + +## Development Workflow + +1. **Making Changes**: Work in individual packages, changes are coordinated via Lerna +2. **Building**: Use `yarn tsc` to compile TypeScript across all packages +3. **Testing**: Run relevant tests for modified packages +4. **Linting**: Ensure code passes `yarn lint` before committing + +## Common File Patterns + +- `*.test.ts/js`: Jest unit tests +- `jest.config.js`: Jest configuration per package +- `tsconfig.json`: TypeScript configuration (inherits from root) +- `CHANGELOG.md`: Per-package changelogs maintained by Lerna +- `src/`: Source code directory +- `dist/`: Compiled output (not committed) + +## Important Notes + +- This is documentation for the old Cube docs site structure (the existing `/docs/CLAUDE.md` refers to the documentation site) +- The main Cube application development happens in `/packages` +- For data model changes, focus on `cubejs-schema-compiler` package +- For query execution changes, focus on `cubejs-query-orchestrator` package +- Database connectivity is handled by individual driver packages + +## Key Dependencies + +- **Lerna**: Monorepo management and publishing +- **TypeScript**: Primary language for most packages +- **Jest**: Testing framework +- **Rollup**: Bundling for client libraries +- **Docker**: Testing environments for database drivers \ No newline at end of file diff --git a/packages/cubejs-api-gateway/src/SubscriptionServer.ts b/packages/cubejs-api-gateway/src/SubscriptionServer.ts index b6df6d501ae9c..15cbe057c0b52 100644 --- a/packages/cubejs-api-gateway/src/SubscriptionServer.ts +++ b/packages/cubejs-api-gateway/src/SubscriptionServer.ts @@ -12,7 +12,6 @@ const methodParams: Record = { meta: [], subscribe: ['query', 'queryType'], unsubscribe: [], - 'subscribe.queue.events': [] }; const calcMessageLength = (message: unknown) => Buffer.byteLength( @@ -149,8 +148,6 @@ export class SubscriptionServer { } public async disconnect(connectionId: string) { - const authContext = await this.subscriptionStore.getAuthContext(connectionId); - await this.apiGateway.unSubscribeQueueEvents({ context: authContext, connectionId }); await this.subscriptionStore.cleanupSubscriptions(connectionId); } diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 708f8e7b0e933..dd9f0985bde99 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -2067,21 +2067,6 @@ class ApiGateway { } } - public async subscribeQueueEvents({ context, signedWithPlaygroundAuthSecret, connectionId, res }) { - if (this.enforceSecurityChecks && !signedWithPlaygroundAuthSecret) { - throw new CubejsHandlerError( - 403, - 'Forbidden', - 'Only for signed with playground auth secret' - ); - } - return (await this.getAdapterApi(context)).subscribeQueueEvents(connectionId, res); - } - - public async unSubscribeQueueEvents({ context, connectionId }) { - return (await this.getAdapterApi(context)).unSubscribeQueueEvents(connectionId); - } - public async subscribe({ query, context, res, subscribe, subscriptionState, queryType, apiType }) { diff --git a/packages/cubejs-backend-shared/src/env.ts b/packages/cubejs-backend-shared/src/env.ts index dd0391a68a0d2..3b0a155f880f3 100644 --- a/packages/cubejs-backend-shared/src/env.ts +++ b/packages/cubejs-backend-shared/src/env.ts @@ -2090,9 +2090,6 @@ const variables: Record any> = { livePreview: () => get('CUBEJS_LIVE_PREVIEW') .default('true') .asBoolStrict(), - preAggregationsQueueEventsBus: () => get('CUBEJS_PRE_AGGREGATIONS_QUEUE_EVENTS_BUS') - .default('false') - .asBoolStrict(), externalDefault: () => get('CUBEJS_EXTERNAL_DEFAULT') .default('true') .asBoolStrict(), diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index 3d07c42387563..301a93a95d76c 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -52,7 +52,6 @@ export interface QueueDriverOptions { continueWaitTimeout: number, orphanedTimeout: number, heartBeatTimeout: number, - getQueueEventsBus?: any, processUid?: string; } diff --git a/packages/cubejs-query-orchestrator/CLAUDE.md b/packages/cubejs-query-orchestrator/CLAUDE.md new file mode 100644 index 0000000000000..a568c49647b91 --- /dev/null +++ b/packages/cubejs-query-orchestrator/CLAUDE.md @@ -0,0 +1,131 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Package Overview + +The Query Orchestrator is a multi-stage querying engine that manages query execution, caching, and pre-aggregations in Cube. It receives pre-aggregation SQL queries and executes them in exact order, ensuring up-to-date data structure and freshness. + +## Development Commands + +**Note: This project uses Yarn as the package manager.** + +```bash +# Build the package +yarn build + +# Build with TypeScript compilation +yarn tsc + +# Watch mode for development +yarn watch + +# Run all tests (unit + integration) +yarn test + +# Run only unit tests +yarn unit + +# Run only integration tests +yarn integration + +# Run CubeStore integration tests specifically +yarn integration:cubestore + +# Run linting +yarn lint + +# Fix linting issues +yarn lint:fix +``` + +## Architecture Overview + +### Core Components + +The Query Orchestrator consists of several interconnected components: + +1. **QueryOrchestrator** (`src/orchestrator/QueryOrchestrator.ts`): Main orchestration class that coordinates query execution and manages drivers +2. **QueryCache** (`src/orchestrator/QueryCache.ts`): Handles query result caching with configurable cache drivers +3. **QueryQueue** (`src/orchestrator/QueryQueue.js`): Manages query queuing and background processing +4. **PreAggregations** (`src/orchestrator/PreAggregations.ts`): Manages pre-aggregation building and loading +5. **DriverFactory** (`src/orchestrator/DriverFactory.ts`): Creates and manages database driver instances + +### Cache and Queue Driver Architecture + +The orchestrator supports multiple backend drivers: +- **Memory**: In-memory caching and queuing (development) +- **CubeStore**: Distributed storage engine (production) +- **Redis**: External Redis-based caching (legacy, being phased out) + +Driver selection logic in `QueryOrchestrator.ts:detectQueueAndCacheDriver()`: +- Explicit configuration via `cacheAndQueueDriver` option +- Environment variables (`CUBEJS_CACHE_AND_QUEUE_DRIVER`) +- Auto-detection: Redis if `CUBEJS_REDIS_URL` exists, CubeStore for production, Memory for development + +### Query Processing Flow + +1. **Query Submission**: Queries enter through QueryOrchestrator +2. **Cache Check**: QueryCache checks for existing results +3. **Queue Management**: QueryQueue handles background execution +4. **Pre-aggregation Processing**: PreAggregations component manages rollup tables +5. **Result Caching**: Results stored via cache driver for future requests + +### Pre-aggregation System + +The pre-aggregation system includes: +- **PreAggregationLoader**: Loads pre-aggregation definitions +- **PreAggregationPartitionRangeLoader**: Handles partition range loading +- **PreAggregationLoadCache**: Manages loading cache for pre-aggregations + +## Testing Structure + +### Unit Tests (`test/unit/`) +- `QueryCache.test.ts`: Query caching functionality +- `QueryQueue.test.ts`: Queue management and processing +- `QueryOrchestrator.test.js`: Main orchestrator logic +- `PreAggregations.test.js`: Pre-aggregation management + +### Integration Tests (`test/integration/`) +- `cubestore/`: CubeStore-specific integration tests +- Tests real database interactions and queue processing + +### Test Abstractions +- `QueryCache.abstract.ts`: Shared test suite for cache implementations +- `QueryQueue.abstract.ts`: Shared test suite for queue implementations + +## Key Design Patterns + +### Queue Processing Architecture +The DEVELOPMENT.md file contains detailed sequence diagrams showing: +- Queue interaction with CubeStore via specific queue commands (`QUEUE ADD`, `QUEUE RETRIEVE`, etc.) +- Background query processing with heartbeat management +- Result handling and cleanup + +### Driver Factory Pattern +- `DriverFactory` type enables pluggable database drivers +- `DriverFactoryByDataSource` supports multi-tenant scenarios +- Separation between external (user data) and internal (cache/queue) drivers + +### Error Handling +- `ContinueWaitError`: Signals when queries should continue waiting +- `TimeoutError`: Handles query timeout scenarios +- Proper cleanup and resource management across all components + +## Configuration + +Key configuration options in `QueryOrchestratorOptions`: +- `externalDriverFactory`: Database driver for user data +- `cacheAndQueueDriver`: Backend for caching and queuing +- `queryCacheOptions`: Cache-specific settings +- `preAggregationsOptions`: Pre-aggregation configuration +- `rollupOnlyMode`: When enabled, only serves pre-aggregated data +- `continueWaitTimeout`: Timeout for waiting operations + +## Development Notes + +- Uses TypeScript with relaxed strict settings (`tsconfig.json`) +- Inherits linting rules from `@cubejs-backend/linter` +- Jest configuration extends base repository config +- Docker Compose setup for integration testing +- Coverage reports generated in `coverage/` directory \ No newline at end of file diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueEventsBus.ts b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueEventsBus.ts deleted file mode 100644 index bd1141ae925e2..0000000000000 --- a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueEventsBus.ts +++ /dev/null @@ -1,11 +0,0 @@ -export class BaseQueueEventsBus { - protected readonly subscribers: Record = {}; - - public subscribe(id: string, callback) { - this.subscribers[id] = { id, callback }; - } - - public unsubscribe(id: string) { - delete this.subscribers[id]; - } -} diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js index 4ebb67a5747e6..74c1d6bd39b50 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js @@ -21,7 +21,6 @@ export class LocalQueueDriverConnection { this.heartBeat = driver.heartBeat; this.processingCounter = driver.processingCounter; this.processingLocks = driver.processingLocks; - this.getQueueEventsBus = options.getQueueEventsBus; } async getQueriesToCancel() { @@ -169,15 +168,6 @@ export class LocalQueueDriverConnection { queueId: options.queueId, }; - if (this.getQueueEventsBus) { - this.getQueueEventsBus().emit({ - event: 'addedToQueue', - redisQueuePrefix: this.redisQueuePrefix, - queryKey: this.redisHash(queryKey), - payload: queryQueueObj - }); - } - return [ added, queryQueueObj.queueId, @@ -209,15 +199,6 @@ export class LocalQueueDriverConnection { async cancelQuery(queryKey) { const [query] = await this.getQueryAndRemove(queryKey); - if (this.getQueueEventsBus) { - this.getQueueEventsBus().emit({ - event: 'cancelQuery', - redisQueuePrefix: this.redisQueuePrefix, - queryKey: this.redisHash(queryKey), - payload: query - }); - } - return query; } @@ -236,15 +217,6 @@ export class LocalQueueDriverConnection { promise.resolved = true; promise.resolve(executionResult); - if (this.getQueueEventsBus) { - this.getQueueEventsBus().emit({ - event: 'setResultAndRemoveQuery', - redisQueuePrefix: this.redisQueuePrefix, - queryKey: this.redisHash(queryKey), - payload: executionResult - }); - } - return true; } @@ -303,15 +275,6 @@ export class LocalQueueDriverConnection { this.heartBeat[key] = { key, order: new Date().getTime() }; - if (this.getQueueEventsBus) { - this.getQueueEventsBus().emit({ - event: 'retrievedForProcessing', - redisQueuePrefix: this.redisQueuePrefix, - queryKey: this.redisHash(queryKey), - payload: this.queryDef[key] - }); - } - return [ added, this.queryDef[key]?.queueId, diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueEventsBus.ts b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueEventsBus.ts deleted file mode 100644 index 0e73743365341..0000000000000 --- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueEventsBus.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { BaseQueueEventsBus } from './BaseQueueEventsBus'; - -export class LocalQueueEventsBus extends BaseQueueEventsBus { - public emit(event) { - Promise.all(Object.values(this.subscribers).map(({ callback }) => callback(event))) - .catch(err => console.error(err)); - } -} diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index c45d9156c7d34..3fe3a1ab7305b 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -260,8 +260,6 @@ export class PreAggregations { private readonly queue: Record = {}; - private readonly getQueueEventsBus: any; - private readonly touchCache: LRUCache; public constructor( @@ -279,7 +277,6 @@ export class PreAggregations { this.dropPreAggregationsWithoutTouch = options.dropPreAggregationsWithoutTouch || getEnv('dropPreAggregationsWithoutTouch'); this.usedTablePersistTime = options.usedTablePersistTime || getEnv('dbQueryTimeout'); this.externalRefresh = options.externalRefresh; - this.getQueueEventsBus = options.getQueueEventsBus; this.touchCache = new LRUCache({ max: getEnv('touchPreAggregationCacheMaxCount'), ttl: getEnv('touchPreAggregationCacheMaxAge') * 1000, @@ -634,7 +631,6 @@ export class PreAggregations { // Centralized continueWaitTimeout that can be overridden in queueOptions continueWaitTimeout: this.options.continueWaitTimeout, ...queueOptions, - getQueueEventsBus: this.getQueueEventsBus, } ); } @@ -672,7 +668,6 @@ export class PreAggregations { return loadCache.fetchTables(preAggregation); }, { - getQueueEventsBus: this.getQueueEventsBus, concurrency: 4, logger: this.logger, cacheAndQueueDriver: this.options.cacheAndQueueDriver, diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 035c8ea40adc1..cf9659686c237 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -566,7 +566,6 @@ export class QueryCache { options: Record = {} ): QueryQueue { const queue: any = new QueryQueue(redisPrefix, { - getQueueEventsBus: options.getQueueEventsBus, queryHandlers: { query: async (req, setCancelHandle) => { const client = await clientFactory(); diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index a9c9340d8bc08..cb326efc8d2ea 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -6,7 +6,6 @@ import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable } from './QueryCache'; import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; -import { LocalQueueEventsBus } from './LocalQueueEventsBus'; import { QueryStream } from './QueryStream'; export type CacheAndQueryDriverType = 'memory' | 'cubestore' | /** removed, used for exception */ 'redis'; @@ -55,8 +54,6 @@ export class QueryOrchestrator { protected readonly rollupOnlyMode: boolean; - private queueEventsBus: LocalQueueEventsBus; - protected readonly cacheAndQueueDriver: string; public constructor( @@ -116,21 +113,10 @@ export class QueryOrchestrator { continueWaitTimeout, skipExternalCacheAndQueue, ...options.preAggregationsOptions, - getQueueEventsBus: - getEnv('preAggregationsQueueEventsBus') && - this.getQueueEventsBus.bind(this) } ); } - private getQueueEventsBus() { - if (!this.queueEventsBus) { - this.queueEventsBus = new LocalQueueEventsBus(); - } - - return this.queueEventsBus; - } - /** * Returns QueryCache instance. */ @@ -438,14 +424,6 @@ export class QueryOrchestrator { return this.preAggregations.cancelQueriesFromQueue(queryKeys, dataSource); } - public async subscribeQueueEvents(id: string, callback) { - return this.getQueueEventsBus().subscribe(id, callback); - } - - public async unSubscribeQueueEvents(id: string) { - return this.getQueueEventsBus().unsubscribe(id); - } - public async updateRefreshEndReached() { return this.preAggregations.updateRefreshEndReached(); } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 06fbe8f7d49ae..4c3e04966fdc2 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -123,7 +123,6 @@ export class QueryQueue { heartBeatTimeout: this.heartBeatInterval * 4, redisPool: options.redisPool, cubeStoreDriverFactory: options.cubeStoreDriverFactory, - getQueueEventsBus: options.getQueueEventsBus, processUid: this.processUid, }; diff --git a/packages/cubejs-server-core/src/core/OrchestratorApi.ts b/packages/cubejs-server-core/src/core/OrchestratorApi.ts index df47cc0d8ecc5..77caf0d556c3f 100644 --- a/packages/cubejs-server-core/src/core/OrchestratorApi.ts +++ b/packages/cubejs-server-core/src/core/OrchestratorApi.ts @@ -297,14 +297,6 @@ export class OrchestratorApi { return this.orchestrator.cancelPreAggregationQueriesFromQueue(queryKeys, dataSource); } - public async subscribeQueueEvents(id: string, callback) { - return this.orchestrator.subscribeQueueEvents(id, callback); - } - - public async unSubscribeQueueEvents(id: string) { - return this.orchestrator.unSubscribeQueueEvents(id); - } - public async updateRefreshEndReached() { return this.orchestrator.updateRefreshEndReached(); }