Skip to content

Commit cdfbd1e

Browse files
authored
feat: ksql and rollup pre-aggregations (#8619)
Added support for ksqlDB pre-aggregations defined by a select statement and loaded directly from Kafka.
1 parent 70ff901 commit cdfbd1e

File tree

21 files changed

+594
-36
lines changed

21 files changed

+594
-36
lines changed

packages/cubejs-base-driver/src/driver.interface.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ export interface StreamTableData extends DownloadTableBase {
9090
export interface StreamingSourceTableData extends DownloadTableBase {
9191
streamingTable: string;
9292
selectStatement?: string;
93+
sourceTable?: any,
9394
partitions?: number;
9495
streamOffset?: string;
9596
streamingSource: {
@@ -130,6 +131,7 @@ export type StreamOptions = {
130131

131132
/**
 * Optional settings for a streaming source.
 */
export type StreamingSourceOptions = {
  // NOTE(review): declared as boolean here, while other blocks in this change pass streamOffset as a string — confirm.
  streamOffset?: boolean;
  // Column descriptions for the output table, when supplied by the caller.
  outputColumnTypes?: TableColumn[];
};
134136

135137
export interface DownloadQueryResultsBase {

packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type CreateTableOptions = {
4646
files?: string[]
4747
aggregations?: string
4848
selectStatement?: string
49+
sourceTable?: any
4950
sealAt?: string
5051
delimiter?: string
5152
};
@@ -118,6 +119,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
118119
if (options.selectStatement) {
119120
withEntries.push(`select_statement = ${escape(options.selectStatement)}`);
120121
}
122+
if (options.sourceTable) {
123+
withEntries.push(`source_table = ${escape(`CREATE TABLE ${options.sourceTable.tableName} (${options.sourceTable.types.map(t => `${t.name} ${this.fromGenericType(t.type)}`).join(', ')})`)}`);
124+
}
121125
if (options.streamOffset) {
122126
withEntries.push(`stream_offset = '${options.streamOffset}'`);
123127
}
@@ -431,6 +435,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
431435
indexes,
432436
files: locations,
433437
selectStatement: tableData.selectStatement,
438+
sourceTable: tableData.sourceTable,
434439
streamOffset: tableData.streamOffset,
435440
sealAt
436441
};

packages/cubejs-ksql-driver/src/KsqlDriver.ts

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
} from '@cubejs-backend/shared';
1111
import {
1212
BaseDriver, DriverCapabilities,
13-
DriverInterface, QueryOptions,
13+
DriverInterface, TableColumn,
1414
} from '@cubejs-backend/base-driver';
1515
import { Kafka } from 'kafkajs';
1616
import sqlstring, { format as formatSql } from 'sqlstring';
@@ -64,6 +64,12 @@ type KsqlDescribeResponse = {
6464
}
6565
};
6666

67+
/**
 * Options shared by the ksql driver's `query`, pre-aggregation load and
 * streaming-table helpers. Every field is optional.
 */
type KsqlQueryOptions = {
  /** Select statement forwarded as-is in the streaming table data. */
  selectStatement?: string;
  /** Stream offset forwarded as-is in the streaming table data. */
  streamOffset?: string;
  /** Column types supplied by the caller for the output table. */
  outputColumnTypes?: TableColumn[];
};
72+
6773
/**
6874
* KSQL driver class.
6975
*/
@@ -161,7 +167,7 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
161167
}
162168
}
163169

164-
public async query<R = unknown>(query: string, values?: unknown[], options: { streamOffset?: string } = {}): Promise<R> {
170+
public async query<R = unknown>(query: string, values?: unknown[], options: KsqlQueryOptions = {}): Promise<R> {
165171
if (query.toLowerCase().startsWith('select')) {
166172
throw new Error('Select queries for ksql allowed only from Cube Store. In order to query ksql create pre-aggregation first.');
167173
}
@@ -261,13 +267,15 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
261267
}
262268

263269
// eslint-disable-next-line @typescript-eslint/no-unused-vars
264-
public loadPreAggregationIntoTable(preAggregationTableName: string, loadSql: string, params: any[], options: KsqlQueryOptions): Promise<any> {
  // Runs the load statement through `query`. The first occurrence of the
  // pre-aggregation table name in `loadSql` is swapped for its
  // `tableDashName` form, and only `streamOffset` is forwarded from `options`.
  const dashedTableName = this.tableDashName(preAggregationTableName);
  const sql = loadSql.replace(preAggregationTableName, dashedTableName);
  return this.query(sql, params, { streamOffset: options?.streamOffset });
}
267274

268275
// eslint-disable-next-line @typescript-eslint/no-unused-vars
269276
public async downloadTable(table: string, options: any): Promise<any> {
  // Describes `table` (in its `tableDashName` form) as streaming table data.
  // Only `streamOffset` is taken from `options`; any other key is ignored here.
  const streamOffset = options?.streamOffset;
  const streamingTable = this.tableDashName(table);
  return this.getStreamingTableData(streamingTable, { streamOffset });
}
272280

273281
// eslint-disable-next-line @typescript-eslint/no-unused-vars
@@ -278,11 +286,12 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
278286
}
279287

280288
const selectStatement = sqlstring.format(query, params);
281-
return this.getStreamingTableData(table, { selectStatement, streamOffset: options?.streamOffset });
289+
const { streamOffset, outputColumnTypes } = options || {};
290+
return this.getStreamingTableData(table, { selectStatement, streamOffset, outputColumnTypes });
282291
}
283292

284-
private async getStreamingTableData(streamingTable: string, options: { selectStatement?: string, streamOffset?: string } = {}) {
285-
const { selectStatement, streamOffset } = options;
293+
private async getStreamingTableData(streamingTable: string, options: KsqlQueryOptions = {}) {
294+
const { selectStatement, streamOffset, outputColumnTypes } = options;
286295
const describe = await this.describeTable(streamingTable);
287296
const name = this.config.streamingSourceName || 'default';
288297
const kafkaDirectDownload = !!this.config.kafkaHost;
@@ -304,13 +313,20 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
304313
url: this.config.url
305314
}
306315
};
316+
const sourceTableTypes = await this.tableColumnTypes(streamingTable, describe);
317+
streamingTable = kafkaDirectDownload ? describe.sourceDescription?.topic : streamingTable;
318+
307319
return {
308-
types: await this.tableColumnTypes(streamingTable, describe),
320+
types: outputColumnTypes || sourceTableTypes,
309321
partitions: describe.sourceDescription?.partitions,
310-
streamingTable: kafkaDirectDownload ? describe.sourceDescription?.topic : streamingTable,
322+
streamingTable,
311323
streamOffset,
312324
selectStatement,
313-
streamingSource
325+
streamingSource,
326+
sourceTable: outputColumnTypes ? {
327+
types: sourceTableTypes,
328+
tableName: streamingTable
329+
} : null
314330
};
315331
}
316332

@@ -344,7 +360,8 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
344360

345361
/**
 * Reports the capability flags of this driver: it acts as a streaming
 * source and supports unloading without a temp table.
 */
public capabilities(): DriverCapabilities {
  const supported: DriverCapabilities = {
    streamingSource: true,
    unloadWithoutTempTable: true,
  };
  return supported;
}
350367
}

packages/cubejs-ksql-driver/src/KsqlQuery.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ export class KsqlQuery extends BaseQuery {
5555
return `\`${name}\``;
5656
}
5757

58+
/**
 * Wraps a SQL expression in a cast to `varchar(255)`.
 *
 * @param sql SQL expression to cast; inserted into the result verbatim.
 * @returns the `CAST(... as varchar(255))` expression text.
 */
public castToString(sql: string) {
  const targetType = 'varchar(255)';
  return `CAST(${sql} as ${targetType})`;
}
61+
5862
public concatStringsSql(strings: string[]) {
5963
return `CONCAT(${strings.join(', ')})`;
6064
}
@@ -111,7 +115,7 @@ export class KsqlQuery extends BaseQuery {
111115
}
112116

113117
/**
 * Extracts the table reference from a `SELECT <columns> FROM <table>` statement.
 *
 * The select list is matched greedily, so when the text contains several
 * whitespace-delimited `from` keywords the identifier after the last
 * matching one is returned.
 *
 * @param sql SQL text to inspect; it may span several lines.
 * @returns the identifier following `from` exactly as written (quotes and
 * backticks included), or `null` when the statement does not match.
 */
public static extractTableFromSimpleSelectAsteriskQuery(sql: string) {
  // `[\s\S]` is used instead of `.` because `.` stops at every line terminator.
  // Replacing only `\n` beforehand left `\r` (CRLF input) in the select list
  // and made the match fail for multi-line statements with Windows line endings.
  const match = sql.match(/^\s*select\s+[\s\S]*\s+from\s+([a-zA-Z0-9_\-`".*]+)\s*/i);
  return match && match[1];
}
117121
}

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ function getStructureVersion(preAggregation) {
104104
if (preAggregation.streamOffset) {
105105
versionArray.push(preAggregation.streamOffset);
106106
}
107+
if (preAggregation.outputColumnTypes) {
108+
versionArray.push(preAggregation.outputColumnTypes);
109+
}
107110

108111
return version(versionArray.length === 1 ? versionArray[0] : versionArray);
109112
}
@@ -815,6 +818,9 @@ export class PreAggregationLoader {
815818
if (this.preAggregation.streamOffset) {
816819
versionArray.push(this.preAggregation.streamOffset);
817820
}
821+
if (this.preAggregation.outputColumnTypes) {
822+
versionArray.push(this.preAggregation.outputColumnTypes);
823+
}
818824
versionArray.push(invalidationKeys);
819825
return version(versionArray);
820826
}
@@ -964,7 +970,11 @@ export class PreAggregationLoader {
964970
targetTableName,
965971
query,
966972
params,
967-
{ streamOffset: this.preAggregation.streamOffset, ...queryOptions }
973+
{
974+
streamOffset: this.preAggregation.streamOffset,
975+
outputColumnTypes: this.preAggregation.outputColumnTypes,
976+
...queryOptions
977+
}
968978
));
969979

970980
await this.createIndexes(client, newVersionEntry, saveCancelFn, queryOptions);
@@ -1107,7 +1117,11 @@ export class PreAggregationLoader {
11071117
targetTableName,
11081118
query,
11091119
params,
1110-
{ streamOffset: this.preAggregation.streamOffset, ...queryOptions }
1120+
{
1121+
streamOffset: this.preAggregation.streamOffset,
1122+
outputColumnTypes: this.preAggregation.outputColumnTypes,
1123+
...queryOptions
1124+
}
11111125
));
11121126

11131127
return queryOptions;
@@ -1156,6 +1170,7 @@ export class PreAggregationLoader {
11561170
sql,
11571171
params, {
11581172
streamOffset: this.preAggregation.streamOffset,
1173+
outputColumnTypes: this.preAggregation.outputColumnTypes,
11591174
...queryOptions,
11601175
...capabilities,
11611176
...this.getStreamingOptions(),
@@ -1261,7 +1276,11 @@ export class PreAggregationLoader {
12611276
tableData.rowStream = stream;
12621277
}
12631278
} else {
1264-
tableData = await saveCancelFn(client.downloadTable(table, { streamOffset: this.preAggregation.streamOffset, ...externalDriverCapabilities }));
1279+
tableData = await saveCancelFn(client.downloadTable(table, {
1280+
streamOffset: this.preAggregation.streamOffset,
1281+
outputColumnTypes: this.preAggregation.outputColumnTypes,
1282+
...externalDriverCapabilities
1283+
}));
12651284
}
12661285

12671286
if (!tableData.types) {

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2746,9 +2746,12 @@ export class BaseQuery {
27462746
}
27472747

27482748
newSubQueryForCube(cube, options) {
2749-
return this.options.queryFactory
2750-
? this.options.queryFactory.createQuery(cube, this.compilers, this.subQueryOptions(options))
2751-
: this.newSubQuery(options);
2749+
if (this.options.queryFactory) {
2750+
options.paramAllocator = null;
2751+
return this.options.queryFactory.createQuery(cube, this.compilers, this.subQueryOptions(options));
2752+
}
2753+
2754+
return this.newSubQuery(options);
27522755
}
27532756

27542757
subQueryOptions(options) {
@@ -2942,6 +2945,60 @@ export class BaseQuery {
29422945
);
29432946
}
29442947

2948+
preAggregationOutputColumnTypes(cube, preAggregation) {
2949+
return this.cacheValue(
2950+
['preAggregationOutputColumnTypes', cube, JSON.stringify(preAggregation)],
2951+
() => {
2952+
if (!preAggregation.outputColumnTypes) {
2953+
return null;
2954+
}
2955+
2956+
if (preAggregation.type === 'rollup') {
2957+
const query = this.preAggregations.rollupPreAggregationQuery(cube, preAggregation);
2958+
2959+
const evaluatedMapOutputColumnTypes = preAggregation.outputColumnTypes.reduce((acc, outputColumnType) => {
2960+
acc.set(outputColumnType.name, outputColumnType);
2961+
return acc;
2962+
}, new Map());
2963+
2964+
const findSchemaType = member => {
2965+
const outputSchemaType = evaluatedMapOutputColumnTypes.get(member);
2966+
if (!outputSchemaType) {
2967+
throw new UserError(`Output schema type for ${member} not found in pre-aggregation ${preAggregation}`);
2968+
}
2969+
2970+
return {
2971+
name: this.aliasName(member),
2972+
type: outputSchemaType.type,
2973+
};
2974+
};
2975+
2976+
// The order of the output columns is important, it should match the order in the select statement
2977+
const outputColumnTypes = [
2978+
...(query.dimensions || []).map(d => findSchemaType(d.dimension)),
2979+
...(query.timeDimensions || []).map(t => ({
2980+
name: `${this.aliasName(t.dimension)}_${t.granularity}`,
2981+
type: 'TIMESTAMP'
2982+
})),
2983+
...(query.measures || []).map(m => findSchemaType(m.measure)),
2984+
];
2985+
2986+
return outputColumnTypes;
2987+
}
2988+
throw new UserError('Output schema is only supported for rollup pre-aggregations');
2989+
},
2990+
{ inputProps: { }, cache: this.queryCache }
2991+
);
2992+
}
2993+
2994+
preAggregationUniqueKeyColumns(cube, preAggregation) {
2995+
if (preAggregation.uniqueKeyColumns) {
2996+
return preAggregation.uniqueKeyColumns.map(key => this.aliasName(`${cube}.${key}`));
2997+
}
2998+
2999+
return this.dimensionColumns();
3000+
}
3001+
29453002
preAggregationReadOnly(_cube, _preAggregation) {
29463003
return false;
29473004
}

packages/cubejs-schema-compiler/src/adapter/PreAggregations.js

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import R from 'ramda';
2+
import { FROM_PARTITION_RANGE, TO_PARTITION_RANGE } from '@cubejs-backend/shared';
23

34
import { UserError } from '../compiler/UserError';
45

@@ -189,7 +190,7 @@ export class PreAggregations {
189190

190191
const uniqueKeyColumnsDefault = () => null;
191192
const uniqueKeyColumns = ({
192-
rollup: () => queryForSqlEvaluation.dimensionColumns(),
193+
rollup: () => queryForSqlEvaluation.preAggregationUniqueKeyColumns(cube, preAggregation),
193194
originalSql: () => preAggregation.uniqueKeyColumns || null
194195
}[preAggregation.type] || uniqueKeyColumnsDefault)();
195196

@@ -209,6 +210,7 @@ export class PreAggregations {
209210
preAggregationsSchema: queryForSqlEvaluation.preAggregationSchema(),
210211
loadSql: queryForSqlEvaluation.preAggregationLoadSql(cube, preAggregation, tableName),
211212
sql: queryForSqlEvaluation.preAggregationSql(cube, preAggregation),
213+
outputColumnTypes: queryForSqlEvaluation.preAggregationOutputColumnTypes(cube, preAggregation),
212214
uniqueKeyColumns,
213215
aggregationsColumns,
214216
dataSource: queryForSqlEvaluation.dataSource,
@@ -219,7 +221,7 @@ export class PreAggregations {
219221
queryForSqlEvaluation.parseSecondDuration(preAggregation.refreshKey.updateWindow),
220222
preAggregationStartEndQueries:
221223
(preAggregation.partitionGranularity || references.timeDimensions[0]?.granularity) &&
222-
this.refreshRangeQuery().preAggregationStartEndQueries(cube, preAggregation),
224+
this.refreshRangeQuery(cube).preAggregationStartEndQueries(cube, preAggregation),
223225
matchedTimeDimensionDateRange:
224226
preAggregation.partitionGranularity && (
225227
matchedTimeDimension && matchedTimeDimension.boundaryDateRangeFormatted() ||
@@ -1041,12 +1043,15 @@ export class PreAggregations {
10411043
return { preAggregations, result };
10421044
}
10431045

1044-
refreshRangeQuery() {
1045-
return this.query.newSubQuery({
1046-
rowLimit: null,
1047-
offset: null,
1048-
preAggregationQuery: true,
1049-
});
1046+
refreshRangeQuery(cube) {
1047+
return this.query.newSubQueryForCube(
1048+
cube,
1049+
{
1050+
rowLimit: null,
1051+
offset: null,
1052+
preAggregationQuery: true,
1053+
}
1054+
);
10501055
}
10511056

10521057
originalSqlPreAggregationQuery(cube, aggregation) {

packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,12 @@ export class CubeEvaluator extends CubeSymbols {
273273
preAggregation.refreshRangeEnd = preAggregation.buildRangeEnd;
274274
delete preAggregation.buildRangeEnd;
275275
}
276+
277+
if (preAggregation.outputColumnTypes) {
278+
preAggregation.outputColumnTypes.forEach(column => {
279+
column.name = this.evaluateReferences(cube.name, column.member, { originalSorting: true });
280+
});
281+
}
276282
}
277283
}
278284
}

0 commit comments

Comments
 (0)