Skip to content

Commit 118ba7c

Browse files
committed
upd
1 parent 6685b6c commit 118ba7c

File tree

3 files changed

+63
-66
lines changed

3 files changed

+63
-66
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -567,38 +567,36 @@ export class QueryCache {
567567
): QueryQueue {
568568
const queue: any = new QueryQueue(redisPrefix, {
569569
queryHandlers: {
570-
query: async (req, setCancelHandle) => {
570+
metadata: async (req, _setCancelHandle) => {
571571
const client = await clientFactory();
572-
573-
// Handle metadata queries
574-
if (req.query && typeof req.query === 'string' && req.query.startsWith('METADATA:')) {
575-
const operation = req.query.replace('METADATA:', '');
576-
const params = req.values && req.values[0] ? JSON.parse(req.values[0]) : {};
577-
578-
switch (operation) {
579-
case MetadataOperationType.GET_SCHEMAS:
580-
queue.logger('Getting datasource schemas', { dataSource: req.dataSource, requestId: req.requestId });
581-
return client.getSchemas();
582-
case MetadataOperationType.GET_TABLES_FOR_SCHEMAS:
583-
queue.logger('Getting tables for schemas', {
584-
dataSource: req.dataSource,
585-
schemaCount: params.schemas?.length || 0,
586-
requestId: req.requestId
587-
});
588-
return client.getTablesForSpecificSchemas(params.schemas);
589-
case MetadataOperationType.GET_COLUMNS_FOR_TABLES:
590-
queue.logger('Getting columns for tables', {
591-
dataSource: req.dataSource,
592-
tableCount: params.tables?.length || 0,
593-
requestId: req.requestId
594-
});
595-
return client.getColumnsForSpecificTables(params.tables);
596-
default:
597-
throw new Error(`Unknown metadata operation: ${operation}`);
598-
}
572+
const { operation } = req;
573+
const params = req.params || {};
574+
575+
switch (operation) {
576+
case MetadataOperationType.GET_SCHEMAS:
577+
queue.logger('Getting datasource schemas', { dataSource: req.dataSource, requestId: req.requestId });
578+
return client.getSchemas();
579+
case MetadataOperationType.GET_TABLES_FOR_SCHEMAS:
580+
queue.logger('Getting tables for schemas', {
581+
dataSource: req.dataSource,
582+
schemaCount: params.schemas?.length || 0,
583+
requestId: req.requestId
584+
});
585+
return client.getTablesForSpecificSchemas(params.schemas);
586+
case MetadataOperationType.GET_COLUMNS_FOR_TABLES:
587+
queue.logger('Getting columns for tables', {
588+
dataSource: req.dataSource,
589+
tableCount: params.tables?.length || 0,
590+
requestId: req.requestId
591+
});
592+
return client.getColumnsForSpecificTables(params.tables);
593+
default:
594+
throw new Error(`Unknown metadata operation: ${operation}`);
599595
}
600-
601-
// Handle regular SQL queries
596+
},
597+
query: async (req, setCancelHandle) => {
598+
const client = await clientFactory();
599+
602600
const resultPromise = executeFn(client, req);
603601
let handle;
604602
if (resultPromise.cancel) {
@@ -666,6 +664,12 @@ export class QueryCache {
666664
}));
667665
},
668666
cancelHandlers: {
667+
metadata: async (req) => {
668+
if (req.cancelHandler && queue.handles[req.cancelHandler]) {
669+
await queue.handles[req.cancelHandler].cancel();
670+
delete queue.handles[req.cancelHandler];
671+
}
672+
},
669673
query: async (req) => {
670674
if (req.cancelHandler && queue.handles[req.cancelHandler]) {
671675
await queue.handles[req.cancelHandler].cancel();

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
QueryColumnsResult,
1010
QueryKey } from '@cubejs-backend/base-driver';
1111

12-
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable, CacheKey, QueryWithParams } from './QueryCache';
12+
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable } from './QueryCache';
1313
import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations';
1414
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
1515
import { QueryStream } from './QueryStream';
@@ -482,12 +482,13 @@ export class QueryOrchestrator {
482482
.substring(0, 16);
483483
}
484484

485-
private createMetadataQuery(operation: string, params: Record<string, any>): QueryWithParams {
486-
return [
487-
`METADATA:${operation}`,
488-
[JSON.stringify(params)],
489-
{ external: false, renewalThreshold: 24 * 60 * 60 }
490-
];
485+
private createMetadataRequest(operation: string, params: Record<string, any>) {
486+
return {
487+
operation,
488+
params,
489+
external: false,
490+
type: 'metadata'
491+
};
491492
}
492493

493494
private async queryDataSourceMetadata<T>(
@@ -503,36 +504,30 @@ export class QueryOrchestrator {
503504
): Promise<T> {
504505
const {
505506
requestId,
506-
forceRefresh = false,
507-
renewalThreshold = 24 * 60 * 60,
508-
expiration = 7 * 24 * 60 * 60,
509507
} = options;
510508

511509
const paramsHash = this.createMetadataHash(operation, params);
512510

513-
const metadataQuery = this.createMetadataQuery(operation, params);
514-
const cacheKey: CacheKey = [`METADATA:${operation}`, paramsHash, dataSource];
511+
const metadataRequest = this.createMetadataRequest(operation, params);
512+
// Create a unique string key for this metadata request
513+
const cacheKey = `METADATA:${operation}:${paramsHash}:${dataSource}`;
515514

516-
const renewalKey = forceRefresh ? undefined : [
517-
`METADATA_RENEWAL:${operation}`,
518-
paramsHash,
519-
dataSource,
520-
Math.floor(Date.now() / (renewalThreshold * 1000))
521-
];
515+
// Get queue for the given datasource
516+
const queue = await this.queryCache.getQueue(dataSource);
522517

523-
return this.queryCache.cacheQueryResult(
524-
metadataQuery,
525-
[],
518+
// Execute metadata request through the queue
519+
return queue.executeInQueue(
520+
'metadata',
526521
cacheKey,
527-
expiration,
528522
{
529-
renewalThreshold,
530-
renewalKey,
531-
forceNoCache: forceRefresh,
532-
requestId,
523+
...metadataRequest,
533524
dataSource,
534-
useInMemory: true,
535-
waitForRenew: true,
525+
requestId
526+
},
527+
10, // priority
528+
{
529+
stageQueryKey: cacheKey,
530+
requestId
536531
}
537532
);
538533
}

packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,14 @@ class MockDriver {
2525
query(query) {
2626
this.executedQueries.push(query);
2727

28-
// Handle metadata operations
29-
if (Array.isArray(query) && query[0] && query[0].startsWith('METADATA:')) {
30-
const operationType = query[0];
31-
if (operationType === 'METADATA:GET_SCHEMAS') {
28+
// Handle metadata operations using the new approach
29+
if (query && typeof query === 'object' && query.type === 'metadata') {
30+
const { operation, params = {} } = query;
31+
if (operation === 'GET_SCHEMAS') {
3232
return this.getSchemas();
33-
} else if (operationType === 'METADATA:GET_TABLES_FOR_SCHEMAS') {
34-
const params = JSON.parse(query[1][0]);
33+
} else if (operation === 'GET_TABLES_FOR_SCHEMAS') {
3534
return this.getTablesForSpecificSchemas(params.schemas);
36-
} else if (operationType === 'METADATA:GET_COLUMNS_FOR_TABLES') {
37-
const params = JSON.parse(query[1][0]);
35+
} else if (operation === 'GET_COLUMNS_FOR_TABLES') {
3836
return this.getColumnsForSpecificTables(params.tables);
3937
}
4038
return Promise.resolve([]);

0 commit comments

Comments
 (0)