Skip to content

Commit 7f97af4

Browse files
authored
feat(databricks-driver): Support HLL feature with export bucket (#8301)
1 parent 48eaf2a commit 7f97af4

File tree

4 files changed

+70
-41
lines changed

4 files changed

+70
-41
lines changed

packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import {
1717
SASProtocol,
1818
generateBlobSASQueryParameters,
1919
} from '@azure/storage-blob';
20-
import { DriverCapabilities, QueryColumnsResult, QueryOptions, QuerySchemasResult, QueryTablesResult, UnloadOptions } from '@cubejs-backend/base-driver';
20+
import { DriverCapabilities, QueryColumnsResult, QueryOptions, QuerySchemasResult, QueryTablesResult, UnloadOptions, GenericDataBaseType } from '@cubejs-backend/base-driver';
2121
import {
2222
JDBCDriver,
2323
JDBCDriverConfiguration,
@@ -99,7 +99,13 @@ type ShowDatabasesRow = {
9999
databaseName: string,
100100
};
101101

102+
// Column descriptor: a column name paired with its generic (driver-agnostic) type.
// Used in this diff as the element type returned by queryColumnTypes and accepted by the export helpers.
type ColumnInfo = {
103+
// NOTE(review): `any` turns off type checking for the name; it is interpolated into SQL text
// by generateTableColumnsForExport, so `string` looks like the intended type — confirm.
name: any;
104+
type: GenericDataBaseType;
105+
};
106+
102107
// Lookup table from Databricks type names to generic type names.
// NOTE(review): the lookup site is outside this view; presumably it is consulted when
// translating DESCRIBE output — confirm.
const DatabricksToGenericType: Record<string, string> = {
108+
// Every `binary` column is reported as an HLL sketch; columns with this generic type are the
// ones generateTableColumnsForExport wraps in base64(...) for CSV export.
binary: 'hll_datasketches',
103109
'decimal(10,0)': 'bigint',
104110
};
105111

@@ -537,8 +543,9 @@ export class DatabricksDriver extends JDBCDriver {
537543
public async queryColumnTypes(
538544
sql: string,
539545
params?: unknown[]
540-
): Promise<{ name: any; type: string; }[]> {
546+
): Promise<ColumnInfo[]> {
541547
const result = [];
548+
542549
// eslint-disable-next-line camelcase
543550
const response = await this.query<{col_name: string; data_type: string}>(
544551
`DESCRIBE QUERY ${sql}`,
@@ -631,7 +638,7 @@ export class DatabricksDriver extends JDBCDriver {
631638
// Resolves the column types of `sql`, creates the external table for it via
// createExternalTableFromSql, and returns the resolved column types.
private async unloadWithSql(tableFullName: string, sql: string, params: unknown[]) {
632639
// Types are resolved before the export: they are the return value and, in the added line
// below, are also handed to createExternalTableFromSql.
const types = await this.queryColumnTypes(sql, params);
633640

634-
await this.createExternalTableFromSql(tableFullName, sql, params);
641+
await this.createExternalTableFromSql(tableFullName, sql, params, types);
635642

636643
return types;
637644
}
@@ -641,9 +648,8 @@ export class DatabricksDriver extends JDBCDriver {
641648
*/
642649
private async unloadWithTable(tableFullName: string) {
643650
// Column types of the existing table; returned to the caller after the export step.
const types = await this.tableColumnTypes(tableFullName);
644-
const columns = types.map(t => t.name).join(', ');
645651

// The removed lines built a comma-separated name list here; the added line passes the typed
// column descriptors instead, so the callee builds the select list itself.
646-
await this.createExternalTableFromTable(tableFullName, columns);
652+
await this.createExternalTableFromTable(tableFullName, types);
647653

648654
return types;
649655
}
@@ -735,14 +741,17 @@ export class DatabricksDriver extends JDBCDriver {
735741
},
736742
region: this.config.awsRegion,
737743
});
744+
738745
const url = new URL(pathname);
739746
const list = await client.listObjectsV2({
740747
Bucket: url.host,
741748
Prefix: url.pathname.slice(1),
742749
});
750+
743751
if (list.Contents === undefined) {
744752
throw new Error(`No content in specified path: ${pathname}`);
745753
}
754+
746755
const csvFile = await Promise.all(
747756
list.Contents
748757
.filter(file => file.Key && /.csv$/i.test(file.Key))
@@ -758,9 +767,23 @@ export class DatabricksDriver extends JDBCDriver {
758767
throw new Error('No CSV files were exported to the specified bucket. ' +
759768
'Please check your export bucket configuration.');
760769
}
770+
761771
return csvFile;
762772
}
763773

774+
/**
 * Builds the comma-separated select list used when exporting columns to CSV.
 *
 * Columns whose generic type is `hll_datasketches` are emitted as `base64(<name>)`;
 * all other columns are emitted as their bare name.
 *
 * @param columns Column descriptors (name and generic type) in output order.
 * @returns The select-list expressions joined with `', '`; an empty string for an empty input.
 */
protected generateTableColumnsForExport(columns: ColumnInfo[]): string {
775+
const wrapped = columns.map((c) => {
776+
if (c.type === 'hll_datasketches') {
777+
// The error quoted below is why the column is wrapped in base64(...) rather than exported as-is.
// [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The CSV datasource doesn't support type \"BINARY\". SQLSTATE: 0A000
778+
// NOTE(review): the expression carries no alias and the name is not quoted — confirm
// neither matters for the resulting CSV table.
return `base64(${c.name})`;
779+
} else {
780+
return c.name;
781+
}
782+
});
783+
784+
return wrapped.join(', ');
785+
}
786+
764787
/**
765788
* Saves specified query to the configured bucket. This requires Databricks
766789
* cluster to be configured.
@@ -778,14 +801,20 @@ export class DatabricksDriver extends JDBCDriver {
778801
* `fs.s3a.access.key <aws-access-key>`
779802
* `fs.s3a.secret.key <aws-secret-key>`
780803
*/
781-
private async createExternalTableFromSql(tableFullName: string, sql: string, params: unknown[]) {
804+
private async createExternalTableFromSql(tableFullName: string, sql: string, params: unknown[], columns: ColumnInfo[]) {
805+
let select = sql;
806+
807+
if (columns.find((column) => column.type === 'hll_datasketches')) {
808+
select = `SELECT ${this.generateTableColumnsForExport(columns)} FROM (${sql})`;
809+
}
810+
782811
try {
783812
await this.query(
784813
`
785814
CREATE TABLE ${tableFullName}
786815
USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv'
787816
OPTIONS (escape = '"')
788-
AS (${sql});
817+
AS (${select});
789818
`,
790819
params,
791820
);
@@ -811,14 +840,14 @@ export class DatabricksDriver extends JDBCDriver {
811840
* `fs.s3a.access.key <aws-access-key>`
812841
* `fs.s3a.secret.key <aws-secret-key>`
813842
*/
814-
private async createExternalTableFromTable(tableFullName: string, columns: string) {
843+
private async createExternalTableFromTable(tableFullName: string, columns: ColumnInfo[]) {
815844
try {
816845
await this.query(
817846
`
818847
CREATE TABLE _${tableFullName}
819848
USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv'
820849
OPTIONS (escape = '"')
821-
AS SELECT ${columns} FROM ${tableFullName}
850+
AS SELECT ${this.generateTableColumnsForExport(columns)} FROM ${tableFullName}
822851
`,
823852
[],
824853
);

packages/cubejs-druid-driver/src/DruidQuery.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,6 @@ export class DruidQuery extends BaseQuery {
5050
}
5151

5252
// Returns the SQL text used for "now": the literal keyword CURRENT_TIMESTAMP.
public nowTimestampSql(): string {
53-
return `CURRENT_TIMESTAMP`;
// The added line only swaps a template literal with no interpolation for a plain string
// literal; the returned value is the same.
53+
return 'CURRENT_TIMESTAMP';
5454
}
5555
}

packages/cubejs-testing-drivers/src/helpers/getComposePath.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,26 @@
22
import fs from 'fs-extra';
33
import path from 'path';
44
import * as YAML from 'yaml';
5-
import { getFixtures } from './getFixtures';
5+
6+
import type { Fixture } from '../types/Fixture';
67

78
/**
89
* Returns docker compose file by data source type.
910
*/
10-
export function getComposePath(type: string, isLocal: boolean): [path: string, file: string] {
11+
export function getComposePath(type: string, fixture: Fixture, isLocal: boolean): [path: string, file: string] {
1112
const _path = path.resolve(process.cwd(), './.temp');
1213
const _file = `${type}-compose.yaml`;
13-
const { cube, data } = getFixtures(type);
14+
1415
const depends_on = ['store'];
15-
if (cube.depends_on) {
16-
depends_on.concat(cube.depends_on);
16+
if (fixture.cube.depends_on) {
17+
depends_on.concat(fixture.cube.depends_on);
1718
}
19+
1820
const links = ['store'];
19-
if (cube.links) {
20-
depends_on.concat(cube.links);
21+
if (fixture.cube.links) {
22+
depends_on.concat(fixture.cube.links);
2123
}
24+
2225
const volumes = [
2326
'./cube.js:/cube/conf/cube.js',
2427
'./package.json:/cube/conf/package.json',
@@ -29,7 +32,7 @@ export function getComposePath(type: string, isLocal: boolean): [path: string, f
2932
services: {
3033
...(!isLocal ? {
3134
cube: {
32-
...cube,
35+
...fixture.cube,
3336
container_name: 'cube',
3437
image: 'cubejs/cube:testing-drivers',
3538
depends_on,
@@ -46,12 +49,14 @@ export function getComposePath(type: string, isLocal: boolean): [path: string, f
4649
}
4750
}
4851
};
49-
if (data) {
52+
53+
if (fixture.data) {
5054
compose.services.data = {
51-
...data,
55+
...fixture.data,
5256
container_name: 'data',
5357
};
5458
}
59+
5560
fs.writeFileSync(
5661
path.resolve(_path, _file),
5762
YAML.stringify(compose),

packages/cubejs-testing-drivers/src/helpers/runEnvironment.ts

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ export async function runEnvironment(
9090
suf?: string,
9191
{ extendedEnv }: { extendedEnv?: string } = {}
9292
): Promise<Environment> {
93-
const fixtures = getFixtures(type, extendedEnv);
93+
const fixture = getFixtures(type, extendedEnv);
9494
getTempPath();
9595
getSchemaPath(type, suf);
9696
getCubeJsPath(type);
@@ -110,7 +110,7 @@ export async function runEnvironment(
110110
})
111111
.argv;
112112
const isLocal = mode === 'local';
113-
const [composePath, composeFile] = getComposePath(type, isLocal);
113+
const [composePath, composeFile] = getComposePath(type, fixture, isLocal);
114114
const compose = new DockerComposeEnvironment(
115115
composePath,
116116
composeFile,
@@ -120,16 +120,8 @@ export async function runEnvironment(
120120
CUBEJS_TELEMETRY: 'false',
121121
});
122122

123-
const _path = `${path.resolve(process.cwd(), `./fixtures/${type}.env`)}`;
124-
if (fs.existsSync(_path)) {
125-
config({
126-
path: _path,
127-
encoding: 'utf8',
128-
override: true,
129-
});
130-
}
131-
Object.keys(fixtures.cube.environment).forEach((key) => {
132-
const val = fixtures.cube.environment[key];
123+
Object.keys(fixture.cube.environment).forEach((key) => {
124+
const val = fixture.cube.environment[key];
133125
const { length } = val;
134126

135127
if (val.indexOf('${') === 0 && val.indexOf('}') === length - 1) {
@@ -144,8 +136,8 @@ export async function runEnvironment(
144136

145137
if (process.env[key]) {
146138
compose.withEnvironment({ [key]: <string>process.env[key] });
147-
} else if (fixtures.cube.environment[key]) {
148-
process.env[key] = fixtures.cube.environment[key];
139+
} else if (fixture.cube.environment[key]) {
140+
process.env[key] = fixture.cube.environment[key];
149141
}
150142
});
151143

@@ -166,8 +158,8 @@ export async function runEnvironment(
166158
};
167159

168160
const cliEnv = isLocal ? new CubeCliEnvironment(composePath) : null;
169-
const mappedDataPort = fixtures.data ? environment.getContainer('data').getMappedPort(
170-
parseInt(fixtures.data.ports[0], 10),
161+
const mappedDataPort = fixture.data ? environment.getContainer('data').getMappedPort(
162+
parseInt(fixture.data.ports[0], 10),
171163
) : null;
172164
if (cliEnv) {
173165
cliEnv.withEnvironment({
@@ -184,19 +176,19 @@ export async function runEnvironment(
184176
}
185177
const cube = cliEnv ? {
186178
port: 4000,
187-
pgPort: parseInt(fixtures.cube.ports[1], 10),
179+
pgPort: parseInt(fixture.cube.ports[1], 10),
188180
logs: cliEnv.cli?.stdout || process.stdout
189181
} : {
190182
port: environment.getContainer('cube').getMappedPort(
191-
parseInt(fixtures.cube.ports[0], 10),
183+
parseInt(fixture.cube.ports[0], 10),
192184
),
193-
pgPort: fixtures.cube.ports[1] && environment.getContainer('cube').getMappedPort(
194-
parseInt(fixtures.cube.ports[1], 10),
185+
pgPort: fixture.cube.ports[1] && environment.getContainer('cube').getMappedPort(
186+
parseInt(fixture.cube.ports[1], 10),
195187
) || undefined,
196188
logs: await environment.getContainer('cube').logs(),
197189
};
198190

199-
if (fixtures.data) {
191+
if (fixture.data) {
200192
const data = {
201193
port: mappedDataPort!,
202194
logs: await environment.getContainer('data').logs(),
@@ -207,6 +199,9 @@ export async function runEnvironment(
207199
data,
208200
stop: async () => {
209201
await environment.down({ timeout: 30 * 1000 });
202+
if (cliEnv) {
203+
await cliEnv.down();
204+
}
210205
},
211206
};
212207
}

0 commit comments

Comments
 (0)