Skip to content

Commit ab9539a

Browse files
committed
feat: Allow persisting multiple pre-aggregation structure versions to support staging pre-aggregation warm-up environments and multiple timezones
1 parent 32e45e2 commit ab9539a

File tree

5 files changed

+413
-45
lines changed

5 files changed

+413
-45
lines changed

packages/cubejs-query-orchestrator/driver/BaseDriver.js

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const { cancelCombinator } = require('./utils');
44
const sortByKeys = (unordered) => {
55
const ordered = {};
66

7-
Object.keys(unordered).sort().forEach(function(key) {
7+
Object.keys(unordered).sort().forEach((key) => {
88
ordered[key] = unordered[key];
99
});
1010

@@ -13,16 +13,16 @@ const sortByKeys = (unordered) => {
1313

1414
const DbTypeToGenericType = {
1515
'timestamp without time zone': 'timestamp',
16-
'integer': 'int',
16+
integer: 'int',
1717
'character varying': 'text',
18-
'varchar': 'text',
19-
'text': 'text',
20-
'string': 'text',
21-
'boolean': 'boolean',
22-
'bigint': 'bigint',
23-
'time': 'string',
24-
'datetime': 'timestamp',
25-
'date': 'date',
18+
varchar: 'text',
19+
text: 'text',
20+
string: 'text',
21+
boolean: 'boolean',
22+
bigint: 'bigint',
23+
time: 'string',
24+
datetime: 'timestamp',
25+
date: 'date',
2626
'double precision': 'decimal'
2727
};
2828

@@ -59,7 +59,7 @@ class BaseDriver {
5959

6060
const reduceCb = (result, i) => {
6161
let schema = (result[i.table_schema] || {});
62-
let tables = (schema[i.table_name] || []);
62+
const tables = (schema[i.table_name] || []);
6363

6464
tables.push({ name: i.column_name, type: i.data_type, attributes: i.key_type ? ['primaryKey'] : [] });
6565

@@ -79,10 +79,11 @@ class BaseDriver {
7979
`SELECT schema_name FROM information_schema.schemata WHERE schema_name = ${this.param(0)}`,
8080
[schemaName]
8181
).then((schemas) => {
82-
if (schemas.length === 0) {
83-
return this.query("CREATE SCHEMA IF NOT EXISTS " + schemaName);
84-
}
85-
});
82+
if (schemas.length === 0) {
83+
return this.query(`CREATE SCHEMA IF NOT EXISTS ${schemaName}`);
84+
}
85+
return null;
86+
});
8687
}
8788

8889
getTablesQuery(schemaName) {
@@ -132,6 +133,7 @@ class BaseDriver {
132133
}
133134
}
134135

136+
// eslint-disable-next-line no-unused-vars
135137
toColumnValue(value, genericType) {
136138
return value;
137139
}

packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,35 @@ function version(cacheKey) {
1717
for (let i = 0; i < 5; i++) {
1818
const byte = digestBuffer.readUInt8(i);
1919
shiftCounter += 8;
20+
// eslint-disable-next-line operator-assignment,no-bitwise
2021
residue = (byte << (shiftCounter - 8)) | residue;
22+
// eslint-disable-next-line no-bitwise
2123
while (residue >> 5) {
2224
result += hashCharset.charAt(residue % 32);
2325
shiftCounter -= 5;
26+
// eslint-disable-next-line operator-assignment,no-bitwise
2427
residue = residue >> 5;
2528
}
2629
}
2730
result += hashCharset.charAt(residue % 32);
2831
return result;
2932
}
3033

31-
const tablesToVersionEntries = (schema, tables) => {
32-
return R.sortBy(
33-
table => -table.last_updated_at,
34-
tables.map(table => {
35-
const match = (table.table_name || table.TABLE_NAME).match(/(.+)_(.+)_(.+)_(.+)/);
36-
if (match) {
37-
return {
38-
table_name: `${schema}.${match[1]}`,
39-
content_version: match[2],
40-
structure_version: match[3],
41-
last_updated_at: parseInt(match[4], 10)
42-
}
43-
}
44-
}).filter(R.identity)
45-
)
46-
};
34+
const tablesToVersionEntries = (schema, tables) => R.sortBy(
35+
table => -table.last_updated_at,
36+
tables.map(table => {
37+
const match = (table.table_name || table.TABLE_NAME).match(/(.+)_(.+)_(.+)_(.+)/);
38+
if (match) {
39+
return {
40+
table_name: `${schema}.${match[1]}`,
41+
content_version: match[2],
42+
structure_version: match[3],
43+
last_updated_at: parseInt(match[4], 10)
44+
};
45+
}
46+
return null;
47+
}).filter(R.identity)
48+
);
4749

4850
class PreAggregationLoadCache {
4951
constructor(redisPrefix, clientFactory, queryCache, preAggregations, options) {
@@ -177,6 +179,7 @@ class PreAggregationLoader {
177179
this.waitForRenew = options.waitForRenew;
178180
this.externalDriverFactory = preAggregations.externalDriverFactory;
179181
this.requestId = options.requestId;
182+
this.structureVersionPersistTime = preAggregations.structureVersionPersistTime;
180183
}
181184

182185
async loadPreAggregation() {
@@ -225,7 +228,7 @@ class PreAggregationLoader {
225228

226229
const versionEntries = await this.loadCache.getVersionEntries(this.preAggregation);
227230

228-
const getVersionEntryByContentVersion = (versionEntries) => versionEntries.find(
231+
const getVersionEntryByContentVersion = (entries) => entries.find(
229232
v => v.table_name === this.preAggregation.tableName && v.content_version === contentVersion
230233
);
231234

@@ -237,6 +240,7 @@ class PreAggregationLoader {
237240
// TODO this check can be redundant due to structure version is already checked in loadPreAggregation()
238241
if (
239242
!this.waitForRenew &&
243+
// eslint-disable-next-line no-use-before-define
240244
await this.loadCache.getQueryStage(PreAggregations.preAggregationQueryCacheKey(this.preAggregation))
241245
) {
242246
const versionEntryByStructureVersion = versionEntries.find(
@@ -253,7 +257,8 @@ class PreAggregationLoader {
253257
await this.driverFactory();
254258
await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema);
255259
}
256-
const versionEntry = versionEntries.find(e => e.table_name === this.preAggregation.tableName); // TODO can be array instead of last
260+
// TODO can be array instead of last
261+
const versionEntry = versionEntries.find(e => e.table_name === this.preAggregation.tableName);
257262
const newVersionEntry = {
258263
table_name: this.preAggregation.tableName,
259264
structure_version: structureVersion,
@@ -363,6 +368,7 @@ class PreAggregationLoader {
363368
requestId: this.requestId
364369
},
365370
priority,
371+
// eslint-disable-next-line no-use-before-define
366372
{ stageQueryKey: PreAggregations.preAggregationQueryCacheKey(this.preAggregation), requestId: this.requestId }
367373
);
368374
}
@@ -526,8 +532,17 @@ class PreAggregationLoader {
526532
R.toPairs,
527533
R.map(p => p[1][0])
528534
)(versionEntries);
535+
536+
const structureVersionsToSave = R.pipe(
537+
R.filter(v => new Date().getTime() - v.last_updated_at < this.structureVersionPersistTime * 1000),
538+
R.groupBy(v => `${v.table_name}_${v.structure_version}`),
539+
R.toPairs,
540+
R.map(p => p[1][0])
541+
)(versionEntries);
542+
529543
const tablesToSave =
530544
(await this.preAggregations.tablesUsed())
545+
.concat(structureVersionsToSave.map(v => this.targetTableName(v)))
531546
.concat(versionEntriesToSave.map(v => this.targetTableName(v)))
532547
.concat([justCreatedTable]);
533548
const toDrop = actualTables
@@ -552,14 +567,16 @@ class PreAggregations {
552567
new RedisCacheDriver({ pool: options.redisPool }) :
553568
new LocalCacheDriver();
554569
this.externalDriverFactory = options.externalDriverFactory;
570+
this.structureVersionPersistTime = options.structureVersionPersistTime || 60 * 60 * 24 * 30;
571+
this.usedTablePersistTime = options.usedTablePersistTime || 600;
555572
}
556573

557574
tablesUsedRedisKey(tableName) {
558575
return `SQL_PRE_AGGREGATIONS_${this.redisPrefix}_TABLES_USED_${tableName}`;
559576
}
560577

561578
async addTableUsed(tableName) {
562-
return this.cacheDriver.set(this.tablesUsedRedisKey(tableName), true, 600);
579+
return this.cacheDriver.set(this.tablesUsedRedisKey(tableName), true, this.usedTablePersistTime);
563580
}
564581

565582
async tablesUsed() {

packages/cubejs-query-orchestrator/package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
"node": ">=8.11.1"
1313
},
1414
"scripts": {
15-
"test": "jest --runInBand --verbose"
15+
"test": "jest --runInBand --verbose",
16+
"lint": "eslint orchestrator/*.js driver/*.js"
1617
},
1718
"dependencies": {
1819
"generic-pool": "^3.7.1",
@@ -21,6 +22,8 @@
2122
},
2223
"devDependencies": {
2324
"eslint": "^6.8.0",
25+
"eslint-config-airbnb-base": "^13.1.0",
26+
"eslint-plugin-import": "^2.16.0",
2427
"eslint-plugin-node": "^5.2.1",
2528
"jest": "^25.1.0"
2629
},

packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class MockDriver {
3535
}
3636

3737
async dropTable(tableName) {
38-
this.tables = this.tables.filter(t => t !== tableName.split('.')[1]);
38+
this.tables = this.tables.filter(t => t !== tableName);
3939
return this.query(`DROP TABLE ${tableName}`);
4040
}
4141
}
@@ -49,7 +49,8 @@ describe('QueryOrchestrator', () => {
4949
preAggregationsOptions: {
5050
queueOptions: {
5151
executionTimeout: 1
52-
}
52+
},
53+
usedTablePersistTime: 1
5354
}
5455
}
5556
);
@@ -162,4 +163,64 @@ describe('QueryOrchestrator', () => {
162163
}
163164
expect(mockDriver.cancelledQueries[0]).toMatch(/orders_too_big/);
164165
});
166+
167+
test('save structure versions', async () => {
168+
mockDriver.tables = [];
169+
await queryOrchestrator.fetchQuery({
170+
query: `SELECT * FROM stb_pre_aggregations.orders`,
171+
values: [],
172+
cacheKeyQueries: {
173+
renewalThreshold: 21600,
174+
queries: []
175+
},
176+
preAggregations: [{
177+
preAggregationsSchema: "stb_pre_aggregations",
178+
tableName: "stb_pre_aggregations.orders",
179+
loadSql: ["CREATE TABLE stb_pre_aggregations.orders AS SELECT * FROM public.orders", []],
180+
invalidateKeyQueries: [["SELECT 1", []]]
181+
}],
182+
renewQuery: true,
183+
requestId: 'save structure versions'
184+
});
185+
186+
await queryOrchestrator.fetchQuery({
187+
query: `SELECT * FROM stb_pre_aggregations.orders`,
188+
values: [],
189+
cacheKeyQueries: {
190+
renewalThreshold: 21600,
191+
queries: []
192+
},
193+
preAggregations: [{
194+
preAggregationsSchema: "stb_pre_aggregations",
195+
tableName: "stb_pre_aggregations.orders",
196+
loadSql: ["CREATE TABLE stb_pre_aggregations.orders AS SELECT * FROM public.orders1", []],
197+
invalidateKeyQueries: [["SELECT 1", []]]
198+
}],
199+
renewQuery: true,
200+
requestId: 'save structure versions'
201+
});
202+
203+
await new Promise(resolve => setTimeout(() => resolve(), 1000));
204+
205+
for (let i = 0; i < 5; i++) {
206+
await queryOrchestrator.fetchQuery({
207+
query: `SELECT * FROM stb_pre_aggregations.orders`,
208+
values: [],
209+
cacheKeyQueries: {
210+
renewalThreshold: 21600,
211+
queries: []
212+
},
213+
preAggregations: [{
214+
preAggregationsSchema: "stb_pre_aggregations",
215+
tableName: "stb_pre_aggregations.orders",
216+
loadSql: ["CREATE TABLE stb_pre_aggregations.orders AS SELECT * FROM public.orders", []],
217+
invalidateKeyQueries: [["SELECT 2", []]]
218+
}],
219+
renewQuery: true,
220+
requestId: 'save structure versions'
221+
});
222+
}
223+
expect(mockDriver.tables).toContainEqual(expect.stringMatching(/orders_f5v4jw3p_4eysppzt/));
224+
expect(mockDriver.tables).toContainEqual(expect.stringMatching(/orders_mjooke4_ezlvkhjl/));
225+
});
165226
});

0 commit comments

Comments
 (0)