Skip to content

Commit 4dce611

Browse files
committed
dev
1 parent 03b53ee commit 4dce611

File tree

2 files changed

+53
-64
lines changed

2 files changed

+53
-64
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 31 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
QueryColumnsResult,
99
QueryKey } from '@cubejs-backend/base-driver';
1010

11-
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable } from './QueryCache';
11+
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable, QueryWithParams, CacheKey } from './QueryCache';
1212
import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations';
1313
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
1414
import { QueryStream } from './QueryStream';
@@ -439,6 +439,14 @@ export class QueryOrchestrator {
439439
return this.preAggregations.updateRefreshEndReached();
440440
}
441441

442+
private createMetadataQuery(operation: string, params: Record<string, any>): QueryWithParams {
443+
return [
444+
`METADATA:${operation}`,
445+
[JSON.stringify(params)],
446+
{ external: false, renewalThreshold: 24 * 60 * 60 }
447+
];
448+
}
449+
442450
private async queryDataSourceMetadata<T>(
443451
operation: MetadataOperationType,
444452
params: Record<string, any>,
@@ -452,74 +460,35 @@ export class QueryOrchestrator {
452460
): Promise<T> {
453461
const {
454462
requestId,
455-
forceRefresh,
456-
expiration
463+
forceRefresh = false,
464+
renewalThreshold = 24 * 60 * 60,
465+
expiration = 7 * 24 * 60 * 60,
457466
} = options;
458467

459-
// Create a unique cache key for this metadata request
460-
const cacheKey = `METADATA:${operation}:${dataSource}:${JSON.stringify(params)}`;
461-
const cacheDriver = this.queryCache.getCacheDriver();
462-
463-
// Check cache first (unless forceRefresh is true)
464-
if (!forceRefresh) {
465-
try {
466-
const cachedResult = await cacheDriver.get(cacheKey);
467-
if (cachedResult && cachedResult.result) {
468-
this.logger('Found cached metadata result', { cacheKey, operation, dataSource });
469-
return cachedResult.result;
470-
}
471-
} catch (e) {
472-
this.logger('Error reading from cache', { cacheKey, error: e instanceof Error ? e.message : String(e) });
473-
}
474-
}
475-
476-
// If not in cache or forceRefresh, execute through queue
477-
const metadataRequest = {
478-
operation,
479-
params,
480-
external: false,
481-
type: 'metadata'
482-
};
483-
484-
// Create unique queue key for forceRefresh to avoid conflicts
485-
const queueKey = forceRefresh
486-
? `${cacheKey}:${new Date().getTime()}`
487-
: cacheKey;
468+
const metadataQuery = this.createMetadataQuery(operation, params);
469+
const cacheKey: CacheKey = [`METADATA:${operation}`, metadataQuery, dataSource];
488470

489-
// Get queue for the given datasource
490-
const queue = await this.queryCache.getQueue(dataSource);
491-
492-
// Execute metadata request through the queue
493-
const result = await queue.executeInQueue(
494-
'metadata',
495-
queueKey,
471+
const renewalKey = forceRefresh ? undefined : [
472+
`METADATA_RENEWAL:${operation}`,
473+
dataSource,
474+
Math.floor(Date.now() / (renewalThreshold * 1000))
475+
];
476+
477+
return this.queryCache.cacheQueryResult(
478+
metadataQuery,
479+
[],
480+
cacheKey,
481+
expiration,
496482
{
497-
...metadataRequest,
483+
renewalThreshold,
484+
renewalKey,
485+
forceNoCache: forceRefresh,
486+
requestId,
498487
dataSource,
499-
requestId
500-
},
501-
10, // priority
502-
{
503-
stageQueryKey: queueKey,
504-
requestId
488+
useInMemory: true,
489+
waitForRenew: true,
505490
}
506491
);
507-
508-
// Store result in cache driver (unless forceRefresh)
509-
if (!forceRefresh && result) {
510-
try {
511-
const cacheValue = {
512-
time: new Date().getTime(),
513-
result
514-
};
515-
await cacheDriver.set(cacheKey, cacheValue, expiration || (24 * 60 * 60)); // Default 24 hours
516-
this.logger('Stored metadata result in cache', { cacheKey, operation, dataSource });
517-
} catch (e) {
518-
this.logger('Error storing in cache', { cacheKey, error: e instanceof Error ? e.message : String(e) });
519-
}
520-
}
521-
522-
return result;
523492
}
524493

525494
/**

packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,23 @@ class MockDriver {
2525
query(query) {
2626
this.executedQueries.push(query);
2727

28-
// Handle metadata operations using the new approach
28+
// Handle metadata operations - check if query is an array with metadata operation
29+
if (Array.isArray(query) && query.length > 0 && typeof query[0] === 'string') {
30+
const operation = query[0];
31+
if (operation === 'METADATA:GET_SCHEMAS') {
32+
return this.getSchemas();
33+
} else if (operation === 'METADATA:GET_TABLES_FOR_SCHEMAS') {
34+
// Parse parameters from the query array
35+
const params = query[1] && query[1].length > 0 ? JSON.parse(query[1][0]) : {};
36+
return this.getTablesForSpecificSchemas(params.schemas || []);
37+
} else if (operation === 'METADATA:GET_COLUMNS_FOR_TABLES') {
38+
// Parse parameters from the query array
39+
const params = query[1] && query[1].length > 0 ? JSON.parse(query[1][0]) : {};
40+
return this.getColumnsForSpecificTables(params.tables || []);
41+
}
42+
}
43+
44+
// Handle metadata operations using the new approach (legacy format)
2945
if (query && typeof query === 'object' && query.type === 'metadata') {
3046
const { operation, params = {} } = query;
3147
if (operation === 'GET_SCHEMAS') {
@@ -38,7 +54,11 @@ class MockDriver {
3854
return Promise.resolve([]);
3955
}
4056

41-
// Handle regular SQL queries
57+
// Handle regular SQL queries - ensure query is a string
58+
if (typeof query !== 'string') {
59+
return Promise.resolve([]);
60+
}
61+
4262
let promise = Promise.resolve([query]);
4363
if (query.match('orders_too_big')) {
4464
promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 3000)));

0 commit comments

Comments
 (0)