@@ -2,9 +2,14 @@ import * as stream from 'stream';
22import R from 'ramda' ;
33import { getEnv } from '@cubejs-backend/shared' ;
44import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver' ;
5-
6- import { QueryKey } from '@cubejs-backend/base-driver' ;
7- import { QueryCache , QueryBody , TempTable , PreAggTableToTempTable } from './QueryCache' ;
5+ import {
6+ QuerySchemasResult ,
7+ QueryTablesResult ,
8+ QueryColumnsResult ,
9+ QueryKey
10+ } from '@cubejs-backend/base-driver' ;
11+
12+ import { QueryCache , QueryBody , TempTable , PreAggTableToTempTable , QueryWithParams , CacheKey } from './QueryCache' ;
813import { PreAggregations , PreAggregationDescription , getLastUpdatedAtTimestamp } from './PreAggregations' ;
914import { DriverFactory , DriverFactoryByDataSource } from './DriverFactory' ;
1015import { QueryStream } from './QueryStream' ;
@@ -17,6 +22,12 @@ export enum DriverType {
1722 Cache = 'cache' ,
1823}
1924
25+ export enum MetadataOperationType {
26+ GET_SCHEMAS = 'GET_SCHEMAS' ,
27+ GET_TABLES_FOR_SCHEMAS = 'GET_TABLES_FOR_SCHEMAS' ,
28+ GET_COLUMNS_FOR_TABLES = 'GET_COLUMNS_FOR_TABLES'
29+ }
30+
2031export interface QueryOrchestratorOptions {
2132 externalDriverFactory ?: DriverFactory ;
2233 cacheAndQueueDriver ?: CacheAndQueryDriverType ;
@@ -428,4 +439,120 @@ export class QueryOrchestrator {
428439 public async updateRefreshEndReached ( ) {
429440 return this . preAggregations . updateRefreshEndReached ( ) ;
430441 }
442+
443+ private createMetadataQuery ( operation : string , params : Record < string , any > ) : QueryWithParams {
444+ return [
445+ `METADATA:${ operation } ` ,
446+ // TODO (@MikeNitsenko): Metadata queries need object params like [{ schema, table }]
447+ // but QueryWithParams expects string[]. This forces JSON.stringify workaround.
448+ [ JSON . stringify ( params ) ] ,
449+ { external : false , renewalThreshold : 24 * 60 * 60 }
450+ ] ;
451+ }
452+
453+ private async queryDataSourceMetadata < T > (
454+ operation : MetadataOperationType ,
455+ params : Record < string , any > ,
456+ dataSource : string = 'default' ,
457+ options : {
458+ requestId ?: string ;
459+ forceRefresh ?: boolean ;
460+ renewalThreshold ?: number ;
461+ expiration ?: number ;
462+ } = { }
463+ ) : Promise < T > {
464+ const {
465+ requestId,
466+ forceRefresh = false ,
467+ renewalThreshold = 24 * 60 * 60 ,
468+ expiration = 7 * 24 * 60 * 60 ,
469+ } = options ;
470+
471+ const metadataQuery = this . createMetadataQuery ( operation , params ) ;
472+ const cacheKey : CacheKey = [ metadataQuery , dataSource ] ;
473+
474+ const renewalKey = forceRefresh ? undefined : [
475+ `METADATA_RENEWAL:${ operation } ` ,
476+ dataSource ,
477+ Math . floor ( Date . now ( ) / ( renewalThreshold * 1000 ) )
478+ ] ;
479+
480+ return this . queryCache . cacheQueryResult (
481+ metadataQuery ,
482+ [ ] ,
483+ cacheKey ,
484+ expiration ,
485+ {
486+ renewalThreshold,
487+ renewalKey,
488+ forceNoCache : forceRefresh ,
489+ requestId,
490+ dataSource,
491+ useInMemory : true ,
492+ waitForRenew : true ,
493+ }
494+ ) ;
495+ }
496+
497+ /**
498+ * Query the data source for available schemas.
499+ */
500+ public async queryDataSourceSchemas (
501+ dataSource : string = 'default' ,
502+ options : {
503+ requestId ?: string ;
504+ forceRefresh ?: boolean ;
505+ renewalThreshold ?: number ;
506+ expiration ?: number ;
507+ } = { }
508+ ) : Promise < QuerySchemasResult [ ] > {
509+ return this . queryDataSourceMetadata < QuerySchemasResult [ ] > (
510+ MetadataOperationType . GET_SCHEMAS ,
511+ { } ,
512+ dataSource ,
513+ options
514+ ) ;
515+ }
516+
517+ /**
518+ * Query the data source for tables within the specified schemas.
519+ */
520+ public async queryTablesForSchemas (
521+ schemas : QuerySchemasResult [ ] ,
522+ dataSource : string = 'default' ,
523+ options : {
524+ requestId ?: string ;
525+ forceRefresh ?: boolean ;
526+ renewalThreshold ?: number ;
527+ expiration ?: number ;
528+ } = { }
529+ ) : Promise < QueryTablesResult [ ] > {
530+ return this . queryDataSourceMetadata < QueryTablesResult [ ] > (
531+ MetadataOperationType . GET_TABLES_FOR_SCHEMAS ,
532+ { schemas } ,
533+ dataSource ,
534+ options
535+ ) ;
536+ }
537+
538+ /**
539+ * Query the data source for columns within the specified tables.
540+ */
541+ public async queryColumnsForTables (
542+ tables : QueryTablesResult [ ] ,
543+ dataSource : string = 'default' ,
544+ options : {
545+ requestId ?: string ;
546+ forceRefresh ?: boolean ;
547+ renewalThreshold ?: number ;
548+ expiration ?: number ;
549+ } = { }
550+ ) : Promise < QueryColumnsResult [ ] > {
551+ return this . queryDataSourceMetadata < QueryColumnsResult [ ] > (
552+ MetadataOperationType . GET_COLUMNS_FOR_TABLES ,
553+ { tables } ,
554+ dataSource ,
555+ options
556+ ) ;
557+ }
431558}
0 commit comments