Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { LocalCacheDriver } from './LocalCacheDriver';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
import { LoadPreAggregationResult, PreAggregationDescription } from './PreAggregations';
import { getCacheHash } from './utils';
import { CacheAndQueryDriverType } from './QueryOrchestrator';
import { CacheAndQueryDriverType, MetadataOperationType } from './QueryOrchestrator';

type QueryOptions = {
external?: boolean;
Expand Down Expand Up @@ -567,8 +567,36 @@ export class QueryCache {
): QueryQueue {
const queue: any = new QueryQueue(redisPrefix, {
queryHandlers: {
metadata: async (req, _setCancelHandle) => {
const client = await clientFactory();
const { operation } = req;
const params = req.params || {};

switch (operation) {
case MetadataOperationType.GET_SCHEMAS:
queue.logger('Getting datasource schemas', { dataSource: req.dataSource, requestId: req.requestId });
return client.getSchemas();
case MetadataOperationType.GET_TABLES_FOR_SCHEMAS:
queue.logger('Getting tables for schemas', {
dataSource: req.dataSource,
schemaCount: params.schemas?.length || 0,
requestId: req.requestId
});
return client.getTablesForSpecificSchemas(params.schemas);
case MetadataOperationType.GET_COLUMNS_FOR_TABLES:
queue.logger('Getting columns for tables', {
dataSource: req.dataSource,
tableCount: params.tables?.length || 0,
requestId: req.requestId
});
return client.getColumnsForSpecificTables(params.tables);
default:
throw new Error(`Unknown metadata operation: ${operation}`);
}
},
query: async (req, setCancelHandle) => {
const client = await clientFactory();

const resultPromise = executeFn(client, req);
let handle;
if (resultPromise.cancel) {
Expand Down Expand Up @@ -636,6 +664,12 @@ export class QueryCache {
}));
},
cancelHandlers: {
metadata: async (req) => {
if (req.cancelHandler && queue.handles[req.cancelHandler]) {
await queue.handles[req.cancelHandler].cancel();
delete queue.handles[req.cancelHandler];
}
},
query: async (req) => {
if (req.cancelHandler && queue.handles[req.cancelHandler]) {
await queue.handles[req.cancelHandler].cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ import * as stream from 'stream';
import R from 'ramda';
import { getEnv } from '@cubejs-backend/shared';
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
import {
QuerySchemasResult,
QueryTablesResult,
QueryColumnsResult,
QueryKey } from '@cubejs-backend/base-driver';

import { QueryKey } from '@cubejs-backend/base-driver';
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable } from './QueryCache';
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable, QueryWithParams, CacheKey } from './QueryCache';
import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
import { QueryStream } from './QueryStream';
Expand All @@ -17,6 +21,12 @@ export enum DriverType {
Cache = 'cache',
}

export enum MetadataOperationType {
GET_SCHEMAS = 'GET_SCHEMAS',
GET_TABLES_FOR_SCHEMAS = 'GET_TABLES_FOR_SCHEMAS',
GET_COLUMNS_FOR_TABLES = 'GET_COLUMNS_FOR_TABLES'
}

export interface QueryOrchestratorOptions {
externalDriverFactory?: DriverFactory;
cacheAndQueueDriver?: CacheAndQueryDriverType;
Expand Down Expand Up @@ -428,4 +438,118 @@ export class QueryOrchestrator {
public async updateRefreshEndReached() {
return this.preAggregations.updateRefreshEndReached();
}

private createMetadataQuery(operation: string, params: Record<string, any>): QueryWithParams {
return [
`METADATA:${operation}`,
[JSON.stringify(params)],
{ external: false, renewalThreshold: 24 * 60 * 60 }
];
}

private async queryDataSourceMetadata<T>(
operation: MetadataOperationType,
params: Record<string, any>,
dataSource: string = 'default',
options: {
requestId?: string;
forceRefresh?: boolean;
renewalThreshold?: number;
expiration?: number;
} = {}
): Promise<T> {
const {
requestId,
forceRefresh = false,
renewalThreshold = 24 * 60 * 60,
expiration = 7 * 24 * 60 * 60,
} = options;

const metadataQuery = this.createMetadataQuery(operation, params);
const cacheKey: CacheKey = [`METADATA:${operation}`, metadataQuery, dataSource];

const renewalKey = forceRefresh ? undefined : [
`METADATA_RENEWAL:${operation}`,
dataSource,
Math.floor(Date.now() / (renewalThreshold * 1000))
];

return this.queryCache.cacheQueryResult(
metadataQuery,
[],
cacheKey,
expiration,
{
renewalThreshold,
renewalKey,
forceNoCache: forceRefresh,
requestId,
dataSource,
useInMemory: true,
waitForRenew: true,
}
);
}

/**
* Query the data source for available schemas.
*/
public async queryDataSourceSchemas(
dataSource: string = 'default',
options: {
requestId?: string;
forceRefresh?: boolean;
renewalThreshold?: number;
expiration?: number;
} = {}
): Promise<QuerySchemasResult[]> {
return this.queryDataSourceMetadata<QuerySchemasResult[]>(
MetadataOperationType.GET_SCHEMAS,
{},
dataSource,
options
);
}

/**
* Query the data source for tables within the specified schemas.
*/
public async queryTablesForSchemas(
schemas: QuerySchemasResult[],
dataSource: string = 'default',
options: {
requestId?: string;
forceRefresh?: boolean;
renewalThreshold?: number;
expiration?: number;
} = {}
): Promise<QueryTablesResult[]> {
return this.queryDataSourceMetadata<QueryTablesResult[]>(
MetadataOperationType.GET_TABLES_FOR_SCHEMAS,
{ schemas },
dataSource,
options
);
}

/**
* Query the data source for columns within the specified tables.
*/
public async queryColumnsForTables(
tables: QueryTablesResult[],
dataSource: string = 'default',
options: {
requestId?: string;
forceRefresh?: boolean;
renewalThreshold?: number;
expiration?: number;
} = {}
): Promise<QueryColumnsResult[]> {
return this.queryDataSourceMetadata<QueryColumnsResult[]>(
MetadataOperationType.GET_COLUMNS_FOR_TABLES,
{ tables },
dataSource,
options
);
}
}
Loading
Loading