Skip to content

Commit 03b53ee

Browse files
committed
dev
1 parent 118ba7c commit 03b53ee

File tree

2 files changed

+81
-65
lines changed

2 files changed

+81
-65
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 49 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import * as stream from 'stream';
2-
import * as crypto from 'crypto';
32
import R from 'ramda';
43
import { getEnv } from '@cubejs-backend/shared';
54
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
@@ -440,57 +439,6 @@ export class QueryOrchestrator {
440439
return this.preAggregations.updateRefreshEndReached();
441440
}
442441

443-
/**
 * Builds a short string identifying the parameters of a metadata request.
 * (All rows of this method carry a '-' gutter marker, i.e. the commit removes it.)
 *
 * @param operation Metadata operation the parameters belong to.
 * @param params Operation parameters; only params.schemas / params.tables are
 * inspected for the two list operations.
 * @returns 'empty' when params is missing, has no keys, or yields no names;
 * 'all_schemas' for GET_SCHEMAS; otherwise the first 16 hex characters of a
 * SHA-256 digest.
 */
private createMetadataHash(operation: MetadataOperationType, params: Record<string, any>): string {
// The emptiness check runs before the switch, so GET_SCHEMAS with empty params
// also returns 'empty' rather than 'all_schemas'.
444-
if (!params || Object.keys(params).length === 0) {
445-
return 'empty';
446-
}
447-
448-
const hashData: string[] = [];
449-
450-
switch (operation) {
451-
case MetadataOperationType.GET_SCHEMAS:
452-
return 'all_schemas';
453-
454-
case MetadataOperationType.GET_TABLES_FOR_SCHEMAS:
455-
if (params.schemas && Array.isArray(params.schemas)) {
// Names are sorted before hashing, so the order of the input list does not affect the result.
456-
hashData.push(...params.schemas.map(schema => schema.schema_name).sort());
457-
}
458-
break;
459-
460-
case MetadataOperationType.GET_COLUMNS_FOR_TABLES:
461-
if (params.tables && Array.isArray(params.tables)) {
// Tables are identified as "schema.table" and sorted, again making the hash order-insensitive.
462-
hashData.push(...params.tables.map(table => `${table.schema_name}.${table.table_name}`).sort());
463-
}
464-
break;
465-
466-
default:
// Any other operation hashes the JSON serialization of the whole params object.
467-
return crypto
468-
.createHash('sha256')
469-
.update(JSON.stringify(params))
470-
.digest('hex')
471-
.substring(0, 16);
472-
}
473-
474-
// Reached only by the two list operations: nothing collected means no usable names.
if (hashData.length === 0) {
475-
return 'empty';
476-
}
477-
478-
// Collected names are joined with '|' and digested; only the first 16 hex characters are kept.
return crypto
479-
.createHash('sha256')
480-
.update(hashData.join('|'))
481-
.digest('hex')
482-
.substring(0, 16);
483-
}
484-
485-
/**
 * Wraps an operation name and its parameters in a request object that also
 * carries the fixed fields external: false and type: 'metadata'.
 * (All rows of this method carry a '-' gutter marker, i.e. the commit removes it;
 * the added rows of queryDataSourceMetadata build the same object inline.)
 *
 * @param operation Operation name placed on the request unchanged.
 * @param params Parameters placed on the request unchanged.
 * @returns The request object literal.
 */
private createMetadataRequest(operation: string, params: Record<string, any>) {
486-
return {
487-
operation,
488-
params,
489-
external: false,
490-
type: 'metadata'
491-
};
492-
}
493-
494442
/**
 * Runs a metadata operation for a data source. In the added ('+') rows the
 * cache driver is consulted first; on a miss, or when forceRefresh is set, the
 * request is executed through the data source's queue and the result is then
 * written to the cache driver (skipped for forceRefresh).
 *
 * NOTE(review): part of the parameter list is hidden behind the hunk header
 * below; dataSource and options are presumably declared there - confirm.
 *
 * @param operation Metadata operation to execute.
 * @param params Operation parameters; serialized into the cache key.
 * @returns The cached result when present, otherwise the queue result.
 */
private async queryDataSourceMetadata<T>(
495443
operation: MetadataOperationType,
496444
params: Record<string, any>,
@@ -504,32 +452,74 @@ export class QueryOrchestrator {
504452
): Promise<T> {
505453
const {
506454
requestId,
455+
forceRefresh,
456+
expiration
507457
} = options;
508458

509-
const paramsHash = this.createMetadataHash(operation, params);
459+
// Create a unique cache key for this metadata request
// NOTE(review): JSON.stringify follows property insertion order and array order,
// so equal parameter sets listed in a different order produce different keys.
// The removed createMetadataHash sorted schema/table names before hashing.
460+
const cacheKey = `METADATA:${operation}:${dataSource}:${JSON.stringify(params)}`;
461+
const cacheDriver = this.queryCache.getCacheDriver();
462+
463+
// Check cache first (unless forceRefresh is true)
464+
if (!forceRefresh) {
465+
try {
466+
const cachedResult = await cacheDriver.get(cacheKey);
// NOTE(review): truthiness check - a cached result that is falsy (e.g. 0, '' or
// null) is treated as a miss. An empty array is truthy and is returned.
467+
if (cachedResult && cachedResult.result) {
468+
this.logger('Found cached metadata result', { cacheKey, operation, dataSource });
469+
return cachedResult.result;
470+
}
// A cache read failure is only logged; execution falls through to the queue path.
471+
} catch (e) {
472+
this.logger('Error reading from cache', { cacheKey, error: e instanceof Error ? e.message : String(e) });
473+
}
474+
}
510475

511-
const metadataRequest = this.createMetadataRequest(operation, params);
512-
// Create a unique string key for this metadata request
513-
const cacheKey = `METADATA:${operation}:${paramsHash}:${dataSource}`;
476+
// If not in cache or forceRefresh, execute through queue
477+
const metadataRequest = {
478+
operation,
479+
params,
480+
external: false,
481+
type: 'metadata'
482+
};
483+
484+
// Create unique queue key for forceRefresh to avoid conflicts
// The millisecond timestamp suffix makes each forced request use its own queue key.
485+
const queueKey = forceRefresh
486+
? `${cacheKey}:${new Date().getTime()}`
487+
: cacheKey;
514488

515489
// Get queue for the given datasource
516490
const queue = await this.queryCache.getQueue(dataSource);
517491

518492
// Execute metadata request through the queue
519-
return queue.executeInQueue(
493+
const result = await queue.executeInQueue(
520494
'metadata',
521-
cacheKey,
495+
queueKey,
522496
{
523497
...metadataRequest,
524498
dataSource,
525499
requestId
526500
},
527501
10, // priority
528502
{
529-
stageQueryKey: cacheKey,
503+
stageQueryKey: queueKey,
530504
requestId
531505
}
532506
);
507+
508+
// Store result in cache driver (unless forceRefresh)
// NOTE(review): a forced refresh neither reads nor writes the cache entry, so a
// later non-forced call can still return the previously cached value - confirm
// that this is intended. Falsy results are not stored either.
509+
if (!forceRefresh && result) {
510+
try {
511+
const cacheValue = {
512+
time: new Date().getTime(),
513+
result
514+
};
// NOTE(review): with ||, an expiration of 0 also falls back to the default of
// 24 * 60 * 60 (presumably seconds - confirm against the cache driver's set()).
515+
await cacheDriver.set(cacheKey, cacheValue, expiration || (24 * 60 * 60)); // Default 24 hours
516+
this.logger('Stored metadata result in cache', { cacheKey, operation, dataSource });
// A cache write failure is only logged; the fresh result is still returned below.
517+
} catch (e) {
518+
this.logger('Error storing in cache', { cacheKey, error: e instanceof Error ? e.message : String(e) });
519+
}
520+
}
521+
522+
return result;
533523
}
534524

535525
/**

packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1996,17 +1996,30 @@ describe('QueryOrchestrator', () => {
19961996
// Verifies that a second queryTablesForSchemas call with an equal schema list
// returns the expected tables without calling the driver mock again.
test('should cache results based on schema list', async () => {
19971997
const schemas = [{ schema_name: 'public' }];
19981998

1999-
// First call
1999+
// First call - will execute and store in cache
20002000
await metadataOrchestrator.queryTablesForSchemas(schemas);
2001+
2002+
// Add a delay to ensure the first query has completed and cached its result
// NOTE(review): the first call is already awaited above; if it resolves only
// after the cache write, this fixed 100 ms sleep is unnecessary - confirm.
2003+
await new Promise(resolve => setTimeout(resolve, 100));
2004+
2005+
// Clear the mock calls
2006+
metadataMockDriver.getTablesForSpecificSchemas.mockClear();
2007+
2008+
// Create equivalent but different object instance
2009+
// NOTE(review): createMetadataHash is removed in this commit; the lookup presumably relies on the JSON.stringify(params) cache key now - confirm
2010+
const schemas2 = [{ schema_name: 'public' }];
2011+
20012012
// Second call should use cache
2002-
const result = await metadataOrchestrator.queryTablesForSchemas(schemas);
2013+
const result = await metadataOrchestrator.queryTablesForSchemas(schemas2);
20032014

20042015
expect(result).toEqual([
20052016
{ schema_name: 'public', table_name: 'users' },
20062017
{ schema_name: 'public', table_name: 'orders' },
20072018
{ schema_name: 'public', table_name: 'products' }
20082019
]);
2009-
expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledTimes(1);
2020+
2021+
// Verify driver wasn't called again
// The mock was cleared after the first call, so any recorded call here comes from the second request.
2022+
expect(metadataMockDriver.getTablesForSpecificSchemas).not.toHaveBeenCalled();
20102023
});
20112024

20122025
test('should handle empty schema list', async () => {
@@ -2086,10 +2099,21 @@ describe('QueryOrchestrator', () => {
20862099
// Verifies that a second queryColumnsForTables call with an equal table list
// returns the expected columns without calling the driver mock again.
// Part of the expected value is hidden behind the hunk header inside this block.
test('should cache results based on table list', async () => {
20872100
const tables = [{ schema_name: 'public', table_name: 'users' }];
20882101

2089-
// First call
2102+
// First call - will execute and store in cache
20902103
await metadataOrchestrator.queryColumnsForTables(tables);
2104+
2105+
// Add a delay to ensure the first query has completed and cached its result
// NOTE(review): the first call is already awaited above; if it resolves only
// after the cache write, this fixed 100 ms sleep is unnecessary - confirm.
2106+
await new Promise(resolve => setTimeout(resolve, 100));
2107+
2108+
// Clear the mock calls
2109+
metadataMockDriver.getColumnsForSpecificTables.mockClear();
2110+
2111+
// Create equivalent but different object instance
2112+
// NOTE(review): createMetadataHash is removed in this commit; the lookup presumably relies on the JSON.stringify(params) cache key now - confirm
2113+
const tables2 = [{ schema_name: 'public', table_name: 'users' }];
2114+
20912115
// Second call should use cache
2092-
const result = await metadataOrchestrator.queryColumnsForTables(tables);
2116+
const result = await metadataOrchestrator.queryColumnsForTables(tables2);
20932117

20942118
expect(result).toEqual([
20952119
{
@@ -2114,7 +2138,9 @@ describe('QueryOrchestrator', () => {
21142138
attributes: ['UNIQUE']
21152139
}
21162140
]);
2117-
expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledTimes(1);
2141+
2142+
// Verify driver wasn't called again
// The mock was cleared after the first call, so any recorded call here comes from the second request.
2143+
expect(metadataMockDriver.getColumnsForSpecificTables).not.toHaveBeenCalled();
21182144
});
21192145

21202146
test('should handle empty table list', async () => {

0 commit comments

Comments
 (0)