Skip to content

Commit 9234293

Browse files
authored
chore(query-orchestrator): Extract Pre-aggregation-related classes to separate files (#9274)
* some typos and refactoring * rafactor CacheKey type def * typo * refactor imports in time.ts * just typo * remove unused: redisPrefix from PreAggregationLoadCache class * remove unused: redisPrefix from PreAggregationLoader & PreAggregationPartitionRangeLoader * refactor PreAggregations: move every class to a separate file * refactor PreAggregations: delete moved * code reformat * better naming in utcToLocalTimeZone() * code polishment in PreAggregationLoadCache class * a bit of code polishment in PreAggregationLoader class * attempt to fix CacheKey type
1 parent 4406d98 commit 9234293

File tree

12 files changed

+1861
-1822
lines changed

12 files changed

+1861
-1822
lines changed

packages/cubejs-backend-shared/src/promises.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ export interface CancelableIntervalOptions {
122122
}
123123

124124
/**
125-
* It's helps to create an interval that can be canceled with awaiting latest execution
125+
* It helps to create an interval that can be canceled with awaiting latest execution
126126
*/
127127
export function createCancelableInterval<T>(
128128
fn: (token: CancelToken) => Promise<T>,

packages/cubejs-backend-shared/src/time.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { DateRange, extendMoment } from 'moment-range';
2-
import { unitOfTime } from 'moment-timezone';
1+
import type { unitOfTime } from 'moment-timezone';
2+
import type { DateRange } from 'moment-range';
3+
import Moment from 'moment-timezone';
4+
import { extendMoment } from 'moment-range';
35

4-
const Moment = require('moment-timezone');
5-
6-
const moment = extendMoment(Moment);
6+
const moment = extendMoment(Moment as any);
77

88
export type QueryDateRange = [string, string];
99
type SqlInterval = string;
@@ -236,11 +236,11 @@ export const utcToLocalTimeZone = (timezone: string, timestampFormat: string, ti
236236
const parsedTime = Date.parse(`${timestamp}Z`);
237237
// TODO parsedTime might be incorrect offset for conversion
238238
const offset = zone.utcOffset(parsedTime);
239-
const inDbTimeZoneDate = new Date(parsedTime - offset * 60 * 1000);
239+
const localTimeZoneDate = new Date(parsedTime - offset * 60 * 1000);
240240
if (timestampFormat === 'YYYY-MM-DD[T]HH:mm:ss.SSS[Z]' || timestampFormat === 'YYYY-MM-DDTHH:mm:ss.SSSZ') {
241-
return inDbTimeZoneDate.toJSON();
241+
return localTimeZoneDate.toJSON();
242242
} else if (timestampFormat === 'YYYY-MM-DDTHH:mm:ss.SSS') {
243-
return inDbTimeZoneDate.toJSON().replace('Z', '');
243+
return localTimeZoneDate.toJSON().replace('Z', '');
244244
}
245245
}
246246

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
import { TableStructure } from '@cubejs-backend/base-driver';
2+
import { DriverFactory } from './DriverFactory';
3+
import { QueryCache, QueryTuple, QueryWithParams } from './QueryCache';
4+
import {
5+
PreAggregationDescription,
6+
PreAggregations,
7+
TableCacheEntry,
8+
tablesToVersionEntries,
9+
VersionEntriesObj,
10+
VersionEntry
11+
} from './PreAggregations';
12+
13+
type PreAggregationLoadCacheOptions = {
14+
requestId?: string,
15+
dataSource: string,
16+
tablePrefixes?: string[],
17+
};
18+
19+
export class PreAggregationLoadCache {
20+
private readonly driverFactory: DriverFactory;
21+
22+
private queryCache: QueryCache;
23+
24+
private preAggregations: PreAggregations;
25+
26+
private readonly queryResults: any;
27+
28+
private readonly externalDriverFactory: any;
29+
30+
private readonly requestId: any;
31+
32+
private versionEntries: { [redisKey: string]: Promise<VersionEntriesObj> };
33+
34+
private tables: { [redisKey: string]: TableCacheEntry[] };
35+
36+
private tableColumnTypes: { [cacheKey: string]: { [tableName: string]: TableStructure } };
37+
38+
// TODO this is in memory cache structure as well however it depends on
39+
// data source only and load cache is per data source for now.
40+
// Make it per data source key in case load cache scope is broaden.
41+
private queryStageState: any;
42+
43+
private readonly dataSource: string;
44+
45+
private readonly tablePrefixes: string[] | null;
46+
47+
public constructor(
48+
clientFactory: DriverFactory,
49+
queryCache,
50+
preAggregations,
51+
options: PreAggregationLoadCacheOptions = { dataSource: 'default' }
52+
) {
53+
this.dataSource = options.dataSource;
54+
this.driverFactory = clientFactory;
55+
this.queryCache = queryCache;
56+
this.preAggregations = preAggregations;
57+
this.queryResults = {};
58+
this.externalDriverFactory = preAggregations.externalDriverFactory;
59+
this.requestId = options.requestId;
60+
this.tablePrefixes = options.tablePrefixes;
61+
this.versionEntries = {};
62+
this.tables = {};
63+
this.tableColumnTypes = {};
64+
}
65+
66+
protected async tablesFromCache(preAggregation, forceRenew: boolean = false) {
67+
let tables = forceRenew ? null : await this.queryCache.getCacheDriver().get(this.tablesCachePrefixKey(preAggregation));
68+
if (!tables) {
69+
tables = await this.preAggregations.getLoadCacheQueue(this.dataSource).executeInQueue(
70+
'query',
71+
`Fetch tables for ${preAggregation.preAggregationsSchema}`,
72+
{
73+
preAggregation, requestId: this.requestId
74+
},
75+
0,
76+
{ requestId: this.requestId }
77+
);
78+
}
79+
return tables;
80+
}
81+
82+
public async fetchTables(preAggregation: PreAggregationDescription) {
83+
if (preAggregation.external && !this.externalDriverFactory) {
84+
throw new Error('externalDriverFactory is not provided. Please use CUBEJS_DEV_MODE=true or provide Cube Store connection env variables for production usage.');
85+
}
86+
87+
const newTables = await this.fetchTablesNoCache(preAggregation);
88+
await this.queryCache.getCacheDriver().set(
89+
this.tablesCachePrefixKey(preAggregation),
90+
newTables,
91+
this.preAggregations.options.preAggregationsSchemaCacheExpire || 60 * 60
92+
);
93+
return newTables;
94+
}
95+
96+
private async fetchTablesNoCache(preAggregation: PreAggregationDescription) {
97+
const client = preAggregation.external ?
98+
await this.externalDriverFactory() :
99+
await this.driverFactory();
100+
if (this.tablePrefixes && client.getPrefixTablesQuery && this.preAggregations.options.skipExternalCacheAndQueue) {
101+
return client.getPrefixTablesQuery(preAggregation.preAggregationsSchema, this.tablePrefixes);
102+
}
103+
return client.getTablesQuery(preAggregation.preAggregationsSchema);
104+
}
105+
106+
public tablesCachePrefixKey(preAggregation: PreAggregationDescription) {
107+
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES', `${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`);
108+
}
109+
110+
protected async getTablesQuery(preAggregation) {
111+
const redisKey = this.tablesCachePrefixKey(preAggregation);
112+
if (!this.tables[redisKey]) {
113+
const tables = this.preAggregations.options.skipExternalCacheAndQueue && preAggregation.external ?
114+
await this.fetchTablesNoCache(preAggregation) :
115+
await this.tablesFromCache(preAggregation);
116+
if (tables === undefined) {
117+
throw new Error('Pre-aggregation tables are undefined.');
118+
}
119+
this.tables[redisKey] = tables;
120+
}
121+
return this.tables[redisKey];
122+
}
123+
124+
public async getTableColumnTypes(preAggregation: PreAggregationDescription, tableName: string): Promise<TableStructure> {
125+
const prefixKey = this.tablesCachePrefixKey(preAggregation);
126+
if (!this.tableColumnTypes[prefixKey]?.[tableName]) {
127+
if (!this.preAggregations.options.skipExternalCacheAndQueue && preAggregation.external) {
128+
throw new Error(`Lambda union with source data feature is supported only by external rollups stored in Cube Store but was invoked for '${preAggregation.preAggregationId}'`);
129+
}
130+
const client = await this.externalDriverFactory();
131+
const columnTypes = await client.tableColumnTypes(tableName);
132+
if (!this.tableColumnTypes[prefixKey]) {
133+
this.tableColumnTypes[prefixKey] = {};
134+
}
135+
this.tableColumnTypes[prefixKey][tableName] = columnTypes;
136+
}
137+
return this.tableColumnTypes[prefixKey][tableName];
138+
}
139+
140+
private async calculateVersionEntries(preAggregation): Promise<VersionEntriesObj> {
141+
let versionEntries = tablesToVersionEntries(
142+
preAggregation.preAggregationsSchema,
143+
await this.getTablesQuery(preAggregation)
144+
);
145+
// It presumes strong consistency guarantees for external pre-aggregation tables ingestion
146+
if (!preAggregation.external) {
147+
const [,, queries] = await this.fetchQueryStageState();
148+
const targetTableNamesInQueue = (Object.keys(queries))
149+
.map(q => PreAggregations.targetTableName(queries[q].query.newVersionEntry));
150+
151+
versionEntries = versionEntries.filter(
152+
e => targetTableNamesInQueue.indexOf(PreAggregations.targetTableName(e)) === -1
153+
);
154+
}
155+
156+
const byContent: { [key: string]: VersionEntry } = {};
157+
const byStructure: { [key: string]: VersionEntry } = {};
158+
const byTableName: { [key: string]: VersionEntry } = {};
159+
160+
versionEntries.forEach(e => {
161+
const contentKey = `${e.table_name}_${e.content_version}`;
162+
if (!byContent[contentKey]) {
163+
byContent[contentKey] = e;
164+
}
165+
const structureKey = `${e.table_name}_${e.structure_version}`;
166+
if (!byStructure[structureKey]) {
167+
byStructure[structureKey] = e;
168+
}
169+
if (!byTableName[e.table_name]) {
170+
byTableName[e.table_name] = e;
171+
}
172+
});
173+
174+
return { versionEntries, byContent, byStructure, byTableName };
175+
}
176+
177+
public async getVersionEntries(preAggregation): Promise<VersionEntriesObj> {
178+
if (this.tablePrefixes && !this.tablePrefixes.find(p => preAggregation.tableName.split('.')[1].startsWith(p))) {
179+
throw new Error(`Load cache tries to load table ${preAggregation.tableName} outside of tablePrefixes filter: ${this.tablePrefixes.join(', ')}`);
180+
}
181+
const redisKey = this.tablesCachePrefixKey(preAggregation);
182+
if (!this.versionEntries[redisKey]) {
183+
this.versionEntries[redisKey] = this.calculateVersionEntries(preAggregation).catch(e => {
184+
delete this.versionEntries[redisKey];
185+
throw e;
186+
});
187+
}
188+
return this.versionEntries[redisKey];
189+
}
190+
191+
public async keyQueryResult(sqlQuery: QueryWithParams, waitForRenew: boolean, priority: number) {
192+
const [query, values, queryOptions]: QueryTuple = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}];
193+
194+
if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) {
195+
this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult(
196+
query,
197+
<string[]>values,
198+
[query, <string[]>values],
199+
60 * 60,
200+
{
201+
renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold
202+
|| queryOptions?.renewalThreshold || 2 * 60,
203+
renewalKey: [query, values],
204+
waitForRenew,
205+
priority,
206+
requestId: this.requestId,
207+
dataSource: this.dataSource,
208+
useInMemory: true,
209+
external: queryOptions?.external
210+
}
211+
);
212+
}
213+
return this.queryResults[this.queryCache.queryRedisKey([query, values])];
214+
}
215+
216+
public hasKeyQueryResult(keyQuery) {
217+
return !!this.queryResults[this.queryCache.queryRedisKey(keyQuery)];
218+
}
219+
220+
public async getQueryStage(stageQueryKey) {
221+
const queue = await this.preAggregations.getQueue(this.dataSource);
222+
await this.fetchQueryStageState(queue);
223+
return queue.getQueryStage(stageQueryKey, undefined, this.queryStageState);
224+
}
225+
226+
protected async fetchQueryStageState(queue?) {
227+
queue = queue || await this.preAggregations.getQueue(this.dataSource);
228+
if (!this.queryStageState) {
229+
this.queryStageState = await queue.fetchQueryStageState();
230+
}
231+
return this.queryStageState;
232+
}
233+
234+
public async reset(preAggregation) {
235+
await this.tablesFromCache(preAggregation, true);
236+
this.tables = {};
237+
this.tableColumnTypes = {};
238+
this.queryStageState = undefined;
239+
this.versionEntries = {};
240+
}
241+
}

0 commit comments

Comments
 (0)