Skip to content

Commit 15badfe

Browse files
authored
fix: Use last rollup table types to avoid type guessing for unionWithSourceData lambda queries (#6337)
1 parent 0f85e06 commit 15badfe

File tree

4 files changed

+88
-24
lines changed

4 files changed

+88
-24
lines changed

packages/cubejs-base-driver/src/BaseDriver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ export abstract class BaseDriver implements DriverInterface {
362362
return value;
363363
}
364364

365-
public async tableColumnTypes(table: string) {
365+
public async tableColumnTypes(table: string): Promise<TableStructure> {
366366
const [schema, name] = table.split('.');
367367

368368
const columns = await this.query<TableColumnQueryResult>(

packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
StreamTableData,
1313
StreamingSourceTableData,
1414
QueryOptions,
15-
ExternalDriverCompatibilities,
15+
ExternalDriverCompatibilities, TableStructure, TableColumnQueryResult,
1616
} from '@cubejs-backend/base-driver';
1717
import { getEnv } from '@cubejs-backend/shared';
1818
import { format as formatSql, escape } from 'sqlstring';
@@ -163,6 +163,22 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
163163
);
164164
}
165165

166+
/**
 * Looks up the columns of a table in `information_schema.columns`.
 *
 * @param table Table reference that is split on `.`; the first segment is
 *   used as the schema and the second as the table name.
 * @returns One `{ name, type }` entry per returned row, where `type` is the
 *   row's `data_type` passed through `toGenericType`.
 */
public async tableColumnTypes(table: string): Promise<TableStructure> {
  const segments = table.split('.');
  const schemaName = segments[0];
  const tableName = segments[1];

  // Aliases are quoted so the result keys match the TableColumnQueryResult field names.
  const sql = `SELECT column_name as ${this.quoteIdentifier('column_name')},
             table_name as ${this.quoteIdentifier('table_name')},
             table_schema as ${this.quoteIdentifier('table_schema')},
             data_type as ${this.quoteIdentifier('data_type')}
      FROM information_schema.columns
      WHERE table_name = ${this.param(0)} AND table_schema = ${this.param(1)}`;

  // Parameter order follows the placeholders: param(0) is the table name, param(1) the schema.
  const rows = await this.query<TableColumnQueryResult>(sql, [tableName, schemaName]);

  return rows.map((row) => {
    const name = row.column_name;
    const type = this.toGenericType(row.data_type);
    return { name, type };
  });
}
181+
166182
/**
 * Wraps an identifier in backticks.
 * The identifier is inserted verbatim; backticks inside it are not escaped.
 *
 * @param identifier Identifier to quote.
 * @returns The identifier surrounded by a pair of backticks.
 */
public quoteIdentifier(identifier: string): string {
  const tick = '`';
  return tick.concat(identifier, tick);
}

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import {
2222
DriverInterface,
2323
InlineTable,
2424
SaveCancelFn,
25-
StreamOptions,
26-
UnloadOptions
25+
StreamOptions, TableStructure,
26+
UnloadOptions,
2727
} from '@cubejs-backend/base-driver';
2828
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
2929
import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryTuple, QueryWithParams } from './QueryCache';
@@ -263,6 +263,8 @@ class PreAggregationLoadCache {
263263

264264
private tables: { [redisKey: string]: TableCacheEntry[] };
265265

266+
private tableColumnTypes: { [cacheKey: string]: { [tableName: string]: TableStructure } };
267+
266268
// TODO: this is an in-memory cache structure as well; however, it depends on
267269
// data source only and load cache is per data source for now.
268270
// Make it per data source key in case load cache scope is broadened.
@@ -290,10 +292,11 @@ class PreAggregationLoadCache {
290292
this.tablePrefixes = options.tablePrefixes;
291293
this.versionEntries = {};
292294
this.tables = {};
295+
this.tableColumnTypes = {};
293296
}
294297

295298
protected async tablesFromCache(preAggregation, forceRenew?) {
296-
let tables = forceRenew ? null : await this.queryCache.getCacheDriver().get(this.tablesRedisKey(preAggregation));
299+
let tables = forceRenew ? null : await this.queryCache.getCacheDriver().get(this.tablesCachePrefixKey(preAggregation));
297300
if (!tables) {
298301
tables = await this.preAggregations.getLoadCacheQueue(this.dataSource).executeInQueue(
299302
'query',
@@ -315,7 +318,7 @@ class PreAggregationLoadCache {
315318

316319
const newTables = await this.fetchTablesNoCache(preAggregation);
317320
await this.queryCache.getCacheDriver().set(
318-
this.tablesRedisKey(preAggregation),
321+
this.tablesCachePrefixKey(preAggregation),
319322
newTables,
320323
this.preAggregations.options.preAggregationsSchemaCacheExpire || 60 * 60
321324
);
@@ -332,12 +335,12 @@ class PreAggregationLoadCache {
332335
return client.getTablesQuery(preAggregation.preAggregationsSchema);
333336
}
334337

335-
public tablesRedisKey(preAggregation: PreAggregationDescription) {
338+
/**
 * Builds the key that identifies the table list of a pre-aggregation's
 * data source and schema.
 *
 * The key is derived via `queryCache.getKey` from the data source, the
 * pre-aggregations schema and an `_EXT` suffix for external pre-aggregations.
 *
 * @param preAggregation Description supplying `dataSource`,
 *   `preAggregationsSchema` and `external`.
 * @returns Whatever `queryCache.getKey` produces for the composed name.
 */
public tablesCachePrefixKey(preAggregation: PreAggregationDescription) {
  const { dataSource, preAggregationsSchema, external } = preAggregation;
  const externalSuffix = external ? '_EXT' : '';
  return this.queryCache.getKey(
    'SQL_PRE_AGGREGATIONS_TABLES',
    `${dataSource}${preAggregationsSchema}${externalSuffix}`
  );
}
338341

339342
protected async getTablesQuery(preAggregation) {
340-
const redisKey = this.tablesRedisKey(preAggregation);
343+
const redisKey = this.tablesCachePrefixKey(preAggregation);
341344
if (!this.tables[redisKey]) {
342345
const tables = this.preAggregations.options.skipExternalCacheAndQueue && preAggregation.external ?
343346
await this.fetchTablesNoCache(preAggregation) :
@@ -350,6 +353,22 @@ class PreAggregationLoadCache {
350353
return this.tables[redisKey];
351354
}
352355

356+
/**
 * Returns the column structure of `tableName`, memoised in memory under the
 * pre-aggregation's tables cache key.
 *
 * On a miss the structure is fetched through the external driver's
 * `tableColumnTypes` and stored before being returned.
 *
 * @param preAggregation Pre-aggregation the table belongs to; selects the cache bucket.
 * @param tableName Table whose column types are requested.
 * @returns The cached or freshly fetched table structure.
 * @throws Error when the pre-aggregation is external and
 *   `skipExternalCacheAndQueue` is not enabled.
 */
protected async getTableColumnTypes(preAggregation: PreAggregationDescription, tableName: string): Promise<TableStructure> {
  const cacheKey = this.tablesCachePrefixKey(preAggregation);

  const known = this.tableColumnTypes[cacheKey]?.[tableName];
  if (known) {
    return known;
  }

  // NOTE(review): this rejects *external* pre-aggregations when skipExternalCacheAndQueue
  // is off, while the message talks about being "supported only by external rollups" —
  // confirm the condition matches the intent.
  const { skipExternalCacheAndQueue } = this.preAggregations.options;
  if (preAggregation.external && !skipExternalCacheAndQueue) {
    throw new Error(`Lambda union with source data feature is supported only by external rollups stored in Cube Store but was invoked for '${preAggregation.preAggregationId}'`);
  }

  const driver = await this.externalDriverFactory();
  const structure = await driver.tableColumnTypes(tableName);

  // Create the per-key bucket lazily, then remember the structure for later calls.
  const bucket = this.tableColumnTypes[cacheKey] || (this.tableColumnTypes[cacheKey] = {});
  bucket[tableName] = structure;
  return bucket[tableName];
}
371+
353372
private async calculateVersionEntries(preAggregation): Promise<VersionEntriesObj> {
354373
let versionEntries = tablesToVersionEntries(
355374
preAggregation.preAggregationsSchema,
@@ -394,7 +413,7 @@ class PreAggregationLoadCache {
394413
if (this.tablePrefixes && !this.tablePrefixes.find(p => preAggregation.tableName.split('.')[1].startsWith(p))) {
395414
throw new Error(`Load cache tries to load table ${preAggregation.tableName} outside of tablePrefixes filter: ${this.tablePrefixes.join(', ')}`);
396415
}
397-
const redisKey = this.tablesRedisKey(preAggregation);
416+
const redisKey = this.tablesCachePrefixKey(preAggregation);
398417
if (!this.versionEntries[redisKey]) {
399418
this.versionEntries[redisKey] = this.calculateVersionEntries(preAggregation).catch(e => {
400419
delete this.versionEntries[redisKey];
@@ -450,6 +469,7 @@ class PreAggregationLoadCache {
450469
/**
 * Forces a refresh of the cached table list for the given pre-aggregation and
 * then discards all in-memory state held by this load cache.
 *
 * @param preAggregation Pre-aggregation whose table list is re-fetched.
 */
protected async reset(preAggregation) {
  const forceRenew = true;
  await this.tablesFromCache(preAggregation, forceRenew);

  // Drop everything memoised on this instance so later calls recompute it.
  this.versionEntries = {};
  this.queryStageState = undefined;
  this.tableColumnTypes = {};
  this.tables = {};
}
@@ -1655,8 +1675,9 @@ export class PreAggregationPartitionRangeLoader {
16551675

16561676
if (this.preAggregation.rollupLambdaId) {
16571677
if (this.lambdaQuery && loadResults.length > 0) {
1658-
const { buildRangeEnd } = loadResults[loadResults.length - 1];
1659-
lambdaTable = await this.downloadLambdaTable(buildRangeEnd);
1678+
const { buildRangeEnd, targetTableName } = loadResults[loadResults.length - 1];
1679+
const lambdaTypes = await this.loadCache.getTableColumnTypes(this.preAggregation, targetTableName);
1680+
lambdaTable = await this.downloadLambdaTable(buildRangeEnd, lambdaTypes);
16601681
}
16611682
const rollupLambdaResults = this.preAggregationsTablesToTempTables.filter(tempTableResult => tempTableResult[1].rollupLambdaId === this.preAggregation.rollupLambdaId);
16621683
const filteredResults = loadResults.filter(
@@ -1708,7 +1729,7 @@ export class PreAggregationPartitionRangeLoader {
17081729
/**
17091730
* Downloads the lambda table from the source DB.
17101731
*/
1711-
private async downloadLambdaTable(fromDate: string): Promise<InlineTable> {
1732+
private async downloadLambdaTable(fromDate: string, lambdaTypes: TableStructure): Promise<InlineTable> {
17121733
const { sqlAndParams, cacheKeyQueries } = this.lambdaQuery;
17131734
const [query, params] = sqlAndParams;
17141735
const values = params.map((p) => {
@@ -1733,6 +1754,7 @@ export class PreAggregationPartitionRangeLoader {
17331754
dataSource: this.dataSource,
17341755
external: false,
17351756
useCsvQuery: true,
1757+
lambdaTypes,
17361758
}
17371759
);
17381760
if (data.rowCount === this.options.maxSourceRowLimit) {
@@ -2393,7 +2415,7 @@ export class PreAggregations {
23932415
preAggregations.map(
23942416
async preAggregation => {
23952417
const { dataSource, preAggregationsSchema } = preAggregation;
2396-
const cacheKey = getLoadCacheByDataSource(dataSource, preAggregationsSchema).tablesRedisKey(preAggregation);
2418+
const cacheKey = getLoadCacheByDataSource(dataSource, preAggregationsSchema).tablesCachePrefixKey(preAggregation);
23972419
if (!firstByCacheKey[cacheKey]) {
23982420
firstByCacheKey[cacheKey] = getLoadCacheByDataSource(dataSource, preAggregationsSchema).getVersionEntries(preAggregation);
23992421
const res = await firstByCacheKey[cacheKey];

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import csvWriter from 'csv-write-stream';
22
import LRUCache from 'lru-cache';
3+
import { pipeline } from 'stream';
34
import { MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared';
45
import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
5-
import { BaseDriver, InlineTables, CacheDriverInterface } from '@cubejs-backend/base-driver';
6+
import { BaseDriver, InlineTables, CacheDriverInterface, TableStructure } from '@cubejs-backend/base-driver';
67

78
import { QueryQueue } from './QueryQueue';
89
import { ContinueWaitError } from './ContinueWaitError';
@@ -246,6 +247,7 @@ export class QueryCache {
246247
persistent: queryBody.persistent,
247248
dataSource: queryBody.dataSource,
248249
useCsvQuery: queryBody.useCsvQuery,
250+
lambdaTypes: queryBody.lambdaTypes,
249251
aliasNameToMember: queryBody.aliasNameToMember,
250252
});
251253
} else {
@@ -426,6 +428,7 @@ export class QueryCache {
426428
requestId,
427429
inlineTables,
428430
useCsvQuery,
431+
lambdaTypes,
429432
persistent,
430433
aliasNameToMember,
431434
tablesSchema,
@@ -437,6 +440,7 @@ export class QueryCache {
437440
requestId?: string,
438441
inlineTables?: InlineTables,
439442
useCsvQuery?: boolean,
443+
lambdaTypes?: TableStructure,
440444
persistent?: boolean,
441445
aliasNameToMember?: { [alias: string]: string },
442446
tablesSchema?: boolean,
@@ -453,6 +457,7 @@ export class QueryCache {
453457
requestId,
454458
inlineTables,
455459
useCsvQuery,
460+
lambdaTypes,
456461
tablesSchema,
457462
};
458463

@@ -504,24 +509,41 @@ export class QueryCache {
504509
}
505510

506511
/**
 * Runs a query against the source driver and serialises the rows to CSV
 * without a header line.
 *
 * Column names and the returned `types` are taken from `q.lambdaTypes`, not
 * from the query result. Drivers exposing `stream` have their row stream piped
 * into the CSV writer; all others go through `downloadQueryResults`.
 *
 * @param client Source driver. `stream` is used when present, otherwise
 *   `downloadQueryResults`.
 * @param q Query descriptor; `query`, `values` and `lambdaTypes` are read here
 *   and the whole object is forwarded to the driver as options.
 * @returns `{ types, csvRows, rowCount }`, where `csvRows` is the concatenated
 *   writer output and `rowCount` is the number of chunks the writer emitted.
 * @throws Error when `q.lambdaTypes` is missing or the streaming pipeline fails.
 */
private async csvQuery(client, q) {
  // `lambdaTypes` is optional on every options type that reaches this method;
  // fail with a clear message instead of a TypeError on `.map`.
  if (!q.lambdaTypes) {
    throw new Error('Lambda types are required to run a CSV query');
  }
  const headers = q.lambdaTypes.map(c => c.name);
  const writer = csvWriter({
    headers,
    sendHeaders: false,
  });
  let tableData;
  let lines;
  try {
    if (client.stream) {
      tableData = await client.stream(q.query, q.values, q);
      // Start draining the writer's readable side before waiting on the pipeline:
      // the pipeline cannot complete while nobody consumes its last stream.
      const collected = streamToArray(writer);
      // A failure is reported through the pipeline callback below; mark this
      // promise as handled so it cannot surface as an unhandled rejection.
      collected.catch(() => undefined);
      // Callback-style `pipeline` returns a stream, not a promise, so awaiting it
      // directly would not wait for completion. Wrap it explicitly.
      await new Promise<void>((resolve, reject) => {
        pipeline(tableData.rowStream, writer, (err) => {
          if (err) {
            reject(new Error(`Lambda query errors ${err}`));
          } else {
            resolve();
          }
        });
      });
      lines = await collected;
    } else {
      tableData = await client.downloadQueryResults(q.query, q.values, q);
      tableData.rows.forEach(
        row => writer.write(row)
      );
      writer.end();
      lines = await streamToArray(writer);
    }
  } finally {
    // Release driver resources only after all rows have been consumed (or on failure).
    if (tableData?.release) {
      await tableData.release();
    }
  }
  const rowCount = lines.length;
  const csvRows = lines.join('');
  return {
    types: q.lambdaTypes,
    csvRows,
    rowCount,
  };
}
@@ -708,6 +730,7 @@ export class QueryCache {
708730
external?: boolean,
709731
dataSource: string,
710732
useCsvQuery?: boolean,
733+
lambdaTypes?: TableStructure,
711734
persistent?: boolean,
712735
}
713736
) {
@@ -741,6 +764,7 @@ export class QueryCache {
741764
requestId: options.requestId,
742765
dataSource: options.dataSource,
743766
useCsvQuery: options.useCsvQuery,
767+
lambdaTypes: options.lambdaTypes,
744768
persistent: options.persistent,
745769
}
746770
),
@@ -815,6 +839,7 @@ export class QueryCache {
815839
forceNoCache?: boolean,
816840
useInMemory?: boolean,
817841
useCsvQuery?: boolean,
842+
lambdaTypes?: TableStructure,
818843
persistent?: boolean,
819844
}
820845
) {
@@ -831,6 +856,7 @@ export class QueryCache {
831856
persistent: options.persistent,
832857
dataSource: options.dataSource,
833858
useCsvQuery: options.useCsvQuery,
859+
lambdaTypes: options.lambdaTypes,
834860
}).then(res => {
835861
const result = {
836862
time: (new Date()).getTime(),

0 commit comments

Comments
 (0)