Skip to content

Commit 2c625cf

Browse files
committed
Implement background refresh
1 parent a48cf78 commit 2c625cf

File tree

3 files changed

+68
-1
lines changed

3 files changed

+68
-1
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,42 @@ export class QueryCache {
760760
});
761761
}
762762

763+
public async startBackgroundRefreshForQuery(queryBody: QueryBody, preAggregationsTablesToTempTables: PreAggTableToTempTable[]) {
764+
const replacePreAggregationTableNames =
765+
(queryAndParams: string | QueryWithParams) => (
766+
QueryCache.replacePreAggregationTableNames(
767+
queryAndParams,
768+
preAggregationsTablesToTempTables,
769+
)
770+
);
771+
772+
const query = replacePreAggregationTableNames(queryBody.query);
773+
const { values } = queryBody;
774+
775+
const cacheKeyQueries = this
776+
.cacheKeyQueriesFrom(queryBody)
777+
.map(replacePreAggregationTableNames);
778+
779+
const renewalThreshold = queryBody.cacheKeyQueries?.renewalThreshold;
780+
const expireSecs = this.getExpireSecs(queryBody);
781+
const cacheKey = QueryCache.queryCacheKey(queryBody);
782+
783+
this.startRenewCycle(
784+
query,
785+
values,
786+
cacheKeyQueries,
787+
expireSecs,
788+
cacheKey,
789+
renewalThreshold,
790+
{
791+
external: queryBody.external,
792+
requestId: queryBody.requestId,
793+
dataSource: queryBody.dataSource,
794+
persistent: false, // We don't need stream back as there will be no consumer
795+
}
796+
);
797+
}
798+
763799
public renewQuery(
764800
query: string | QueryWithParams,
765801
values: string[],

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,29 @@ export class QueryOrchestrator {
143143
return this.preAggregations;
144144
}
145145

146+
/**
147+
* Starts background refresh cycle for a query.
148+
*/
149+
public async startBackgroundRefresh(queryBody: QueryBody): Promise<void> {
150+
if (!queryBody.query) {
151+
return;
152+
}
153+
154+
const {
155+
preAggregationsTablesToTempTables,
156+
values,
157+
} = await this.preAggregations.loadAllPreAggregationsIfNeeded(queryBody);
158+
159+
if (values) {
160+
queryBody = {
161+
...queryBody,
162+
values
163+
};
164+
}
165+
166+
await this.queryCache.startBackgroundRefreshForQuery(queryBody, preAggregationsTablesToTempTables);
167+
}
168+
146169
/**
147170
* Force reconcile queue logic to be executed.
148171
*/

packages/cubejs-server-core/src/core/OrchestratorApi.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,15 @@ export class OrchestratorApi {
155155
}
156156

157157
if (query.cache === 'stale-while-revalidate' && fromCache) {
158-
// TODO: Run background refresh
158+
// Start background refresh
159+
this.orchestrator.startBackgroundRefresh(query).catch(e => {
160+
this.logger('Error starting background refresh', {
161+
query: queryForLog,
162+
requestId: query.requestId,
163+
error: ((e as Error).stack || e)
164+
});
165+
});
166+
159167
return {
160168
...fromCache,
161169
slowQuery: true

0 commit comments

Comments
 (0)