diff --git a/CLAUDE.md b/CLAUDE.md index 9a4f718d7bd33..79884aceba708 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -13,7 +13,7 @@ Cube is a semantic layer for building data applications. This is a monorepo cont ## Development Commands -**Note: This project uses Yarn as the package manager.** +**Note: This project uses Yarn as the package manager. Node.js v22.15.0 is recommended.** ### Core Build Commands ```bash diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 9159cdc855713..011f2e82e02a5 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -95,12 +95,55 @@ export type CacheKey = [CacheKeyItem, CacheKeyItem, CacheKeyItem] | [CacheKeyItem, CacheKeyItem, CacheKeyItem, CacheKeyItem]; +export type QueryWithRetryAndReleaseOptions = { + cacheKey: CacheKey; + dataSource: string; + external: boolean; + priority?: number; + requestId?: string; + spanId?: string; + inlineTables?: InlineTables; + useCsvQuery?: boolean; + lambdaTypes?: TableStructure; + persistent?: boolean; + aliasNameToMember?: { [alias: string]: string }; +}; + +export type CacheQueryResultOptions = { + dataSource: string; + renewalThreshold?: number; + renewalKey?: any; + priority?: number; + external?: boolean; + requestId?: string; + waitForRenew?: boolean; + forceNoCache?: boolean; + useInMemory?: boolean; + useCsvQuery?: boolean; + lambdaTypes?: TableStructure; + persistent?: boolean; + primaryQuery?: boolean; + renewCycle?: boolean; +}; + type CacheEntry = { time: number; result: any; renewalKey: string; }; +type CheckCacheOptions = { + renewalKey: string; + renewalThreshold: number; + requestId: string; + expiration: number; + useInMemory: boolean; + spanId: string; + cacheKey: CacheKey; + primaryQuery: boolean; + renewCycle: boolean; +}; + export interface QueryCacheOptions { refreshKeyRenewalThreshold?: number; externalQueueOptions?: any; @@ -153,7 +196,9 @@ export class QueryCache { } this.memoryCache = new LRUCache({ - max: options.maxInMemoryCacheEntries || 10000 + max: options.maxInMemoryCacheEntries || 10000, + allowStale: false, + updateAgeOnGet: false, }); } @@ -416,25 +461,13 @@ export class QueryCache { lambdaTypes, persistent, aliasNameToMember, - }: { - cacheKey: CacheKey, - dataSource: string, - external: boolean, - priority?: number, - requestId?: string, - spanId?: string, - inlineTables?: InlineTables, - useCsvQuery?: boolean, - lambdaTypes?: TableStructure, - persistent?: boolean, - aliasNameToMember?: { [alias: string]: string }, - } + }: QueryWithRetryAndReleaseOptions ) { const queue = external ? this.getExternalQueue() : await this.getQueue(dataSource); - const _query = { + const queryDef = { queryKey: cacheKey, query, values, @@ -442,6 +475,8 @@ export class QueryCache { inlineTables, useCsvQuery, lambdaTypes, + // Used only for streaming + aliasNameToMember }; const opt = { @@ -451,12 +486,9 @@ export class QueryCache { }; if (!persistent) { - return queue.executeInQueue('query', cacheKey as QueryKey, _query, priority, opt); + return queue.executeInQueue('query', cacheKey as QueryKey, queryDef, priority, opt); } else { - return queue.executeInQueue('stream', cacheKey as QueryKey, { - ..._query, - aliasNameToMember, - }, priority, opt); + return queue.executeInQueue('stream', cacheKey as QueryKey, queryDef, priority, opt); } } @@ -837,32 +869,78 @@ export class QueryCache { callback: () => MaybeCancelablePromise, ) => this.cacheDriver.withLock(`lock:${key}`, callback, ttl, true); + protected async checkInCache( + redisKey: string, + opts: CheckCacheOptions + ): Promise { + if (opts.useInMemory) { + const inMemoryResult = this.checkInMemoryCache(redisKey, opts); + if (inMemoryResult) { + return inMemoryResult; + } + } + + const cachedResult = await this.cacheDriver.get(redisKey); + + if (opts.useInMemory) { + this.memoryCache.set(redisKey, cachedResult, { + ttl: opts.renewalThreshold * 1000 + }); + } + + return cachedResult; + } + + protected checkInMemoryCache( + redisKey: string, + opts: CheckCacheOptions + ): any { + const inMemoryValue = this.memoryCache.get(redisKey); + if (!inMemoryValue) { + return null; + } + + const renewedAgo = (new Date()).getTime() - inMemoryValue.time; + + if ( + opts.renewalKey && ( + !opts.renewalThreshold || + renewedAgo > opts.renewalThreshold * 1000 || + inMemoryValue.renewalKey !== opts.renewalKey + ) || renewedAgo > opts.expiration * 1000 + ) { + this.memoryCache.delete(redisKey); + return null; + } + + this.logger('Found in memory cache entry', { + cacheKey: opts.cacheKey, + time: inMemoryValue.time, + renewedAgo, + renewalKey: inMemoryValue.renewalKey, + newRenewalKey: opts.renewalKey, + renewalThreshold: opts.renewalThreshold, + requestId: opts.requestId, + spanId: opts.spanId, + primaryQuery: opts.primaryQuery, + renewCycle: opts.renewCycle + }); + + return inMemoryValue; + } + public async cacheQueryResult( query: string | QueryWithParams, values: string[], cacheKey: CacheKey, expiration: number, - options: { - renewalThreshold?: number, - renewalKey?: any, - priority?: number, - external?: boolean, - requestId?: string, - dataSource: string, - waitForRenew?: boolean, - forceNoCache?: boolean, - useInMemory?: boolean, - useCsvQuery?: boolean, - lambdaTypes?: TableStructure, - persistent?: boolean, - primaryQuery?: boolean, - renewCycle?: boolean, - } + options: CacheQueryResultOptions ) { const spanId = crypto.randomBytes(16).toString('hex'); - options = options || { dataSource: 'default' }; + const { renewalThreshold, primaryQuery, renewCycle } = options; const renewalKey = options.renewalKey && this.queryRedisKey(options.renewalKey); + const redisKey = this.queryRedisKey(cacheKey); const fetchNew = () => ( this.queryWithRetryAndRelease(query, values, { @@ -922,56 +1000,27 @@ export class QueryCache { return fetchNew(); } - let res; - - const inMemoryCacheDisablePeriod = 5 * 60 * 1000; - - if (options.useInMemory) { - const inMemoryValue = this.memoryCache.get(redisKey); - if (inMemoryValue) { - const renewedAgo = (new Date()).getTime() - inMemoryValue.time; - - if ( - renewalKey && ( - !renewalThreshold || - !inMemoryValue.time || - // Do not cache in memory in last 5 minutes of expiry. - // Most likely it'll cause race condition of refreshing data with different refreshKey values. - renewedAgo + inMemoryCacheDisablePeriod > renewalThreshold * 1000 || - inMemoryValue.renewalKey !== renewalKey - ) || renewedAgo > expiration * 1000 || renewedAgo > inMemoryCacheDisablePeriod - ) { - this.memoryCache.delete(redisKey); - } else { - this.logger('Found in memory cache entry', { - cacheKey, - time: inMemoryValue.time, - renewedAgo, - renewalKey: inMemoryValue.renewalKey, - newRenewalKey: renewalKey, - renewalThreshold, - requestId: options.requestId, - spanId, - primaryQuery, - renewCycle - }); - res = inMemoryValue; - } + const cachedResult = await this.checkInCache( + redisKey, + { + requestId: options.requestId, + useInMemory: options.useInMemory, + renewalKey, + renewalThreshold, + expiration, + spanId, + cacheKey, + primaryQuery, + renewCycle } - } - - if (!res) { - res = await this.cacheDriver.get(redisKey); - } - - if (res) { - const parsedResult = res; - const renewedAgo = (new Date()).getTime() - parsedResult.time; + ); + if (cachedResult) { + const renewedAgo = (new Date()).getTime() - cachedResult.time; this.logger('Found cache entry', { cacheKey, - time: parsedResult.time, + time: cachedResult.time, renewedAgo, - renewalKey: parsedResult.renewalKey, + renewalKey: cachedResult.renewalKey, newRenewalKey: renewalKey, renewalThreshold, requestId: options.requestId, @@ -982,9 +1031,9 @@ export class QueryCache { if ( renewalKey && ( !renewalThreshold || - !parsedResult.time || + !cachedResult.time || renewedAgo > renewalThreshold * 1000 || - parsedResult.renewalKey !== renewalKey + cachedResult.renewalKey !== renewalKey ) ) { if (options.waitForRenew) { @@ -999,11 +1048,9 @@ export class QueryCache { }); } } + this.logger('Using cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle }); - if (options.useInMemory && renewedAgo + inMemoryCacheDisablePeriod <= renewalThreshold * 1000) { - this.memoryCache.set(redisKey, parsedResult); - } - return parsedResult.result; + return cachedResult.result; } else { this.logger('Missing cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle }); return fetchNew();