Skip to content

Commit b25c199

Browse files
authored
feat(druid-driver): Retrieve types for columns with new Druid versions (#7414)
1 parent 5d1ad88 commit b25c199

File tree

5 files changed

+81
-12
lines changed

5 files changed

+81
-12
lines changed

.github/actions/integration/druid.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export DEBUG=testcontainers
66

77
export TEST_POSTGRES_VERSION=13
88
export TEST_ZOOKEEPER_VERSION=3.5
9-
export TEST_DRUID_VERSION=0.19.0
9+
export TEST_DRUID_VERSION=27.0.0
1010

1111
echo "::group::Druid ${TEST_DRUID_VERSION}";
1212

packages/cubejs-druid-driver/docker-compose.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ services:
3333
restart: always
3434

3535
coordinator:
36-
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
36+
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
3737
container_name: coordinator
3838
volumes:
3939
- ./storage:/opt/data
@@ -56,7 +56,7 @@ services:
5656
start_period: 60s
5757

5858
broker:
59-
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
59+
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
6060
container_name: broker
6161
volumes:
6262
- broker_var:/opt/druid/var
@@ -79,7 +79,7 @@ services:
7979
start_period: 60s
8080

8181
historical:
82-
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
82+
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
8383
container_name: historical
8484
volumes:
8585
- ./storage:/opt/data
@@ -103,7 +103,7 @@ services:
103103
start_period: 60s
104104

105105
middlemanager:
106-
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
106+
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
107107
container_name: middlemanager
108108
volumes:
109109
- ./storage:/opt/data
@@ -127,7 +127,7 @@ services:
127127
start_period: 60s
128128

129129
router:
130-
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
130+
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
131131
container_name: router
132132
volumes:
133133
- router_var:/opt/druid/var

packages/cubejs-druid-driver/src/DruidClient.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ export class DruidClient {
4141
});
4242
}
4343

44-
public async query<R = unknown>(query: string, parameters: { type: string, value: unknown }[]): Promise<R[]> {
44+
public async query<R = unknown>(query: string, parameters: { type: string, value: unknown }[]): Promise<{ rows: R[], columns: Record<string, { sqlType: string }> | null }> {
4545
let cancelled = false;
4646
const cancelObj: any = {};
4747

48-
const promise: Promise<R[]> & { cancel?: () => void } = (async () => {
48+
const promise: Promise<{ rows: R[], columns: any }> & { cancel?: () => void } = (async () => {
4949
cancelObj.cancel = async () => {
5050
cancelled = true;
5151
};
@@ -57,6 +57,8 @@ export class DruidClient {
5757
data: {
5858
query,
5959
parameters,
60+
header: true,
61+
sqlTypesHeader: true,
6062
resultFormat: 'object',
6163
},
6264
});
@@ -67,7 +69,19 @@ export class DruidClient {
6769
throw new Error('Query cancelled');
6870
}
6971

70-
return response.data;
72+
if (response.headers['x-druid-sql-header-included']) {
73+
const [columns, ...rows] = response.data;
74+
75+
return {
76+
columns,
77+
rows
78+
};
79+
} else {
80+
return {
81+
columns: null,
82+
rows: response.data,
83+
};
84+
}
7185
} catch (e: any) {
7286
if (cancelled) {
7387
throw new Error('Query cancelled');

packages/cubejs-druid-driver/src/DruidDriver.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ import {
88
getEnv,
99
assertDataSource,
1010
} from '@cubejs-backend/shared';
11-
import { BaseDriver, TableQueryResult } from '@cubejs-backend/base-driver';
11+
import {
12+
BaseDriver,
13+
DownloadQueryResultsOptions,
14+
DownloadQueryResultsResult,
15+
TableQueryResult, TableStructure,
16+
} from '@cubejs-backend/base-driver';
1217
import { DruidClient, DruidClientBaseConfiguration, DruidClientConfiguration } from './DruidClient';
1318
import { DruidQuery } from './DruidQuery';
1419

@@ -105,7 +110,8 @@ export class DruidDriver extends BaseDriver {
105110
}
106111

107112
public async query<R = unknown>(query: string, values: unknown[] = []): Promise<Array<R>> {
108-
return this.client.query(query, this.normalizeQueryValues(values));
113+
const result = await this.client.query<R>(query, this.normalizeQueryValues(values));
114+
return result.rows;
109115
}
110116

111117
public informationSchemaQuery() {
@@ -130,6 +136,29 @@ export class DruidDriver extends BaseDriver {
130136
]);
131137
}
132138

139+
public async downloadQueryResults(query: string, values: unknown[], _options: DownloadQueryResultsOptions): Promise<DownloadQueryResultsResult> {
140+
const { rows, columns } = await this.client.query<any>(query, this.normalizeQueryValues(values));
141+
if (!columns) {
142+
throw new Error(
143+
'You are using an old version of Druid. Unable to detect column types in readOnly mode.'
144+
);
145+
}
146+
147+
const types: TableStructure = [];
148+
149+
for (const [name, meta] of Object.entries(columns)) {
150+
types.push({
151+
name,
152+
type: this.toGenericType(meta.sqlType.toLowerCase()),
153+
});
154+
}
155+
156+
return {
157+
rows,
158+
types,
159+
};
160+
}
161+
133162
protected normalizeQueryValues(values: unknown[]) {
134163
return values.map((value) => ({
135164
value,

packages/cubejs-druid-driver/test/druid-driver.test.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import path from 'path';
66
import { DruidDriver, DruidDriverConfiguration } from '../src/DruidDriver';
77

88
describe('DruidDriver', () => {
9-
let env: StartedDockerComposeEnvironment|null = null;
9+
let env: StartedDockerComposeEnvironment | null = null;
1010
let config: DruidDriverConfiguration;
1111

1212
const doWithDriver = async (callback: (driver: DruidDriver) => Promise<any>) => {
@@ -23,6 +23,8 @@ describe('DruidDriver', () => {
2323

2424
config = {
2525
url: `http://${host}:${port}`,
26+
user: 'admin',
27+
password: 'password1',
2628
};
2729

2830
return;
@@ -85,4 +87,28 @@ describe('DruidDriver', () => {
8587
}]);
8688
});
8789
});
90+
91+
it('downloadQueryResults', async () => {
92+
jest.setTimeout(10 * 1000);
93+
94+
return doWithDriver(async (driver) => {
95+
const result = await driver.downloadQueryResults(
96+
'SELECT 1 as id, true as finished, \'netherlands\' as country, CAST(\'2020-01-01T01:01:01.111Z\' as timestamp) as created UNION ALL SELECT 2 as id, false as finished, \'spain\' as country, CAST(\'2020-01-01T01:01:01.111Z\' as timestamp) as created',
97+
[],
98+
{ highWaterMark: 1 }
99+
);
100+
expect(result).toEqual({
101+
rows: [
102+
{ country: 'netherlands', created: '2020-01-01T01:01:01.111Z', finished: true, id: 1 },
103+
{ country: 'spain', created: '2020-01-01T01:01:01.111Z', finished: false, id: 2 }
104+
],
105+
types: [
106+
{ name: 'id', type: 'int' },
107+
{ name: 'finished', type: 'boolean' },
108+
{ name: 'country', type: 'text' },
109+
{ name: 'created', type: 'timestamp' }
110+
]
111+
});
112+
});
113+
});
88114
});

0 commit comments

Comments
 (0)