Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cubejs-backend-shared/src/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ export const retryWithTimeout = <T>(
);

/**
* High order function that makes to debounce multi async calls to single call at one time
* Debunks multiple asynchronous calls at once.
*/
export const asyncDebounce = <Ret, Arguments>(
fn: (...args: Arguments[]) => Promise<Ret>,
Expand Down
53 changes: 29 additions & 24 deletions packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import crypto from 'crypto';
import csvWriter from 'csv-write-stream';
import { LRUCache } from 'lru-cache';
import { pipeline } from 'stream';
import { getEnv, MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared';
import {asyncDebounce, getEnv, MaybeCancelablePromise, streamToArray} from '@cubejs-backend/shared';
import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
import {
BaseDriver,
Expand Down Expand Up @@ -34,6 +34,12 @@ export type QueryWithParams = [
options?: QueryOptions
];

export type LoadRefreshKeyOptions = {
requestId?: string;
skipRefreshKeyWaitForRenew?: boolean;
dataSource: string
};

export type Query = {
requestId?: string;
dataSource: string;
Expand Down Expand Up @@ -771,32 +777,31 @@ export class QueryCache {
public loadRefreshKeys(
cacheKeyQueries: QueryWithParams[],
expireSecs: number,
options: {
requestId?: string;
skipRefreshKeyWaitForRenew?: boolean;
dataSource: string
}
options: LoadRefreshKeyOptions
) {
return cacheKeyQueries.map((q) => {
const [query, values, queryOptions]: QueryWithParams = Array.isArray(q) ? q : [q, [], {}];
return this.cacheQueryResult(
query,
values,
[query, values],
expireSecs,
{
renewalThreshold: this.options.refreshKeyRenewalThreshold || queryOptions?.renewalThreshold || 2 * 60,
renewalKey: q,
waitForRenew: !options.skipRefreshKeyWaitForRenew,
requestId: options.requestId,
dataSource: options.dataSource,
useInMemory: true,
external: queryOptions?.external,
},
);
});
return cacheKeyQueries.map((q) => this.loadRefreshKey(q, expireSecs, options));
}

public loadRefreshKey = asyncDebounce(async (q: QueryWithParams, expireSecs: number, options: LoadRefreshKeyOptions) => {
const [query, values, queryOptions]: QueryWithParams = Array.isArray(q) ? q : [q, [], {}];

return this.cacheQueryResult(
query,
values,
[query, values],
expireSecs,
{
renewalThreshold: this.options.refreshKeyRenewalThreshold || queryOptions?.renewalThreshold || 2 * 60,
renewalKey: q,
waitForRenew: !options.skipRefreshKeyWaitForRenew,
requestId: options.requestId,
dataSource: options.dataSource,
useInMemory: true,
external: queryOptions?.external,
},
);
});

public withLock = <T = any>(
key: string,
ttl: number,
Expand Down
Loading