diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 3c07ac545ed5d..ad7ea358ea713 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -18,7 +18,7 @@ import { LocalCacheDriver } from './LocalCacheDriver'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; import { LoadPreAggregationResult, PreAggregationDescription } from './PreAggregations'; import { getCacheHash } from './utils'; -import { CacheAndQueryDriverType } from './QueryOrchestrator'; +import { CacheAndQueryDriverType, MetadataOperationType } from './QueryOrchestrator'; type QueryOptions = { external?: boolean; @@ -567,8 +567,36 @@ export class QueryCache { ): QueryQueue { const queue: any = new QueryQueue(redisPrefix, { queryHandlers: { + metadata: async (req, _setCancelHandle) => { + const client = await clientFactory(); + const { operation } = req; + const params = req.params || {}; + + switch (operation) { + case MetadataOperationType.GET_SCHEMAS: + queue.logger('Getting datasource schemas', { dataSource: req.dataSource, requestId: req.requestId }); + return client.getSchemas(); + case MetadataOperationType.GET_TABLES_FOR_SCHEMAS: + queue.logger('Getting tables for schemas', { + dataSource: req.dataSource, + schemaCount: params.schemas?.length || 0, + requestId: req.requestId + }); + return client.getTablesForSpecificSchemas(params.schemas); + case MetadataOperationType.GET_COLUMNS_FOR_TABLES: + queue.logger('Getting columns for tables', { + dataSource: req.dataSource, + tableCount: params.tables?.length || 0, + requestId: req.requestId + }); + return client.getColumnsForSpecificTables(params.tables); + default: + throw new Error(`Unknown metadata operation: ${operation}`); + } + }, query: async (req, setCancelHandle) => { const client = await clientFactory(); + const resultPromise = executeFn(client, req); let handle; if (resultPromise.cancel) { @@ -636,6 +664,12 @@ export class QueryCache { })); }, cancelHandlers: { + metadata: async (req) => { + if (req.cancelHandler && queue.handles[req.cancelHandler]) { + await queue.handles[req.cancelHandler].cancel(); + delete queue.handles[req.cancelHandler]; + } + }, query: async (req) => { if (req.cancelHandler && queue.handles[req.cancelHandler]) { await queue.handles[req.cancelHandler].cancel(); diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index 0e60da3dc32e9..98b9696ffe414 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -2,9 +2,14 @@ import * as stream from 'stream'; import R from 'ramda'; import { getEnv } from '@cubejs-backend/shared'; import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; - -import { QueryKey } from '@cubejs-backend/base-driver'; -import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable } from './QueryCache'; +import { + QuerySchemasResult, + QueryTablesResult, + QueryColumnsResult, + QueryKey +} from '@cubejs-backend/base-driver'; + +import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable, QueryWithParams, CacheKey } from './QueryCache'; import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; import { QueryStream } from './QueryStream'; @@ -17,6 +22,12 @@ export enum DriverType { Cache = 'cache', } +export enum MetadataOperationType { + GET_SCHEMAS = 'GET_SCHEMAS', + GET_TABLES_FOR_SCHEMAS = 'GET_TABLES_FOR_SCHEMAS', + GET_COLUMNS_FOR_TABLES = 'GET_COLUMNS_FOR_TABLES' +} + export interface QueryOrchestratorOptions { externalDriverFactory?: DriverFactory; cacheAndQueueDriver?: CacheAndQueryDriverType; @@ -428,4 +439,120 @@ export class QueryOrchestrator { public async updateRefreshEndReached() { return this.preAggregations.updateRefreshEndReached(); } + + private createMetadataQuery(operation: string, params: Record): QueryWithParams { + return [ + `METADATA:${operation}`, + // TODO (@MikeNitsenko): Metadata queries need object params like [{ schema, table }] + // but QueryWithParams expects string[]. This forces JSON.stringify workaround. + [JSON.stringify(params)], + { external: false, renewalThreshold: 24 * 60 * 60 } + ]; + } + + private async queryDataSourceMetadata( + operation: MetadataOperationType, + params: Record, + dataSource: string = 'default', + options: { + requestId?: string; + forceRefresh?: boolean; + renewalThreshold?: number; + expiration?: number; + } = {} + ): Promise { + const { + requestId, + forceRefresh = false, + renewalThreshold = 24 * 60 * 60, + expiration = 7 * 24 * 60 * 60, + } = options; + + const metadataQuery = this.createMetadataQuery(operation, params); + const cacheKey: CacheKey = [metadataQuery, dataSource]; + + const renewalKey = forceRefresh ? undefined : [ + `METADATA_RENEWAL:${operation}`, + dataSource, + Math.floor(Date.now() / (renewalThreshold * 1000)) + ]; + + return this.queryCache.cacheQueryResult( + metadataQuery, + [], + cacheKey, + expiration, + { + renewalThreshold, + renewalKey, + forceNoCache: forceRefresh, + requestId, + dataSource, + useInMemory: true, + waitForRenew: true, + } + ); + } + + /** + * Query the data source for available schemas. + */ + public async queryDataSourceSchemas( + dataSource: string = 'default', + options: { + requestId?: string; + forceRefresh?: boolean; + renewalThreshold?: number; + expiration?: number; + } = {} + ): Promise { + return this.queryDataSourceMetadata( + MetadataOperationType.GET_SCHEMAS, + {}, + dataSource, + options + ); + } + + /** + * Query the data source for tables within the specified schemas. + */ + public async queryTablesForSchemas( + schemas: QuerySchemasResult[], + dataSource: string = 'default', + options: { + requestId?: string; + forceRefresh?: boolean; + renewalThreshold?: number; + expiration?: number; + } = {} + ): Promise { + return this.queryDataSourceMetadata( + MetadataOperationType.GET_TABLES_FOR_SCHEMAS, + { schemas }, + dataSource, + options + ); + } + + /** + * Query the data source for columns within the specified tables. + */ + public async queryColumnsForTables( + tables: QueryTablesResult[], + dataSource: string = 'default', + options: { + requestId?: string; + forceRefresh?: boolean; + renewalThreshold?: number; + expiration?: number; + } = {} + ): Promise { + return this.queryDataSourceMetadata( + MetadataOperationType.GET_COLUMNS_FOR_TABLES, + { tables }, + dataSource, + options + ); + } } diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js b/packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js index eb6954ad5bdea..08196c142739d 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js +++ b/packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js @@ -24,6 +24,38 @@ class MockDriver { query(query) { this.executedQueries.push(query); + + // Handle metadata operations - check if query is an array with metadata operation + if (Array.isArray(query) && query.length > 0 && typeof query[0] === 'string') { + const operation = query[0]; + if (operation === 'METADATA:GET_SCHEMAS') { + return this.getSchemas(); + } else if (operation === 'METADATA:GET_TABLES_FOR_SCHEMAS') { + // Parse parameters from the query array + let params = {}; + try { + params = query[1] && query[1].length > 0 ? JSON.parse(query[1][0]) : {}; + } catch (error) { + console.warn('Failed to parse JSON parameters for METADATA:GET_TABLES_FOR_SCHEMAS:', error); + } + return this.getTablesForSpecificSchemas(params.schemas || []); + } else if (operation === 'METADATA:GET_COLUMNS_FOR_TABLES') { + // Parse parameters from the query array + let params = {}; + try { + params = query[1] && query[1].length > 0 ? JSON.parse(query[1][0]) : {}; + } catch (error) { + console.warn('Failed to parse JSON parameters for METADATA:GET_COLUMNS_FOR_TABLES:', error); + } + return this.getColumnsForSpecificTables(params.tables || []); + } + } + + // Handle regular SQL queries - ensure query is a string + if (typeof query !== 'string') { + return Promise.resolve([]); + } + let promise = Promise.resolve([query]); if (query.match('orders_too_big')) { promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 3000))); @@ -1769,4 +1801,429 @@ describe('QueryOrchestrator', () => { } // expect(mockDriver.tables).toContainEqual(expect.stringMatching(/orders_delay/)); }); + + describe('Data Source Metadata Methods', () => { + let metadataOrchestrator; + let metadataMockDriver; + + beforeEach(() => { + metadataMockDriver = new MockDriver(); + + // Mock metadata methods + metadataMockDriver.getSchemas = jest.fn().mockResolvedValue([ + { schema_name: 'public' }, + { schema_name: 'analytics' }, + { schema_name: 'staging' } + ]); + + metadataMockDriver.getTablesForSpecificSchemas = jest.fn().mockImplementation((schemas) => { + const tables = []; + schemas.forEach(schema => { + if (schema.schema_name === 'public') { + tables.push( + { schema_name: 'public', table_name: 'users' }, + { schema_name: 'public', table_name: 'orders' }, + { schema_name: 'public', table_name: 'products' } + ); + } else if (schema.schema_name === 'analytics') { + tables.push( + { schema_name: 'analytics', table_name: 'user_metrics' }, + { schema_name: 'analytics', table_name: 'sales_summary' } + ); + } + }); + return Promise.resolve(tables); + }); + + metadataMockDriver.getColumnsForSpecificTables = jest.fn().mockImplementation((tables) => { + const columns = []; + tables.forEach(table => { + if (table.table_name === 'users') { + columns.push( + { + schema_name: 'public', + table_name: 'users', + column_name: 'id', + data_type: 'integer', + attributes: ['PRIMARY_KEY'] + }, + { + schema_name: 'public', + table_name: 'users', + column_name: 'name', + data_type: 'varchar', + attributes: [] + }, + { + schema_name: 'public', + table_name: 'users', + column_name: 'email', + data_type: 'varchar', + attributes: ['UNIQUE'] + } + ); + } else if (table.table_name === 'orders') { + columns.push( + { + schema_name: 'public', + table_name: 'orders', + column_name: 'id', + data_type: 'integer', + attributes: ['PRIMARY_KEY'] + }, + { + schema_name: 'public', + table_name: 'orders', + column_name: 'user_id', + data_type: 'integer', + attributes: [], + foreign_keys: [{ target_table: 'users', target_column: 'id' }] + }, + { + schema_name: 'public', + table_name: 'orders', + column_name: 'total', + data_type: 'decimal', + attributes: [] + } + ); + } + }); + return Promise.resolve(columns); + }); + + const driverFactory = () => metadataMockDriver; + + metadataOrchestrator = new QueryOrchestrator( + 'ORCHESTRATOR_TEST_METADATA', + driverFactory, + console.log, + { + cacheAndQueueDriver: 'memory', + continueWaitTimeout: 5, + queryCacheOptions: { + queueOptions: () => ({ + concurrency: 2, + processUid: 'metadata_test', + }), + }, + preAggregationsOptions: { + queueOptions: () => ({ + concurrency: 2, + processUid: 'metadata_test', + }), + }, + } + ); + + jest.clearAllMocks(); + + if (metadataOrchestrator && metadataOrchestrator.queryCache && metadataOrchestrator.queryCache.memoryCache) { + metadataOrchestrator.queryCache.memoryCache.clear(); + } + + if (metadataOrchestrator && metadataOrchestrator.queryCache && metadataOrchestrator.queryCache.getCacheDriver()) { + const cacheDriver = metadataOrchestrator.queryCache.getCacheDriver(); + if (cacheDriver.store) { + Object.keys(cacheDriver.store).forEach(key => delete cacheDriver.store[key]); + } + } + }); + + afterEach(async () => { + await metadataOrchestrator.cleanup(); + }); + + describe('queryDataSourceSchemas', () => { + test('should query and cache schemas for default datasource', async () => { + const result = await metadataOrchestrator.queryDataSourceSchemas(); + + expect(result).toEqual([ + { schema_name: 'public' }, + { schema_name: 'analytics' }, + { schema_name: 'staging' } + ]); + }); + + test('should query schemas for specific datasource', async () => { + const result = await metadataOrchestrator.queryDataSourceSchemas('custom'); + + expect(result).toEqual([ + { schema_name: 'public' }, + { schema_name: 'analytics' }, + { schema_name: 'staging' } + ]); + }); + + test('should use cache on second call', async () => { + // First call + await metadataOrchestrator.queryDataSourceSchemas(); + // Second call should use cache + const result = await metadataOrchestrator.queryDataSourceSchemas(); + + expect(result).toEqual([ + { schema_name: 'public' }, + { schema_name: 'analytics' }, + { schema_name: 'staging' } + ]); + }); + + test('should force refresh when requested', async () => { + // First call + await metadataOrchestrator.queryDataSourceSchemas(); + // Second call with forceRefresh + const result = await metadataOrchestrator.queryDataSourceSchemas('default', { forceRefresh: true }); + + expect(result).toEqual([ + { schema_name: 'public' }, + { schema_name: 'analytics' }, + { schema_name: 'staging' } + ]); + }); + + test('should pass requestId option', async () => { + const requestId = 'test-request-123'; + await metadataOrchestrator.queryDataSourceSchemas('default', { requestId }); + + expect(metadataMockDriver.getSchemas).toHaveBeenCalledTimes(1); + }); + }); + + describe('queryTablesForSchemas', () => { + test('should query tables for given schemas', async () => { + const schemas = [ + { schema_name: 'public' }, + { schema_name: 'analytics' } + ]; + + const result = await metadataOrchestrator.queryTablesForSchemas(schemas); + + expect(result).toEqual([ + { schema_name: 'public', table_name: 'users' }, + { schema_name: 'public', table_name: 'orders' }, + { schema_name: 'public', table_name: 'products' }, + { schema_name: 'analytics', table_name: 'user_metrics' }, + { schema_name: 'analytics', table_name: 'sales_summary' } + ]); + expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledWith(schemas); + }); + + test('should cache results based on schema list', async () => { + const schemas = [{ schema_name: 'public' }]; + + // First call - will execute and store in cache + await metadataOrchestrator.queryTablesForSchemas(schemas); + + // Add a delay to ensure the first query has completed and cached its result + await new Promise(resolve => setTimeout(resolve, 100)); + + // Clear the mock calls + metadataMockDriver.getTablesForSpecificSchemas.mockClear(); + + // Create equivalent but different object instance + // Our hash function should handle this correctly + const schemas2 = [{ schema_name: 'public' }]; + + // Second call should use cache + const result = await metadataOrchestrator.queryTablesForSchemas(schemas2); + + expect(result).toEqual([ + { schema_name: 'public', table_name: 'users' }, + { schema_name: 'public', table_name: 'orders' }, + { schema_name: 'public', table_name: 'products' } + ]); + + // Verify driver wasn't called again + expect(metadataMockDriver.getTablesForSpecificSchemas).not.toHaveBeenCalled(); + }); + + test('should handle empty schema list', async () => { + const result = await metadataOrchestrator.queryTablesForSchemas([]); + + expect(result).toEqual([]); + expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledWith([]); + }); + + test('should force refresh when requested', async () => { + const schemas = [{ schema_name: 'public' }]; + + await metadataOrchestrator.queryTablesForSchemas(schemas); + await metadataOrchestrator.queryTablesForSchemas(schemas, 'default', { forceRefresh: true }); + + expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledTimes(2); + }); + }); + + describe('queryColumnsForTables', () => { + test('should query columns for given tables', async () => { + const tables = [ + { schema_name: 'public', table_name: 'users' }, + { schema_name: 'public', table_name: 'orders' } + ]; + + const result = await metadataOrchestrator.queryColumnsForTables(tables); + + expect(result).toEqual([ + { + schema_name: 'public', + table_name: 'users', + column_name: 'id', + data_type: 'integer', + attributes: ['PRIMARY_KEY'] + }, + { + schema_name: 'public', + table_name: 'users', + column_name: 'name', + data_type: 'varchar', + attributes: [] + }, + { + schema_name: 'public', + table_name: 'users', + column_name: 'email', + data_type: 'varchar', + attributes: ['UNIQUE'] + }, + { + schema_name: 'public', + table_name: 'orders', + column_name: 'id', + data_type: 'integer', + attributes: ['PRIMARY_KEY'] + }, + { + schema_name: 'public', + table_name: 'orders', + column_name: 'user_id', + data_type: 'integer', + attributes: [], + foreign_keys: [{ target_table: 'users', target_column: 'id' }] + }, + { + schema_name: 'public', + table_name: 'orders', + column_name: 'total', + data_type: 'decimal', + attributes: [] + } + ]); + expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledWith(tables); + }); + + test('should cache results based on table list', async () => { + const tables = [{ schema_name: 'public', table_name: 'users' }]; + + // First call - will execute and store in cache + await metadataOrchestrator.queryColumnsForTables(tables); + + // Add a delay to ensure the first query has completed and cached its result + await new Promise(resolve => setTimeout(resolve, 100)); + + // Clear the mock calls + metadataMockDriver.getColumnsForSpecificTables.mockClear(); + + // Create equivalent but different object instance + // Our hash function should handle this correctly + const tables2 = [{ schema_name: 'public', table_name: 'users' }]; + + // Second call should use cache + const result = await metadataOrchestrator.queryColumnsForTables(tables2); + + expect(result).toEqual([ + { + schema_name: 'public', + table_name: 'users', + column_name: 'id', + data_type: 'integer', + attributes: ['PRIMARY_KEY'] + }, + { + schema_name: 'public', + table_name: 'users', + column_name: 'name', + data_type: 'varchar', + attributes: [] + }, + { + schema_name: 'public', + table_name: 'users', + column_name: 'email', + data_type: 'varchar', + attributes: ['UNIQUE'] + } + ]); + + // Verify driver wasn't called again + expect(metadataMockDriver.getColumnsForSpecificTables).not.toHaveBeenCalled(); + }); + + test('should handle empty table list', async () => { + const result = await metadataOrchestrator.queryColumnsForTables([]); + + expect(result).toEqual([]); + expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledWith([]); + }); + + test('should force refresh when requested', async () => { + const tables = [{ schema_name: 'public', table_name: 'users' }]; + + await metadataOrchestrator.queryColumnsForTables(tables); + await metadataOrchestrator.queryColumnsForTables(tables, 'default', { forceRefresh: true }); + + expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledTimes(2); + }); + }); + + describe('Integration Tests', () => { + test('should handle full metadata workflow', async () => { + // Query schemas + const schemas = await metadataOrchestrator.queryDataSourceSchemas(); + expect(schemas).toHaveLength(3); + + // Query tables for specific schemas + const publicSchema = schemas.filter(s => s.schema_name === 'public'); + const tables = await metadataOrchestrator.queryTablesForSchemas(publicSchema); + expect(tables).toHaveLength(3); + + // Query columns for specific tables + const userTable = tables.filter(t => t.table_name === 'users'); + const columns = await metadataOrchestrator.queryColumnsForTables(userTable); + expect(columns).toHaveLength(3); + expect(columns[0].column_name).toBe('id'); + expect(columns[0].data_type).toBe('integer'); + expect(columns[0].attributes).toContain('PRIMARY_KEY'); + }); + + test('should handle concurrent metadata requests', async () => { + const schemas = [{ schema_name: 'public' }]; + + // Make concurrent requests + const promises = [ + metadataOrchestrator.queryDataSourceSchemas(), + metadataOrchestrator.queryDataSourceSchemas(), + metadataOrchestrator.queryTablesForSchemas(schemas), + metadataOrchestrator.queryTablesForSchemas(schemas) + ]; + + const results = await Promise.all(promises); + + // All requests should return the same data + expect(results[0]).toEqual(results[1]); + expect(results[2]).toEqual(results[3]); + }); + + test('should handle error scenarios gracefully', async () => { + // Mock driver error + metadataMockDriver.getSchemas.mockRejectedValueOnce(new Error('Database connection failed')); + + await expect(metadataOrchestrator.queryDataSourceSchemas('default', { forceRefresh: true })).rejects.toThrow('Database connection failed'); + + // Should retry on next call + metadataMockDriver.getSchemas.mockResolvedValueOnce([{ schema_name: 'recovered' }]); + const result = await metadataOrchestrator.queryDataSourceSchemas('default', { forceRefresh: true }); + expect(result).toEqual([{ schema_name: 'recovered' }]); + }); + }); + }); });