Skip to content

Commit bb98d78

Browse files
committed
Implement background refresh
1 parent a48cf78 commit bb98d78

File tree

3 files changed

+92
-1
lines changed

3 files changed

+92
-1
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,30 @@ export type QueryBody = {
101101
[key: string]: any;
102102
};
103103

104+
/*
105+
106+
stale-if-slow (default) — equivalent to current renewQuery: false
107+
If refresh keys are up-to-date, returns the value from cache
108+
If refresh keys are expired, tries to return the value from the database
109+
Returns fresh value from the database if the query executed in the database until the first “Continue wait” interval is reached
110+
Returns stale value from cache otherwise
111+
112+
stale-while-revalidate — AKA “backgroundRefresh”
113+
If refresh keys are up-to-date, returns the value from cache
114+
If refresh keys are expired, returns stale data from cache
115+
Updates the cache in background
116+
117+
must-revalidate — equivalent to current renewQuery: true
118+
If refresh keys are up-to-date, returns the value from cache
119+
If refresh keys are expired, tries to return the value from the database
120+
Returns fresh value from the database even if it takes minutes and many “Continue wait” intervals
121+
122+
no-cache — AKA “forceRefresh”
123+
Skips refresh key checks
124+
Returns fresh data from the database, even if it takes minutes and many ”Continue wait” intervals
125+
126+
*/
127+
104128
/**
105129
* Temp (partition/lambda) table definition.
106130
*/
@@ -760,6 +784,42 @@ export class QueryCache {
760784
});
761785
}
762786

787+
public async startBackgroundRefreshForQuery(queryBody: QueryBody, preAggregationsTablesToTempTables: PreAggTableToTempTable[]) {
788+
const replacePreAggregationTableNames =
789+
(queryAndParams: string | QueryWithParams) => (
790+
QueryCache.replacePreAggregationTableNames(
791+
queryAndParams,
792+
preAggregationsTablesToTempTables,
793+
)
794+
);
795+
796+
const query = replacePreAggregationTableNames(queryBody.query);
797+
const { values } = queryBody;
798+
799+
const cacheKeyQueries = this
800+
.cacheKeyQueriesFrom(queryBody)
801+
.map(replacePreAggregationTableNames);
802+
803+
const renewalThreshold = queryBody.cacheKeyQueries?.renewalThreshold;
804+
const expireSecs = this.getExpireSecs(queryBody);
805+
const cacheKey = QueryCache.queryCacheKey(queryBody);
806+
807+
this.startRenewCycle(
808+
query,
809+
values,
810+
cacheKeyQueries,
811+
expireSecs,
812+
cacheKey,
813+
renewalThreshold,
814+
{
815+
external: queryBody.external,
816+
requestId: queryBody.requestId,
817+
dataSource: queryBody.dataSource,
818+
persistent: false, // We don't need stream back as there will be no consumer
819+
}
820+
);
821+
}
822+
763823
public renewQuery(
764824
query: string | QueryWithParams,
765825
values: string[],

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,29 @@ export class QueryOrchestrator {
143143
return this.preAggregations;
144144
}
145145

146+
/**
147+
* Starts background refresh cycle for a query.
148+
*/
149+
public async startBackgroundRefresh(queryBody: QueryBody): Promise<void> {
150+
if (!queryBody.query) {
151+
return;
152+
}
153+
154+
const {
155+
preAggregationsTablesToTempTables,
156+
values,
157+
} = await this.preAggregations.loadAllPreAggregationsIfNeeded(queryBody);
158+
159+
if (values) {
160+
queryBody = {
161+
...queryBody,
162+
values
163+
};
164+
}
165+
166+
await this.queryCache.startBackgroundRefreshForQuery(queryBody, preAggregationsTablesToTempTables);
167+
}
168+
146169
/**
147170
* Force reconcile queue logic to be executed.
148171
*/

packages/cubejs-server-core/src/core/OrchestratorApi.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,15 @@ export class OrchestratorApi {
155155
}
156156

157157
if (query.cache === 'stale-while-revalidate' && fromCache) {
158-
// TODO: Run background refresh
158+
// Start background refresh
159+
this.orchestrator.startBackgroundRefresh(query).catch(e => {
160+
this.logger('Error starting background refresh', {
161+
query: queryForLog,
162+
requestId: query.requestId,
163+
error: ((e as Error).stack || e)
164+
});
165+
});
166+
159167
return {
160168
...fromCache,
161169
slowQuery: true

0 commit comments

Comments
 (0)