Skip to content

feat(query-orchestrator): QueryCache - improve usage of in-memory cache #9838

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Cube is a semantic layer for building data applications. This is a monorepo cont

## Development Commands

**Note: This project uses Yarn as the package manager.**
**Note: This project uses Yarn as the package manager. Node.js v22.15.0 is recommended.**

### Core Build Commands
```bash
Expand Down
225 changes: 136 additions & 89 deletions packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,55 @@ export type CacheKey =
[CacheKeyItem, CacheKeyItem, CacheKeyItem] |
[CacheKeyItem, CacheKeyItem, CacheKeyItem, CacheKeyItem];

/**
 * Options bag accepted by `QueryCache.queryWithRetryAndRelease`.
 */
export type QueryWithRetryAndReleaseOptions = {
  /** Key under which the query is queued (forwarded as `queryKey`). */
  cacheKey: CacheKey;
  /** Data source whose queue is used when `external` is false. */
  dataSource: string;
  /** When true the external queue is used instead of the data source queue. */
  external: boolean;
  /** Priority handed to the queue's `executeInQueue` call. */
  priority?: number;
  /** Request identifier forwarded with the queued query definition. */
  requestId?: string;
  spanId?: string;
  inlineTables?: InlineTables;
  useCsvQuery?: boolean;
  lambdaTypes?: TableStructure;
  /** When true the query is queued as a `stream` job rather than a `query` job. */
  persistent?: boolean;
  /** Alias -> member name mapping; used only for streaming. */
  aliasNameToMember?: Record<string, string>;
};

/**
 * Options bag accepted by `QueryCache.cacheQueryResult`.
 *
 * Only `dataSource` is mandatory; every other field tunes caching,
 * renewal or queueing behaviour of a single call.
 */
export type CacheQueryResultOptions = {
  /** Data source the query runs against. */
  dataSource: string;

  // --- renewal -------------------------------------------------------------
  /** Max age, in seconds, before a cached entry is considered due for renewal. */
  renewalThreshold?: number;
  /** Raw renewal key; it is hashed with `queryRedisKey` before being compared. */
  renewalKey?: any;
  /** NOTE(review): presumably makes the caller wait for the renewed value - confirm in `cacheQueryResult`. */
  waitForRenew?: boolean;

  // --- cache usage ---------------------------------------------------------
  forceNoCache?: boolean;
  /** Enables the in-process LRU lookup in front of the cache driver. */
  useInMemory?: boolean;

  // --- execution -----------------------------------------------------------
  priority?: number;
  external?: boolean;
  requestId?: string;
  useCsvQuery?: boolean;
  lambdaTypes?: TableStructure;
  persistent?: boolean;

  // --- logging context -----------------------------------------------------
  primaryQuery?: boolean;
  renewCycle?: boolean;
};

/**
 * Shape of a single cached query result as kept in the in-memory LRU
 * and returned by the cache driver.
 */
type CacheEntry = {
  /** Epoch milliseconds at which the entry was produced; ages are computed against it. */
  time: number;
  /** The cached query result payload. */
  result: any;
  /** Renewal key the entry was produced with; compared against the current one on lookup. */
  renewalKey: string;
};

/**
 * Parameters for the cache lookup helpers (`checkInCache` / `checkInMemoryCache`).
 */
type CheckCacheOptions = {
  /** Cache key of the query; only used as logging context here. */
  cacheKey: CacheKey;
  /** Toggles the in-memory LRU lookup and write-back. */
  useInMemory: boolean;

  /** Hashed renewal key expected for a still-valid entry. */
  renewalKey: string;
  /** Max entry age in seconds before it must be renewed. */
  renewalThreshold: number;
  /** Hard expiration of an entry, in seconds. */
  expiration: number;

  // Logging context only.
  requestId: string;
  spanId: string;
  primaryQuery: boolean;
  renewCycle: boolean;
};

export interface QueryCacheOptions {
refreshKeyRenewalThreshold?: number;
externalQueueOptions?: any;
Expand Down Expand Up @@ -153,7 +196,9 @@ export class QueryCache {
}

this.memoryCache = new LRUCache<string, CacheEntry>({
max: options.maxInMemoryCacheEntries || 10000
max: options.maxInMemoryCacheEntries || 10000,
allowStale: false,
updateAgeOnGet: false,
});
}

Expand Down Expand Up @@ -416,32 +461,22 @@ export class QueryCache {
lambdaTypes,
persistent,
aliasNameToMember,
}: {
cacheKey: CacheKey,
dataSource: string,
external: boolean,
priority?: number,
requestId?: string,
spanId?: string,
inlineTables?: InlineTables,
useCsvQuery?: boolean,
lambdaTypes?: TableStructure,
persistent?: boolean,
aliasNameToMember?: { [alias: string]: string },
}
}: QueryWithRetryAndReleaseOptions
) {
const queue = external
? this.getExternalQueue()
: await this.getQueue(dataSource);

const _query = {
const queryDef = {
queryKey: cacheKey,
query,
values,
requestId,
inlineTables,
useCsvQuery,
lambdaTypes,
// Used only for streaming
aliasNameToMember
};

const opt = {
Expand All @@ -451,12 +486,9 @@ export class QueryCache {
};

if (!persistent) {
return queue.executeInQueue('query', cacheKey as QueryKey, _query, priority, opt);
return queue.executeInQueue('query', cacheKey as QueryKey, queryDef, priority, opt);
} else {
return queue.executeInQueue('stream', cacheKey as QueryKey, {
..._query,
aliasNameToMember,
}, priority, opt);
return queue.executeInQueue('stream', cacheKey as QueryKey, queryDef, priority, opt);
}
}

Expand Down Expand Up @@ -837,32 +869,78 @@ export class QueryCache {
callback: () => MaybeCancelablePromise<T>,
) => this.cacheDriver.withLock(`lock:${key}`, callback, ttl, true);

/**
 * Looks a query result up, first in the in-memory LRU (when
 * `opts.useInMemory` is set) and then in the cache driver.
 *
 * A hit coming from the cache driver is copied into the in-memory LRU so
 * the next lookup can be served without a driver round trip. Misses are
 * never written to the LRU.
 *
 * @param redisKey Key of the entry in both the LRU and the cache driver.
 * @param opts Lookup parameters; see `CheckCacheOptions`.
 * @returns The cached entry, or whatever falsy value the driver returned on a miss.
 */
protected async checkInCache(
  redisKey: string,
  opts: CheckCacheOptions
): Promise<any> {
  if (opts.useInMemory) {
    const inMemoryResult = this.checkInMemoryCache(redisKey, opts);
    if (inMemoryResult) {
      return inMemoryResult;
    }
  }

  const cachedResult = await this.cacheDriver.get(redisKey);

  // Only remember real hits: storing a null/undefined miss would just
  // occupy (or evict) an LRU slot without ever being usable.
  if (opts.useInMemory && cachedResult) {
    const ttl = opts.renewalThreshold * 1000;

    if (Number.isFinite(ttl) && ttl > 0) {
      this.memoryCache.set(redisKey, cachedResult, { ttl });
    } else {
      // No usable renewal threshold (missing or zero): do not hand a
      // NaN/0 TTL to the LRU. Freshness is still enforced on read by
      // checkInMemoryCache().
      this.memoryCache.set(redisKey, cachedResult);
    }
  }

  return cachedResult;
}

/**
 * Returns the in-memory entry for `redisKey` if it is still usable,
 * otherwise evicts it and returns `null`.
 *
 * An entry is rejected (and deleted) when any of the following holds:
 *  - it carries no usable `time`, so its age cannot be computed;
 *  - a renewal key is requested and either no renewal threshold is set,
 *    the entry is older than the threshold, or its renewal key differs;
 *  - the entry is older than `opts.expiration` seconds.
 *
 * @param redisKey Key of the entry in the in-memory LRU.
 * @param opts Lookup parameters; thresholds are expressed in seconds.
 * @returns The cached entry, or `null` when absent or stale.
 */
protected checkInMemoryCache(
  redisKey: string,
  opts: CheckCacheOptions
): any {
  const inMemoryValue = this.memoryCache.get(redisKey);
  if (!inMemoryValue) {
    return null;
  }

  const renewedAgo = Date.now() - inMemoryValue.time;

  // Without a timestamp the age is NaN and every `>` comparison below
  // would be false, i.e. the entry would look fresh forever.
  const ageUnknown = !inMemoryValue.time || !Number.isFinite(renewedAgo);

  const needsRenewal = Boolean(opts.renewalKey) && (
    !opts.renewalThreshold ||
    renewedAgo > opts.renewalThreshold * 1000 ||
    inMemoryValue.renewalKey !== opts.renewalKey
  );

  const expired = renewedAgo > opts.expiration * 1000;

  if (ageUnknown || needsRenewal || expired) {
    this.memoryCache.delete(redisKey);
    return null;
  }

  this.logger('Found in memory cache entry', {
    cacheKey: opts.cacheKey,
    time: inMemoryValue.time,
    renewedAgo,
    renewalKey: inMemoryValue.renewalKey,
    newRenewalKey: opts.renewalKey,
    renewalThreshold: opts.renewalThreshold,
    requestId: opts.requestId,
    spanId: opts.spanId,
    primaryQuery: opts.primaryQuery,
    renewCycle: opts.renewCycle
  });

  return inMemoryValue;
}

public async cacheQueryResult(
query: string | QueryWithParams,
values: string[],
cacheKey: CacheKey,
expiration: number,
options: {
renewalThreshold?: number,
renewalKey?: any,
priority?: number,
external?: boolean,
requestId?: string,
dataSource: string,
waitForRenew?: boolean,
forceNoCache?: boolean,
useInMemory?: boolean,
useCsvQuery?: boolean,
lambdaTypes?: TableStructure,
persistent?: boolean,
primaryQuery?: boolean,
renewCycle?: boolean,
}
options: CacheQueryResultOptions
) {
const spanId = crypto.randomBytes(16).toString('hex');
options = options || { dataSource: 'default' };

const { renewalThreshold, primaryQuery, renewCycle } = options;
const renewalKey = options.renewalKey && this.queryRedisKey(options.renewalKey);

const redisKey = this.queryRedisKey(cacheKey);
const fetchNew = () => (
this.queryWithRetryAndRelease(query, values, {
Expand Down Expand Up @@ -922,56 +1000,27 @@ export class QueryCache {
return fetchNew();
}

let res;

const inMemoryCacheDisablePeriod = 5 * 60 * 1000;

if (options.useInMemory) {
const inMemoryValue = this.memoryCache.get(redisKey);
if (inMemoryValue) {
const renewedAgo = (new Date()).getTime() - inMemoryValue.time;

if (
renewalKey && (
!renewalThreshold ||
!inMemoryValue.time ||
// Do not cache in memory in last 5 minutes of expiry.
// Most likely it'll cause race condition of refreshing data with different refreshKey values.
renewedAgo + inMemoryCacheDisablePeriod > renewalThreshold * 1000 ||
inMemoryValue.renewalKey !== renewalKey
) || renewedAgo > expiration * 1000 || renewedAgo > inMemoryCacheDisablePeriod
) {
this.memoryCache.delete(redisKey);
} else {
this.logger('Found in memory cache entry', {
cacheKey,
time: inMemoryValue.time,
renewedAgo,
renewalKey: inMemoryValue.renewalKey,
newRenewalKey: renewalKey,
renewalThreshold,
requestId: options.requestId,
spanId,
primaryQuery,
renewCycle
});
res = inMemoryValue;
}
const cachedResult = await this.checkInCache(
redisKey,
{
requestId: options.requestId,
useInMemory: options.useInMemory,
renewalKey,
renewalThreshold,
expiration,
spanId,
cacheKey,
primaryQuery,
renewCycle
}
}

if (!res) {
res = await this.cacheDriver.get(redisKey);
}

if (res) {
const parsedResult = res;
const renewedAgo = (new Date()).getTime() - parsedResult.time;
);
if (cachedResult) {
const renewedAgo = (new Date()).getTime() - cachedResult.time;
this.logger('Found cache entry', {
cacheKey,
time: parsedResult.time,
time: cachedResult.time,
renewedAgo,
renewalKey: parsedResult.renewalKey,
renewalKey: cachedResult.renewalKey,
newRenewalKey: renewalKey,
renewalThreshold,
requestId: options.requestId,
Expand All @@ -982,9 +1031,9 @@ export class QueryCache {
if (
renewalKey && (
!renewalThreshold ||
!parsedResult.time ||
!cachedResult.time ||
renewedAgo > renewalThreshold * 1000 ||
parsedResult.renewalKey !== renewalKey
cachedResult.renewalKey !== renewalKey
)
) {
if (options.waitForRenew) {
Expand All @@ -999,11 +1048,9 @@ export class QueryCache {
});
}
}

this.logger('Using cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle });
if (options.useInMemory && renewedAgo + inMemoryCacheDisablePeriod <= renewalThreshold * 1000) {
this.memoryCache.set(redisKey, parsedResult);
}
return parsedResult.result;
return cachedResult.result;
} else {
this.logger('Missing cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle });
return fetchNew();
Expand Down
Loading