From f21f1bccc11c4286632a85bfe89a14adfb854df3 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 23 Jul 2025 14:19:13 +0200 Subject: [PATCH 1/3] fix(query-orchestrator): Reduce number of refresh key queries --- .../cubejs-backend-shared/src/promises.ts | 2 +- .../src/orchestrator/QueryCache.ts | 53 ++++++++++--------- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/packages/cubejs-backend-shared/src/promises.ts b/packages/cubejs-backend-shared/src/promises.ts index d458296060554..c3c4711b3b22c 100644 --- a/packages/cubejs-backend-shared/src/promises.ts +++ b/packages/cubejs-backend-shared/src/promises.ts @@ -265,7 +265,7 @@ export const retryWithTimeout = ( ); /** - * High order function that makes to debounce multi async calls to single call at one time + * Debunks multiple asynchronous calls at once. */ export const asyncDebounce = ( fn: (...args: Arguments[]) => Promise, diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index d24bc065663be..6d6cfc968c9d5 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -2,7 +2,7 @@ import crypto from 'crypto'; import csvWriter from 'csv-write-stream'; import { LRUCache } from 'lru-cache'; import { pipeline } from 'stream'; -import { getEnv, MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared'; +import {asyncDebounce, getEnv, MaybeCancelablePromise, streamToArray} from '@cubejs-backend/shared'; import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import { BaseDriver, @@ -34,6 +34,12 @@ export type QueryWithParams = [ options?: QueryOptions ]; +export type LoadRefreshKeyOptions = { + requestId?: string; + skipRefreshKeyWaitForRenew?: boolean; + dataSource: string +}; + export type Query = { requestId?: string; dataSource: string; @@ -771,32 +777,31 @@ export class QueryCache { public loadRefreshKeys( cacheKeyQueries: QueryWithParams[], expireSecs: number, - options: { - requestId?: string; - skipRefreshKeyWaitForRenew?: boolean; - dataSource: string - } + options: LoadRefreshKeyOptions ) { - return cacheKeyQueries.map((q) => { - const [query, values, queryOptions]: QueryWithParams = Array.isArray(q) ? q : [q, [], {}]; - return this.cacheQueryResult( - query, - values, - [query, values], - expireSecs, - { - renewalThreshold: this.options.refreshKeyRenewalThreshold || queryOptions?.renewalThreshold || 2 * 60, - renewalKey: q, - waitForRenew: !options.skipRefreshKeyWaitForRenew, - requestId: options.requestId, - dataSource: options.dataSource, - useInMemory: true, - external: queryOptions?.external, - }, - ); - }); + return cacheKeyQueries.map((q) => this.loadRefreshKey(q, expireSecs, options)); } + public loadRefreshKey = asyncDebounce(async (q: QueryWithParams, expireSecs: number, options: LoadRefreshKeyOptions) => { + const [query, values, queryOptions]: QueryWithParams = Array.isArray(q) ? q : [q, [], {}]; + + return this.cacheQueryResult( + query, + values, + [query, values], + expireSecs, + { + renewalThreshold: this.options.refreshKeyRenewalThreshold || queryOptions?.renewalThreshold || 2 * 60, + renewalKey: q, + waitForRenew: !options.skipRefreshKeyWaitForRenew, + requestId: options.requestId, + dataSource: options.dataSource, + useInMemory: true, + external: queryOptions?.external, + }, + ); + }); + public withLock = ( key: string, ttl: number, From bdd29cf3c04630cb40fb6ef19e206352a6cabc17 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 23 Jul 2025 16:13:17 +0200 Subject: [PATCH 2/3] Update packages/cubejs-backend-shared/src/promises.ts Co-authored-by: Konstantin Burkalev --- packages/cubejs-backend-shared/src/promises.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-backend-shared/src/promises.ts b/packages/cubejs-backend-shared/src/promises.ts index c3c4711b3b22c..5dd71d729fc7b 100644 --- a/packages/cubejs-backend-shared/src/promises.ts +++ b/packages/cubejs-backend-shared/src/promises.ts @@ -265,7 +265,7 @@ export const retryWithTimeout = ( ); /** - * Debunks multiple asynchronous calls at once. + * Creates a debounced version of an asynchronous function. */ export const asyncDebounce = ( fn: (...args: Arguments[]) => Promise, From 9e9fe40b0f980c0e3ade34459ec9c940a93fa0e0 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 23 Jul 2025 16:16:07 +0200 Subject: [PATCH 3/3] chore: fix --- .../cubejs-query-orchestrator/src/orchestrator/QueryCache.ts | 2 +- .../test/integration/postgres/pre-aggregations.test.ts | 4 ++-- packages/cubejs-testing/test/smoke-cubesql.test.ts | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 6d6cfc968c9d5..035c8ea40adc1 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -2,7 +2,7 @@ import crypto from 'crypto'; import csvWriter from 'csv-write-stream'; import { LRUCache } from 'lru-cache'; import { pipeline } from 'stream'; -import {asyncDebounce, getEnv, MaybeCancelablePromise, streamToArray} from '@cubejs-backend/shared'; +import { asyncDebounce, getEnv, MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared'; import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import { BaseDriver, diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts index 81ea1daee1e3e..f793fa86b02f1 100644 --- a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts +++ b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts @@ -1922,7 +1922,7 @@ describe('PreAggregations', () => { }, { id: 'visitors.source' }], - cubestoreSupportMultistage: getEnv("nativeSqlPlanner") + cubestoreSupportMultistage: getEnv('nativeSqlPlanner') }); const queryAndParams = query.buildSqlAndParams(); @@ -2000,7 +2000,7 @@ describe('PreAggregations', () => { }, { id: 'visitors.source' }], - cubestoreSupportMultistage: getEnv("nativeSqlPlanner") + cubestoreSupportMultistage: getEnv('nativeSqlPlanner') }); const queryAndParams = query.buildSqlAndParams(); diff --git a/packages/cubejs-testing/test/smoke-cubesql.test.ts b/packages/cubejs-testing/test/smoke-cubesql.test.ts index 9b50ae6b531d1..49533434f697e 100644 --- a/packages/cubejs-testing/test/smoke-cubesql.test.ts +++ b/packages/cubejs-testing/test/smoke-cubesql.test.ts @@ -156,7 +156,7 @@ describe('SQL API', () => { Authorization: token, }, body: JSON.stringify({ - query: `SELECT orderDate FROM ECommerce LIMIT 0;`, + query: 'SELECT orderDate FROM ECommerce LIMIT 0;', }), });