@@ -9,10 +9,10 @@ import {
99 InlineTables ,
1010 CacheDriverInterface ,
1111 TableStructure ,
12- DriverInterface ,
12+ DriverInterface , QueryKey ,
1313} from '@cubejs-backend/base-driver' ;
1414
15- import { QueryQueue } from './QueryQueue' ;
15+ import { QueryQueue , QueryQueueOptions } from './QueryQueue' ;
1616import { ContinueWaitError } from './ContinueWaitError' ;
1717import { LocalCacheDriver } from './LocalCacheDriver' ;
1818import { DriverFactory , DriverFactoryByDataSource } from './DriverFactory' ;
@@ -115,7 +115,7 @@ export interface QueryCacheOptions {
115115 } > ;
116116 cubeStoreDriverFactory ?: ( ) => Promise < CubeStoreDriver > ,
117117 continueWaitTimeout ?: number ;
118- cacheAndQueueDriver ? : CacheAndQueryDriverType ;
118+ cacheAndQueueDriver : CacheAndQueryDriverType ;
119119 maxInMemoryCacheEntries ?: number ;
120120 skipExternalCacheAndQueue ?: boolean ;
121121}
@@ -133,7 +133,7 @@ export class QueryCache {
133133 protected readonly redisPrefix : string ,
134134 protected readonly driverFactory : DriverFactoryByDataSource ,
135135 protected readonly logger : any ,
136- public readonly options : QueryCacheOptions = { }
136+ public readonly options : QueryCacheOptions
137137 ) {
138138 switch ( options . cacheAndQueueDriver || 'memory' ) {
139139 case 'memory' :
@@ -455,9 +455,9 @@ export class QueryCache {
455455 } ;
456456
457457 if ( ! persistent ) {
458- return queue . executeInQueue ( 'query' , cacheKey , _query , priority , opt ) ;
458+ return queue . executeInQueue ( 'query' , cacheKey as QueryKey , _query , priority , opt ) ;
459459 } else {
460- return queue . executeInQueue ( 'stream' , cacheKey , {
460+ return queue . executeInQueue ( 'stream' , cacheKey as QueryKey , {
461461 ..._query ,
462462 aliasNameToMember,
463463 } , priority , opt ) ;
@@ -563,7 +563,7 @@ export class QueryCache {
563563 redisPrefix : string ,
564564 clientFactory : DriverFactory ,
565565 executeFn : ( client : BaseDriver , req : any ) => any ,
566- options : Record < string , any > = { }
566+ options : Omit < QueryQueueOptions , 'queryHandlers' | 'cancelHandlers' >
567567 ) : QueryQueue {
568568 const queue : any = new QueryQueue ( redisPrefix , {
569569 queryHandlers : {
@@ -583,57 +583,57 @@ export class QueryCache {
583583 }
584584 return result ;
585585 } ,
586- stream : async ( req , target ) => {
587- queue . logger ( 'Streaming SQL' , { ... req } ) ;
588- await ( new Promise ( ( resolve , reject ) => {
589- let logged = false ;
590- Promise
591- . all ( [ clientFactory ( ) ] )
592- . then ( ( [ client ] ) => ( < DriverInterface > client ) . stream ( req . query , req . values , { highWaterMark : getEnv ( 'dbQueryStreamHighWaterMark' ) } ) )
593- . then ( ( source ) => {
594- const cleanup = async ( error ) => {
595- if ( source . release ) {
596- const toRelease = source . release ;
597- delete source . release ;
598- await toRelease ( ) ;
599- }
600- if ( error && ! target . destroyed ) {
601- target . destroy ( error ) ;
602- }
603- if ( ! logged && target . destroyed ) {
604- logged = true ;
605- if ( error ) {
606- queue . logger ( 'Streaming done with error' , {
607- query : req . query ,
608- query_values : req . values ,
609- error ,
610- } ) ;
611- reject ( error ) ;
612- } else {
613- queue . logger ( 'Streaming successfully completed' , {
614- requestId : req . requestId ,
615- } ) ;
616- resolve ( req . requestId ) ;
617- }
586+ } ,
587+ streamHandler : async ( req , target ) => {
588+ queue . logger ( 'Streaming SQL' , { ... req } ) ;
589+ await ( new Promise ( ( resolve , reject ) => {
590+ let logged = false ;
591+ Promise
592+ . all ( [ clientFactory ( ) ] )
593+ . then ( ( [ client ] ) => ( < DriverInterface > client ) . stream ( req . query , req . values , { highWaterMark : getEnv ( 'dbQueryStreamHighWaterMark' ) } ) )
594+ . then ( ( source ) => {
595+ const cleanup = async ( error ) => {
596+ if ( source . release ) {
597+ const toRelease = source . release ;
598+ delete source . release ;
599+ await toRelease ( ) ;
600+ }
601+ if ( error && ! target . destroyed ) {
602+ target . destroy ( error ) ;
603+ }
604+ if ( ! logged && target . destroyed ) {
605+ logged = true ;
606+ if ( error ) {
607+ queue . logger ( 'Streaming done with error' , {
608+ query : req . query ,
609+ query_values : req . values ,
610+ error ,
611+ } ) ;
612+ reject ( error ) ;
613+ } else {
614+ queue . logger ( 'Streaming successfully completed' , {
615+ requestId : req . requestId ,
616+ } ) ;
617+ resolve ( req . requestId ) ;
618618 }
619- } ;
620-
621- source . rowStream . once ( 'end' , ( ) => cleanup ( undefined ) ) ;
622- source . rowStream . once ( 'error ' , cleanup ) ;
623- source . rowStream . once ( 'close ' , ( ) => cleanup ( undefined ) ) ;
624-
625- target . once ( 'end' , ( ) => cleanup ( undefined ) ) ;
626- target . once ( 'error ' , cleanup ) ;
627- target . once ( 'close ' , ( ) => cleanup ( undefined ) ) ;
628-
629- source . rowStream . pipe ( target ) ;
630- } )
631- . catch ( ( reason ) => {
632- target . emit ( 'error' , reason ) ;
633- resolve ( reason ) ;
634- } ) ;
635- } ) ) ;
636- } ,
619+ }
620+ } ;
621+
622+ source . rowStream . once ( 'end ' , ( ) => cleanup ( undefined ) ) ;
623+ source . rowStream . once ( 'error ' , cleanup ) ;
624+ source . rowStream . once ( 'close' , ( ) => cleanup ( undefined ) ) ;
625+
626+ target . once ( 'end ' , ( ) => cleanup ( undefined ) ) ;
627+ target . once ( 'error ' , cleanup ) ;
628+ target . once ( 'close' , ( ) => cleanup ( undefined ) ) ;
629+
630+ source . rowStream . pipe ( target ) ;
631+ } )
632+ . catch ( ( reason ) => {
633+ target . emit ( 'error' , reason ) ;
634+ resolve ( reason ) ;
635+ } ) ;
636+ } ) ) ;
637637 } ,
638638 cancelHandlers : {
639639 query : async ( req ) => {
0 commit comments