Skip to content

Commit 5969f0d

Browse files
authored
fix(databricks-jdbc-driver): Rolling window & count_distinct_approx (HLL) (#8323)
1 parent fbfabea commit 5969f0d

File tree

7 files changed

+3580
-18
lines changed

7 files changed

+3580
-18
lines changed

packages/cubejs-databricks-jdbc-driver/src/DatabricksQuery.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ export class DatabricksQuery extends BaseQuery {
3333
return `hll_union_agg(${sql})`;
3434
}
3535

36+
public hllCardinality(sql: string): string {
37+
return `hll_sketch_estimate(${sql})`;
38+
}
39+
40+
public hllCardinalityMerge(sql: string): string {
41+
return `hll_sketch_estimate(hll_union_agg(${sql}))`;
42+
}
43+
3644
public countDistinctApprox(sql: string) {
3745
return `approx_count_distinct(${sql})`;
3846
}

packages/cubejs-jdbc-driver/src/JDBCDriver.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import path from 'path';
2525
import { DriverOptionsInterface, SupportedDrivers } from './supported-drivers';
2626
// eslint-disable-next-line @typescript-eslint/no-unused-vars
2727
import { JDBCDriverConfiguration } from './types';
28-
import { QueryStream, nextFn, Row } from './QueryStream';
28+
import { QueryStream, nextFn, Row, transformRow } from './QueryStream';
2929

3030
const DriverManager = require('@cubejs-backend/jdbc/lib/drivermanager');
3131
const Connection = require('@cubejs-backend/jdbc/lib/connection');
@@ -246,6 +246,7 @@ export class JDBCDriver extends BaseDriver {
246246

247247
protected async queryPromised(query: string, cancelObj: any, options: any) {
248248
options = options || {};
249+
249250
try {
250251
const conn = await this.pool.acquire();
251252
try {
@@ -337,11 +338,18 @@ export class JDBCDriver extends BaseDriver {
337338
await setQueryTimeout(600);
338339
const executeQueryAsync = promisify(statement.execute.bind(statement));
339340
const resultSet = await executeQueryAsync(query);
340-
const toObjArrayAsync =
341-
resultSet.toObjArray && promisify(resultSet.toObjArray.bind(resultSet)) ||
342-
(() => Promise.resolve(resultSet));
343341

344-
return toObjArrayAsync();
342+
if (resultSet.toObjArray) {
343+
const result: any = await (promisify(resultSet.toObjArray.bind(resultSet)))();
344+
345+
for (const [key, row] of Object.entries(result)) {
346+
result[key] = transformRow(row);
347+
}
348+
349+
return result;
350+
}
351+
352+
return resultSet;
345353
}
346354

347355
public async release() {

packages/cubejs-jdbc-driver/src/QueryStream.ts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,18 @@ export type nextFn = () => {
99
value: Row,
1010
};
1111

12+
export function transformRow(row: any) {
13+
// eslint-disable-next-line no-restricted-syntax
14+
for (const [name, field] of Object.entries(row)) {
15+
// console.log({ name, field });
16+
if (field instanceof Int8Array) {
17+
row[name] = Buffer.from(field).toString('base64');
18+
}
19+
}
20+
21+
return row;
22+
}
23+
1224
export class QueryStream extends Readable {
1325
private next: null | nextFn;
1426

@@ -23,18 +35,6 @@ export class QueryStream extends Readable {
2335
this.next = nextFn;
2436
}
2537

26-
protected transformRow(row: any) {
27-
// eslint-disable-next-line no-restricted-syntax
28-
for (const [name, field] of Object.entries(row)) {
29-
// console.log({ name, field });
30-
if (field instanceof Int8Array) {
31-
row[name] = Buffer.from(field).toString('base64');
32-
}
33-
}
34-
35-
return row;
36-
}
37-
3838
/**
3939
* @override
4040
*/
@@ -44,7 +44,7 @@ export class QueryStream extends Readable {
4444
if (this.next) {
4545
const row = this.next();
4646
if (row.value) {
47-
this.push(this.transformRow(row.value));
47+
this.push(transformRow(row.value));
4848
}
4949
if (row.done) {
5050
this.push(null);

packages/cubejs-testing-drivers/fixtures/_schemas.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,30 @@
339339
"rolling_window": {
340340
"trailing": "2 month"
341341
}
342+
},
343+
{
344+
"name": "rollingCountApproxBy2Day",
345+
"type": "count_distinct_approx",
346+
"sql": "order_id",
347+
"rolling_window": {
348+
"trailing": "2 day"
349+
}
350+
},
351+
{
352+
"name": "rollingCountApproxBy2Week",
353+
"type": "count_distinct_approx",
354+
"sql": "order_id",
355+
"rolling_window": {
356+
"trailing": "2 week"
357+
}
358+
},
359+
{
360+
"name": "rollingCountApproxBy2Month",
361+
"type": "count_distinct_approx",
362+
"sql": "order_id",
363+
"rolling_window": {
364+
"trailing": "2 month"
365+
}
342366
}
343367
]
344368
}

packages/cubejs-testing-drivers/src/tests/testQueries.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,6 +1535,50 @@ export function testQueries(type: string, { includeIncrementalSchemaSuite, exten
15351535
expect(response.rawData()).toMatchSnapshot();
15361536
});
15371537

1538+
if (includeHLLSuite) {
1539+
execute('querying BigECommerce: rolling count_distinct_approx window by 2 day', async () => {
1540+
const response = await client.load({
1541+
measures: [
1542+
'BigECommerce.rollingCountApproxBy2Day',
1543+
],
1544+
timeDimensions: [{
1545+
dimension: 'BigECommerce.orderDate',
1546+
granularity: 'month',
1547+
dateRange: ['2020-01-01', '2020-12-31'],
1548+
}],
1549+
});
1550+
expect(response.rawData()).toMatchSnapshot();
1551+
});
1552+
1553+
execute('querying BigECommerce: rolling count_distinct_approx window by 2 week', async () => {
1554+
const response = await client.load({
1555+
measures: [
1556+
'BigECommerce.rollingCountApproxBy2Week',
1557+
],
1558+
timeDimensions: [{
1559+
dimension: 'BigECommerce.orderDate',
1560+
granularity: 'month',
1561+
dateRange: ['2020-01-01', '2020-12-31'],
1562+
}],
1563+
});
1564+
expect(response.rawData()).toMatchSnapshot();
1565+
});
1566+
1567+
execute('querying BigECommerce: rolling count_distinct_approx window by 2 month', async () => {
1568+
const response = await client.load({
1569+
measures: [
1570+
'BigECommerce.rollingCountApproxBy2Month',
1571+
],
1572+
timeDimensions: [{
1573+
dimension: 'BigECommerce.orderDate',
1574+
granularity: 'month',
1575+
dateRange: ['2020-01-01', '2020-12-31'],
1576+
}],
1577+
});
1578+
expect(response.rawData()).toMatchSnapshot();
1579+
});
1580+
}
1581+
15381582
execute('querying BigECommerce: totalProfitYearAgo', async () => {
15391583
const response = await client.load({
15401584
measures: [

0 commit comments

Comments
 (0)