Skip to content

Commit f8f640c

Browse files
committed
improve caching in pre-agg-load-cache
1 parent d0cb744 commit f8f640c

File tree

1 file changed

+48
-3
lines changed

1 file changed

+48
-3
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@ type PreAggregationLoadCacheOptions = {
1616
tablePrefixes?: string[],
1717
};
1818

19+
function createDeferred() {
20+
let resolve;
21+
let reject;
22+
const promise = new Promise((res, rej) => {
23+
resolve = res;
24+
reject = rej;
25+
});
26+
return { promise, resolve, reject };
27+
}
28+
1929
export class PreAggregationLoadCache {
2030
private readonly driverFactory: DriverFactory;
2131

@@ -25,6 +35,8 @@ export class PreAggregationLoadCache {
2535

2636
private readonly queryResults: any;
2737

38+
private queryResultRequests: { [redisKey: string]: { resolve: CallableFunction, reject: CallableFunction }[]} = {};
39+
2840
private readonly externalDriverFactory: any;
2941

3042
private readonly requestId: any;
@@ -190,9 +202,26 @@ export class PreAggregationLoadCache {
190202

191203
public async keyQueryResult(sqlQuery: QueryWithParams, waitForRenew: boolean, priority: number) {
192204
const [query, values, queryOptions]: QueryWithParams = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}];
205+
const queryKey = this.queryCache.queryRedisKey([query, values]);
193206

194-
if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) {
195-
this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult(
207+
// Have result in cache
208+
if (this.queryResults[queryKey]) {
209+
return this.queryResults[queryKey];
210+
}
211+
212+
// There is ongoing request
213+
if (this.queryResultRequests[queryKey]) {
214+
const { promise, resolve, reject } = createDeferred();
215+
this.queryResultRequests[queryKey].push({ resolve, reject });
216+
217+
return promise;
218+
}
219+
220+
// Making query for a first time
221+
this.queryResultRequests[queryKey] = [];
222+
223+
try {
224+
this.queryResults[queryKey] = await this.queryCache.cacheQueryResult(
196225
query,
197226
values,
198227
[query, values],
@@ -209,8 +238,24 @@ export class PreAggregationLoadCache {
209238
external: queryOptions?.external
210239
}
211240
);
241+
242+
let r = (this.queryResultRequests[queryKey] || []).pop();
243+
while (r) {
244+
r.resolve(this.queryResults[queryKey]);
245+
r = this.queryResultRequests[queryKey].pop();
246+
}
247+
248+
return this.queryResults[queryKey];
249+
} catch (err) {
250+
let r = (this.queryResultRequests[queryKey] || []).pop();
251+
while (r) {
252+
r.reject(err);
253+
r = this.queryResultRequests[queryKey].pop();
254+
}
255+
throw err;
256+
} finally {
257+
this.queryResultRequests[queryKey] = null;
212258
}
213-
return this.queryResults[this.queryCache.queryRedisKey([query, values])];
214259
}
215260

216261
public hasKeyQueryResult(keyQuery) {

0 commit comments

Comments
 (0)