Skip to content

Commit e7297f7

Browse files
authored
fix: databricks pre-aggregation errors (#6207)
1 parent 16ec962 commit e7297f7

File tree

2 files changed

+63
-44
lines changed

2 files changed

+63
-44
lines changed

packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
getEnv,
99
assertDataSource,
1010
} from '@cubejs-backend/shared';
11+
import crypto from 'crypto';
1112
import fs from 'fs';
1213
import path from 'path';
1314
import { S3, GetObjectCommand } from '@aws-sdk/client-s3';
@@ -750,16 +751,19 @@ export class DatabricksDriver extends JDBCDriver {
750751
* `fs.s3a.secret.key <aws-secret-key>`
751752
*/
752753
private async createExternalTableFromSql(tableFullName: string, sql: string, params: unknown[]) {
753-
await this.query(
754-
`
755-
CREATE TABLE ${this.getUnloadExportTableName(tableFullName)}
756-
USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv'
757-
OPTIONS (escape = '"')
758-
AS (${sql});
759-
`,
760-
params,
761-
);
762-
await this.query(`DROP TABLE ${this.getUnloadExportTableName(tableFullName)};`, []);
754+
try {
755+
await this.query(
756+
`
757+
CREATE TABLE ${this.getUnloadExportTableName(tableFullName)}
758+
USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv'
759+
OPTIONS (escape = '"')
760+
AS (${sql});
761+
`,
762+
params,
763+
);
764+
} finally {
765+
await this.query(`DROP TABLE IF EXISTS ${this.getUnloadExportTableName(tableFullName)};`, []);
766+
}
763767
}
764768

765769
/**
@@ -780,26 +784,49 @@ export class DatabricksDriver extends JDBCDriver {
780784
* `fs.s3a.secret.key <aws-secret-key>`
781785
*/
782786
private async createExternalTableFromTable(tableFullName: string, columns: string) {
783-
await this.query(
784-
`
785-
CREATE TABLE ${this.getUnloadExportTableName(tableFullName)}
786-
USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv'
787-
OPTIONS (escape = '"')
788-
AS SELECT ${columns} FROM ${tableFullName}
789-
`,
790-
[],
791-
);
792-
await this.query(`DROP TABLE ${this.getUnloadExportTableName(tableFullName)};`, []);
787+
try {
788+
await this.query(
789+
`
790+
CREATE TABLE ${this.getUnloadExportTableName(tableFullName)}
791+
USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv'
792+
OPTIONS (escape = '"')
793+
AS SELECT ${columns} FROM ${tableFullName}
794+
`,
795+
[],
796+
);
797+
} finally {
798+
await this.query(`DROP TABLE IF EXISTS ${this.getUnloadExportTableName(tableFullName)};`, []);
799+
}
793800
}
794801

795802
/**
796-
* Returns export table full name by table full name.
803+
* Returns export table name (hash) by table full name.
797804
*/
798-
private getUnloadExportTableName(tableFullName: string): null | string {
799-
const exportTable: string | string[] = tableFullName.split('.');
800-
exportTable[exportTable.length - 1] = `csv_export_${
801-
exportTable[exportTable.length - 1]
802-
}`;
803-
return exportTable.join('.');
805+
private getUnloadExportTableName(tableFullName: string): string {
806+
const table: string[] = tableFullName.split('.');
807+
const hashCharset = 'abcdefghijklmnopqrstuvwxyz012345';
808+
const digestBuffer = crypto.createHash('md5').update(tableFullName).digest();
809+
810+
let result = '';
811+
let residue = 0;
812+
let shiftCounter = 0;
813+
814+
for (let i = 0; i < 5; i++) {
815+
const byte = digestBuffer.readUInt8(i);
816+
shiftCounter += 16;
817+
// eslint-disable-next-line operator-assignment,no-bitwise
818+
residue = (byte << (shiftCounter - 8)) | residue;
819+
// eslint-disable-next-line no-bitwise
820+
while (residue >> 5) {
821+
result += hashCharset.charAt(residue % 32);
822+
shiftCounter -= 5;
823+
// eslint-disable-next-line operator-assignment,no-bitwise
824+
residue = residue >> 5;
825+
}
826+
}
827+
result += hashCharset.charAt(residue % 32);
828+
829+
table[table.length - 1] = result;
830+
return table.join('.');
804831
}
805832
}

packages/cubejs-jdbc-driver/src/JDBCDriver.ts

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -128,23 +128,15 @@ export class JDBCDriver extends BaseDriver {
128128
destroy: async (connection) => promisify(connection.close.bind(connection)),
129129
validate: async (connection) => (
130130
new Promise((resolve) => {
131-
const createStatement = promisify(connection.createStatement.bind(connection));
132-
createStatement().then((statement: JdbcStatement) => {
133-
const setQueryTimeout = promisify(statement.setQueryTimeout.bind(statement));
134-
const execute = promisify(statement.execute.bind(statement));
135-
setQueryTimeout(this.testConnectionTimeout() / 1000).then(() => {
136-
const timer = setTimeout(() => {
137-
resolve(false);
138-
}, this.testConnectionTimeout());
139-
execute('SELECT 1').then(() => {
140-
clearTimeout(timer);
141-
resolve(true);
142-
}).catch(() => {
143-
resolve(false);
144-
});
145-
}).catch(() => {
146-
resolve(false);
147-
});
131+
const isValid = promisify(connection.isValid.bind(connection));
132+
const timeout = setTimeout(() => {
133+
resolve(false);
134+
}, this.testConnectionTimeout());
135+
isValid(0).then((valid: boolean) => {
136+
clearTimeout(timeout);
137+
resolve(valid);
138+
}).catch(() => {
139+
resolve(false);
148140
});
149141
})
150142
)

0 commit comments

Comments
 (0)