Skip to content

Commit 312e955

Browse files
committed
dev
1 parent 1140243 commit 312e955

File tree

4 files changed

+223
-605
lines changed

4 files changed

+223
-605
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 4 additions & 200 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@ import {
1010
CacheDriverInterface,
1111
TableStructure,
1212
DriverInterface,
13-
QuerySchemasResult,
14-
QueryTablesResult,
15-
QueryColumnsResult,
1613
} from '@cubejs-backend/base-driver';
1714

1815
import { QueryQueue } from './QueryQueue';
@@ -21,13 +18,7 @@ import { LocalCacheDriver } from './LocalCacheDriver';
2118
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
2219
import { LoadPreAggregationResult, PreAggregationDescription } from './PreAggregations';
2320
import { getCacheHash } from './utils';
24-
import { CacheAndQueryDriverType } from './QueryOrchestrator';
25-
26-
enum MetadataOperation {
27-
GET_SCHEMAS = 'GET_SCHEMAS',
28-
GET_TABLES_FOR_SCHEMAS = 'GET_TABLES_FOR_SCHEMAS',
29-
GET_COLUMNS_FOR_TABLES = 'GET_COLUMNS_FOR_TABLES'
30-
}
21+
import { CacheAndQueryDriverType, MetadataOperationType } from './QueryOrchestrator';
3122

3223
type QueryOptions = {
3324
external?: boolean;
@@ -585,17 +576,17 @@ export class QueryCache {
585576
const params = req.values && req.values[0] ? JSON.parse(req.values[0]) : {};
586577

587578
switch (operation) {
588-
case MetadataOperation.GET_SCHEMAS:
579+
case MetadataOperationType.GET_SCHEMAS:
589580
queue.logger('Getting datasource schemas', { dataSource: req.dataSource, requestId: req.requestId });
590581
return client.getSchemas();
591-
case MetadataOperation.GET_TABLES_FOR_SCHEMAS:
582+
case MetadataOperationType.GET_TABLES_FOR_SCHEMAS:
592583
queue.logger('Getting tables for schemas', {
593584
dataSource: req.dataSource,
594585
schemaCount: params.schemas?.length || 0,
595586
requestId: req.requestId
596587
});
597588
return client.getTablesForSpecificSchemas(params.schemas);
598-
case MetadataOperation.GET_COLUMNS_FOR_TABLES:
589+
case MetadataOperationType.GET_COLUMNS_FOR_TABLES:
599590
queue.logger('Getting columns for tables', {
600591
dataSource: req.dataSource,
601592
tableCount: params.tables?.length || 0,
@@ -1047,191 +1038,4 @@ export class QueryCache {
10471038
public async testConnection() {
10481039
return this.cacheDriver.testConnection();
10491040
}
1050-
1051-
private createMetadataHash(operation: MetadataOperation, params: Record<string, any>): string {
1052-
if (!params || Object.keys(params).length === 0) {
1053-
return 'empty';
1054-
}
1055-
1056-
const hashData: string[] = [];
1057-
1058-
switch (operation) {
1059-
case MetadataOperation.GET_SCHEMAS:
1060-
return 'all_schemas';
1061-
1062-
case MetadataOperation.GET_TABLES_FOR_SCHEMAS:
1063-
if (params.schemas && Array.isArray(params.schemas)) {
1064-
hashData.push(...params.schemas.map(schema => schema.schema_name).sort());
1065-
}
1066-
break;
1067-
1068-
case MetadataOperation.GET_COLUMNS_FOR_TABLES:
1069-
if (params.tables && Array.isArray(params.tables)) {
1070-
hashData.push(...params.tables.map(table => `${table.schema_name}.${table.table_name}`).sort());
1071-
}
1072-
break;
1073-
1074-
default:
1075-
return crypto
1076-
.createHash('sha256')
1077-
.update(JSON.stringify(params))
1078-
.digest('hex')
1079-
.substring(0, 16);
1080-
}
1081-
1082-
if (hashData.length === 0) {
1083-
return 'empty';
1084-
}
1085-
1086-
return crypto
1087-
.createHash('sha256')
1088-
.update(hashData.join('|'))
1089-
.digest('hex')
1090-
.substring(0, 16);
1091-
}
1092-
1093-
private createMetadataQuery(operation: string, params: Record<string, any>): QueryWithParams {
1094-
return [
1095-
`METADATA:${operation}`,
1096-
[JSON.stringify(params)],
1097-
{ external: false, renewalThreshold: 24 * 60 * 60 }
1098-
];
1099-
}
1100-
1101-
private async queryDataSourceMetadata<T>(
1102-
operation: MetadataOperation,
1103-
params: Record<string, any>,
1104-
dataSource: string = 'default',
1105-
options: {
1106-
requestId?: string;
1107-
forceRefresh?: boolean;
1108-
renewalThreshold?: number;
1109-
expiration?: number;
1110-
} = {}
1111-
): Promise<T> {
1112-
const {
1113-
requestId,
1114-
forceRefresh = false,
1115-
renewalThreshold = 24 * 60 * 60,
1116-
expiration = 7 * 24 * 60 * 60,
1117-
} = options;
1118-
1119-
const paramsHash = this.createMetadataHash(operation, params);
1120-
1121-
const metadataQuery = this.createMetadataQuery(operation, params);
1122-
const cacheKey: CacheKey = [`METADATA:${operation}`, paramsHash, dataSource];
1123-
1124-
const renewalKey = forceRefresh ? undefined : [
1125-
`METADATA_RENEWAL:${operation}`,
1126-
paramsHash,
1127-
dataSource,
1128-
Math.floor(Date.now() / (renewalThreshold * 1000))
1129-
];
1130-
1131-
return this.cacheQueryResult(
1132-
metadataQuery,
1133-
[],
1134-
cacheKey,
1135-
expiration,
1136-
{
1137-
renewalThreshold,
1138-
renewalKey,
1139-
forceNoCache: forceRefresh,
1140-
requestId,
1141-
dataSource,
1142-
useInMemory: true,
1143-
waitForRenew: true,
1144-
}
1145-
);
1146-
}
1147-
1148-
public async queryDataSourceSchemas(
1149-
dataSource: string = 'default',
1150-
options: {
1151-
requestId?: string;
1152-
forceRefresh?: boolean;
1153-
renewalThreshold?: number;
1154-
expiration?: number;
1155-
} = {}
1156-
): Promise<QuerySchemasResult[]> {
1157-
return this.queryDataSourceMetadata<QuerySchemasResult[]>(
1158-
MetadataOperation.GET_SCHEMAS,
1159-
{},
1160-
dataSource,
1161-
options
1162-
);
1163-
}
1164-
1165-
public async queryTablesForSchemas(
1166-
schemas: QuerySchemasResult[],
1167-
dataSource: string = 'default',
1168-
options: {
1169-
requestId?: string;
1170-
forceRefresh?: boolean;
1171-
renewalThreshold?: number;
1172-
expiration?: number;
1173-
} = {}
1174-
): Promise<QueryTablesResult[]> {
1175-
return this.queryDataSourceMetadata<QueryTablesResult[]>(
1176-
MetadataOperation.GET_TABLES_FOR_SCHEMAS,
1177-
{ schemas },
1178-
dataSource,
1179-
options
1180-
);
1181-
}
1182-
1183-
public async queryColumnsForTables(
1184-
tables: QueryTablesResult[],
1185-
dataSource: string = 'default',
1186-
options: {
1187-
requestId?: string;
1188-
forceRefresh?: boolean;
1189-
renewalThreshold?: number;
1190-
expiration?: number;
1191-
} = {}
1192-
): Promise<QueryColumnsResult[]> {
1193-
return this.queryDataSourceMetadata<QueryColumnsResult[]>(
1194-
MetadataOperation.GET_COLUMNS_FOR_TABLES,
1195-
{ tables },
1196-
dataSource,
1197-
options
1198-
);
1199-
}
1200-
1201-
public async clearDataSourceSchemaCache(dataSource: string = 'default') {
1202-
const cacheKey: CacheKey = [`METADATA:${MetadataOperation.GET_SCHEMAS}`, 'empty', dataSource];
1203-
const redisKey = this.queryRedisKey(cacheKey);
1204-
await this.cacheDriver.remove(redisKey);
1205-
this.logger('Cleared datasource schema cache', { dataSource });
1206-
}
1207-
1208-
public async clearTablesForSchemasCache(
1209-
schemas: QuerySchemasResult[],
1210-
dataSource: string = 'default'
1211-
) {
1212-
const paramsHash = this.createMetadataHash(MetadataOperation.GET_TABLES_FOR_SCHEMAS, { schemas });
1213-
const cacheKey: CacheKey = [`METADATA:${MetadataOperation.GET_TABLES_FOR_SCHEMAS}`, paramsHash, dataSource];
1214-
const redisKey = this.queryRedisKey(cacheKey);
1215-
await this.cacheDriver.remove(redisKey);
1216-
this.logger('Cleared tables for schemas cache', {
1217-
dataSource,
1218-
schemaCount: schemas.length,
1219-
cacheHash: paramsHash
1220-
});
1221-
}
1222-
1223-
public async clearColumnsForTablesCache(
1224-
tables: QueryTablesResult[],
1225-
dataSource: string = 'default'
1226-
) {
1227-
const paramsHash = this.createMetadataHash(MetadataOperation.GET_COLUMNS_FOR_TABLES, { tables });
1228-
const cacheKey: CacheKey = [`METADATA:${MetadataOperation.GET_COLUMNS_FOR_TABLES}`, paramsHash, dataSource];
1229-
const redisKey = this.queryRedisKey(cacheKey);
1230-
await this.cacheDriver.remove(redisKey);
1231-
this.logger('Cleared columns for tables cache', {
1232-
dataSource,
1233-
tableCount: tables.length,
1234-
cacheHash: paramsHash
1235-
});
1236-
}
12371041
}

0 commit comments

Comments
 (0)