11import { TableStructure } from '@cubejs-backend/base-driver' ;
2- import { asyncDebounce } from '@cubejs-backend/shared' ;
32import { DriverFactory } from './DriverFactory' ;
43import { QueryCache , QueryWithParams } from './QueryCache' ;
54import {
@@ -17,6 +16,16 @@ type PreAggregationLoadCacheOptions = {
1716 tablePrefixes ?: string [ ] ,
1817} ;
1918
19+ function createDeferred ( ) {
20+ let resolve ;
21+ let reject ;
22+ const promise = new Promise ( ( res , rej ) => {
23+ resolve = res ;
24+ reject = rej ;
25+ } ) ;
26+ return { promise, resolve, reject } ;
27+ }
28+
2029export class PreAggregationLoadCache {
2130 private readonly driverFactory : DriverFactory ;
2231
@@ -26,6 +35,8 @@ export class PreAggregationLoadCache {
2635
2736 private readonly queryResults : any ;
2837
38+ private queryResultRequests : { [ redisKey : string ] : { resolve : CallableFunction , reject : CallableFunction } [ ] } = { } ;
39+
2940 private readonly externalDriverFactory : any ;
3041
3142 private readonly requestId : any ;
@@ -45,8 +56,6 @@ export class PreAggregationLoadCache {
4556
4657 private readonly tablePrefixes : string [ ] | null ;
4758
48- private readonly cacheQueryResultDebounced : Function ;
49-
5059 public constructor (
5160 clientFactory : DriverFactory ,
5261 queryCache ,
@@ -64,7 +73,6 @@ export class PreAggregationLoadCache {
6473 this . versionEntries = { } ;
6574 this . tables = { } ;
6675 this . tableColumnTypes = { } ;
67- this . cacheQueryResultDebounced = asyncDebounce ( this . queryCache . cacheQueryResult . bind ( this . queryCache ) ) ;
6876 }
6977
7078 protected async tablesFromCache ( preAggregation , forceRenew : boolean = false ) {
@@ -201,26 +209,53 @@ export class PreAggregationLoadCache {
201209 return this . queryResults [ queryKey ] ;
202210 }
203211
212+ // There is ongoing request
213+ if ( this . queryResultRequests [ queryKey ] ) {
214+ const { promise, resolve, reject } = createDeferred ( ) ;
215+ this . queryResultRequests [ queryKey ] . push ( { resolve, reject } ) ;
216+
217+ return promise ;
218+ }
219+
220+ // Making query for a first time
221+ this . queryResultRequests [ queryKey ] = [ ] ;
204222
205- this . queryResults [ queryKey ] = await this . cacheQueryResultDebounced (
206- query ,
207- values ,
208- [ query , values ] ,
209- 60 * 60 ,
210- {
211- renewalThreshold : this . queryCache . options . refreshKeyRenewalThreshold
212- || queryOptions ?. renewalThreshold || 2 * 60 ,
213- renewalKey : [ query , values ] ,
214- waitForRenew,
215- priority,
216- requestId : this . requestId ,
217- dataSource : this . dataSource ,
218- useInMemory : true ,
219- external : queryOptions ?. external
223+ try {
224+ this . queryResults [ queryKey ] = await this . queryCache . cacheQueryResult (
225+ query ,
226+ values ,
227+ [ query , values ] ,
228+ 60 * 60 ,
229+ {
230+ renewalThreshold : this . queryCache . options . refreshKeyRenewalThreshold
231+ || queryOptions ?. renewalThreshold || 2 * 60 ,
232+ renewalKey : [ query , values ] ,
233+ waitForRenew,
234+ priority,
235+ requestId : this . requestId ,
236+ dataSource : this . dataSource ,
237+ useInMemory : true ,
238+ external : queryOptions ?. external
239+ }
240+ ) ;
241+
242+ let r = ( this . queryResultRequests [ queryKey ] || [ ] ) . pop ( ) ;
243+ while ( r ) {
244+ r . resolve ( this . queryResults [ queryKey ] ) ;
245+ r = this . queryResultRequests [ queryKey ] . pop ( ) ;
220246 }
221- ) ;
222247
223- return this . queryResults [ queryKey ] ;
248+ return this . queryResults [ queryKey ] ;
249+ } catch ( err ) {
250+ let r = ( this . queryResultRequests [ queryKey ] || [ ] ) . pop ( ) ;
251+ while ( r ) {
252+ r . reject ( err ) ;
253+ r = this . queryResultRequests [ queryKey ] . pop ( ) ;
254+ }
255+ throw err ;
256+ } finally {
257+ this . queryResultRequests [ queryKey ] = null ;
258+ }
224259 }
225260
226261 public hasKeyQueryResult ( keyQuery ) {
0 commit comments