Skip to content

Commit 17f1c1d

Browse files
committed
[WIP] feat(clickhouse): Use ClickHouse query with parameters via HTTP interface
1 parent 07e4293 commit 17f1c1d

File tree

9 files changed

+213
-33
lines changed

9 files changed

+213
-33
lines changed

packages/cubejs-clickhouse-driver/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@
2727
"integration:clickhouse": "jest dist/test"
2828
},
2929
"dependencies": {
30+
"@clickhouse/client-common": "^1.8.0",
3031
"@cubejs-backend/apla-clickhouse": "^1.7",
3132
"@cubejs-backend/base-driver": "1.1.3",
3233
"@cubejs-backend/shared": "1.1.3",
3334
"generic-pool": "^3.6.0",
3435
"moment": "^2.24.0",
35-
"sqlstring": "^2.3.1",
3636
"uuid": "^8.3.2"
3737
},
3838
"license": "Apache-2.0",

packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ import {
2323
} from '@cubejs-backend/base-driver';
2424
import genericPool, { Pool } from 'generic-pool';
2525
import { v4 as uuidv4 } from 'uuid';
26-
import sqlstring from 'sqlstring';
26+
import { formatQueryParams } from '@clickhouse/client-common';
2727

2828
import { HydrationStream, transformRow } from './HydrationStream';
2929

30+
// TODO migrate to `@clickhouse/client`, upstream clickhouse client
3031
const ClickHouse = require('@cubejs-backend/apla-clickhouse');
3132

3233
const ClickhouseTypeToGeneric: Record<string, string> = {
@@ -222,14 +223,47 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
222223
true;
223224
}
224225

225-
public async query(query: string, values: unknown[]) {
226-
return this.queryResponse(query, values).then((res: any) => this.normaliseResponse(res));
226+
public async query(query: string, values?: unknown[]) {
227+
return this.queryResponse(query, values).then((res: any) => this.normaliseResponse(res)).catch((err: unknown) => {
228+
// TODO drop this log, or make it shorter
229+
throw new Error(`Failed during query; query: ${query}; values: ${values}`, { cause: err });
230+
});
231+
}
232+
233+
// TODO make static, use somewhere in tests
234+
protected prepareParams(values: unknown[]): Record<string, string> {
235+
// apla-clickhouse allows to add query params via querystring.stringify and `queryOptions` object
236+
// https://github.com/cube-js/apla-node-clickhouse/blob/5a6577fc97ba6911171753fc65b2cd2f6170f2f7/src/clickhouse.js#L347-L348
237+
// https://github.com/cube-js/apla-node-clickhouse/blob/5a6577fc97ba6911171753fc65b2cd2f6170f2f7/src/clickhouse.js#L265-L266
238+
// https://github.com/cube-js/apla-node-clickhouse/blob/5a6577fc97ba6911171753fc65b2cd2f6170f2f7/src/clickhouse.js#L336-L338
239+
// https://github.com/cube-js/apla-node-clickhouse/blob/5a6577fc97ba6911171753fc65b2cd2f6170f2f7/src/clickhouse.js#L173-L175
240+
241+
// We can use `toSearchParams` or `formatQueryParams` from `@clickhouse/client-common` to prepare params
242+
// Beware - these functions marked as "For implementations usage only - should not be re-exported", so, probably, they could be moved or disappear completely
243+
// https://github.com/ClickHouse/clickhouse-js/blob/a15cce93545c792852e34c05ce31954c75d11486/packages/client-common/src/utils/url.ts#L57-L61
244+
245+
// HTTP interface itself is documented, so it should be mostly fine
246+
// https://clickhouse.com/docs/en/interfaces/cli#cli-queries-with-parameters
247+
// https://clickhouse.com/docs/en/interfaces/http#cli-queries-with-parameters
248+
249+
return Object.fromEntries(values.map((value, idx) => {
250+
const paramName = this.paramName(idx);
251+
const paramKey = `param_${paramName}`;
252+
const preparedValue = formatQueryParams(value);
253+
return [paramKey, preparedValue];
254+
}));
227255
}
228256

229-
protected queryResponse(query: string, values: unknown[]) {
230-
const formattedQuery = sqlstring.format(query, values);
257+
protected queryResponse(query: string, values?: unknown[]) {
258+
// todo drop this
259+
console.log('queryResponse call', query, values);
231260

232-
return this.withConnection((connection, queryId) => connection.querying(formattedQuery, {
261+
const paramsValues = this.prepareParams(values ?? []);
262+
263+
// todo drop this
264+
console.log('queryResponse prepared', query, paramsValues);
265+
266+
return this.withConnection((connection, queryId) => connection.querying(query, {
233267
dataObjects: true,
234268
queryOptions: {
235269
query_id: queryId,
@@ -241,6 +275,9 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
241275
//
242276
//
243277
...(this.readOnlyMode ? {} : { join_use_nulls: 1 }),
278+
279+
// Add parameter values to query string
280+
...paramsValues,
244281
}
245282
}));
246283
}
@@ -309,17 +346,34 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
309346
return [{ schema_name: this.config.queryOptions.database }];
310347
}
311348

349+
// TODO make static
350+
protected paramName(paramIndex: number): string {
351+
return `p${paramIndex}`;
352+
}
353+
354+
// TODO make static
355+
public param(paramIndex: number): string {
356+
// TODO not always string
357+
return `{${this.paramName(paramIndex)}:String}`;
358+
}
359+
312360
public async stream(
313361
query: string,
314362
values: unknown[],
315363
// eslint-disable-next-line @typescript-eslint/no-unused-vars
316364
{ highWaterMark }: StreamOptions
317365
): Promise<StreamTableDataWithTypes> {
366+
// todo drop this
367+
console.log('stream call', query, values);
368+
318369
// eslint-disable-next-line no-underscore-dangle
319370
const conn = await (<any> this.pool)._factory.create();
320371

321372
try {
322-
const formattedQuery = sqlstring.format(query, values);
373+
const paramsValues = this.prepareParams(values ?? []);
374+
375+
// todo drop this
376+
console.log('stream prepared', query, paramsValues);
323377

324378
return await new Promise((resolve, reject) => {
325379
const options = {
@@ -333,12 +387,16 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
333387
//
334388
//
335389
...(this.readOnlyMode ? {} : { join_use_nulls: 1 }),
390+
391+
// Add parameter values to query string
392+
...paramsValues,
336393
}
337394
};
338395

339-
const originalStream = conn.query(formattedQuery, options, (err: Error | null, result: any) => {
396+
const originalStream = conn.query(query, options, (err: Error | null, result: any) => {
340397
if (err) {
341-
reject(err);
398+
// TODO remove message, or make it shorter
399+
reject(new Error(`Failed during stream createion; query: ${query}; values: ${values}`, { cause: err }));
342400
} else {
343401
const rowStream = new HydrationStream(result.meta);
344402
originalStream.pipe(rowStream);
@@ -416,7 +474,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
416474
}
417475

418476
public getTablesQuery(schemaName: string) {
419-
return this.query('SELECT name as table_name FROM system.tables WHERE database = ?', [schemaName]);
477+
return this.query('SELECT name as table_name FROM system.tables WHERE database = {p0:String}', [schemaName]);
420478
}
421479

422480
protected getExportBucket(

packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,23 @@ describe('ClickHouseDriver', () => {
5858
[]
5959
);
6060

61-
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
61+
function mkPlaceholdersTuple(len: number): string {
62+
const parts = new Array(len).fill('').map((_, idx) => driver.param(idx));
63+
return `(${parts.join(',')})`;
64+
}
65+
66+
async function insert(table: string, values: Array<unknown>): Promise<void> {
67+
const placeholders = mkPlaceholdersTuple(values.length);
68+
await driver.query(`INSERT INTO ${table} VALUES ${placeholders}`, values);
69+
}
70+
71+
await insert('test.types_test', [
6272
'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00.000', '2020-01-01 00:00:00.000000', '2020-01-01 00:00:00.000000000', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.01, 1.01, 1.01, 'hello', 'world'
6373
]);
64-
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
74+
await insert('test.types_test', [
6575
'2020-01-02', '2020-01-02 00:00:00', '2020-01-02 00:00:00.123', '2020-01-02 00:00:00.123456', '2020-01-02 00:00:00.123456789', 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2.02, 2.02, 2.02, 'hello', 'world'
6676
]);
67-
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
77+
await insert('test.types_test', [
6878
'2020-01-03', '2020-01-03 00:00:00', '2020-01-03 00:00:00.234', '2020-01-03 00:00:00.234567', '2020-01-03 00:00:00.234567890', 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3.03, 3.03, 3.03, 'hello', 'world'
6979
]);
7080
});
@@ -205,8 +215,8 @@ describe('ClickHouseDriver', () => {
205215
try {
206216
await driver.createSchemaIfNotExists(name);
207217
await driver.query(`CREATE TABLE ${name}.test (x Int32, s String) ENGINE Log`, []);
208-
await driver.query(`INSERT INTO ${name}.test VALUES (?, ?), (?, ?), (?, ?)`, [1, 'str1', 2, 'str2', 3, 'str3']);
209-
const values = await driver.query(`SELECT * FROM ${name}.test WHERE x = ?`, [2]);
218+
await driver.query(`INSERT INTO ${name}.test VALUES ({p0:Int32}, {p1:String}), ({p2:Int32}, {p3:String}), ({p4:Int32}, {p5:String})`, [1, 'str1', 2, 'str2', 3, 'str3']);
219+
const values = await driver.query(`SELECT * FROM ${name}.test WHERE x = {p0:Int32}`, [2]);
210220
expect(values).toEqual([{ x: '2', s: 'str2' }]);
211221
} finally {
212222
await driver.query(`DROP DATABASE ${name}`, []);
@@ -220,10 +230,10 @@ describe('ClickHouseDriver', () => {
220230
try {
221231
await driver.createSchemaIfNotExists(name);
222232
await driver.query(`CREATE TABLE ${name}.a (x Int32, s String) ENGINE Log`, []);
223-
await driver.query(`INSERT INTO ${name}.a VALUES (?, ?), (?, ?), (?, ?)`, [1, 'str1', 2, 'str2', 3, 'str3']);
233+
await driver.query(`INSERT INTO ${name}.a VALUES ({p0:Int32}, {p1:String}), ({p2:Int32}, {p3:String}), ({p4:Int32}, {p5:String})`, [1, 'str1', 2, 'str2', 3, 'str3']);
224234

225235
await driver.query(`CREATE TABLE ${name}.b (x Int32, s String) ENGINE Log`, []);
226-
await driver.query(`INSERT INTO ${name}.b VALUES (?, ?), (?, ?), (?, ?)`, [2, 'str2', 3, 'str3', 4, 'str4']);
236+
await driver.query(`INSERT INTO ${name}.b VALUES ({p0:Int32}, {p1:String}), ({p2:Int32}, {p3:String}), ({p4:Int32}, {p5:String})`, [2, 'str2', 3, 'str3', 4, 'str4']);
227237

228238
const values = await driver.query(`SELECT * FROM ${name}.a LEFT OUTER JOIN ${name}.b ON a.x = b.x`, []);
229239
expect(values).toEqual([
@@ -245,7 +255,7 @@ describe('ClickHouseDriver', () => {
245255

246256
it('datetime with specific timezone', async () => {
247257
await doWithDriver(async (driver) => {
248-
const rows = await driver.query('SELECT toDateTime(?, \'Asia/Istanbul\') as dt', [
258+
const rows = await driver.query('SELECT toDateTime({p0:String}, \'Asia/Istanbul\') as dt', [
249259
'2020-01-01 00:00:00'
250260
]);
251261
expect(rows).toEqual([{

packages/cubejs-schema-compiler/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
"uuid": "^8.3.2"
5858
},
5959
"devDependencies": {
60+
"@clickhouse/client-common": "^1.8.0",
6061
"@cubejs-backend/apla-clickhouse": "^1.7.0",
6162
"@cubejs-backend/linter": "^1.0.0",
6263
"@cubejs-backend/query-orchestrator": "1.1.3",
@@ -68,7 +69,6 @@
6869
"@types/lru-cache": "^5.1.0",
6970
"@types/node": "^18",
7071
"@types/ramda": "^0.27.34",
71-
"@types/sqlstring": "^2.3.0",
7272
"@types/syntax-error": "^1.4.1",
7373
"@types/uuid": "^8.3.0",
7474
"antlr4ts-cli": "^0.5.0-alpha.4",
@@ -79,7 +79,6 @@
7979
"request": "^2.88.0",
8080
"request-promise": "^4.2.4",
8181
"source-map-support": "^0.5.19",
82-
"sqlstring": "^2.3.1",
8382
"testcontainers": "^10.10.4",
8483
"typescript": "~5.2.2",
8584
"uuid": "^8.3.2"

packages/cubejs-schema-compiler/src/adapter/ClickHouseQuery.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { BaseQuery } from './BaseQuery';
33
import { BaseFilter } from './BaseFilter';
44
import { UserError } from '../compiler/UserError';
55
import { BaseTimeDimension } from './BaseTimeDimension';
6+
import { ParamAllocator } from './ParamAllocator';
67

78
const GRANULARITY_TO_INTERVAL = {
89
day: 'Day',
@@ -14,6 +15,13 @@ const GRANULARITY_TO_INTERVAL = {
1415
year: 'Year',
1516
};
1617

18+
class ClickHouseParamAllocator extends ParamAllocator {
19+
public paramPlaceHolder(paramIndex: number): string {
20+
// TODO not always string
21+
return `{p${paramIndex}:String}`;
22+
}
23+
}
24+
1725
class ClickHouseFilter extends BaseFilter {
1826
public likeIgnoreCase(column, not, param, type) {
1927
const p = (!type || type === 'contains' || type === 'ends') ? '%' : '';
@@ -31,6 +39,10 @@ class ClickHouseFilter extends BaseFilter {
3139
}
3240

3341
export class ClickHouseQuery extends BaseQuery {
42+
public newParamAllocator(expressionParams) {
43+
return new ClickHouseParamAllocator(expressionParams);
44+
}
45+
3446
public newFilter(filter) {
3547
return new ClickHouseFilter(this, filter);
3648
}
@@ -268,6 +280,8 @@ export class ClickHouseQuery extends BaseQuery {
268280

269281
public sqlTemplates() {
270282
const templates = super.sqlTemplates();
283+
// TODO not every param is a string
284+
templates.params.param = '{p{{ param_index }}:String}';
271285
templates.functions.DATETRUNC = 'DATE_TRUNC({{ args_concat }})';
272286
// TODO: Introduce additional filter in jinja? or parseDateTimeBestEffort?
273287
// https://github.com/ClickHouse/ClickHouse/issues/19351

packages/cubejs-schema-compiler/test/integration/clickhouse/ClickHouseDbRunner.js

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import { formatQueryParams } from '@clickhouse/client-common';
2+
// TODO migrate to `@clickhouse/client`, upstream clickhouse client
13
/* eslint-disable */
24
import ClickHouse from '@cubejs-backend/apla-clickhouse';
35
import { GenericContainer } from 'testcontainers';
4-
import { format as formatSql } from 'sqlstring';
56
import { v4 as uuidv4 } from 'uuid';
67
import { ClickHouseQuery } from '../../../src/adapter/ClickHouseQuery';
78
import { BaseDbRunner } from "../utils/BaseDbRunner";
@@ -136,13 +137,29 @@ export class ClickHouseDbRunner extends BaseDbRunner {
136137
} : {};
137138

138139
for (const [query, params] of queries) {
139-
requests.push(clickHouse.querying(formatSql(query, params), {
140+
// ClickHouse have named and typed parameters
141+
// But our interface for parameters between query builder and driver is Array<unknown>
142+
// We don't have access to driver here, so instead this replicates logic from driver
143+
const preparedValues = Object.fromEntries(params.map((value, idx) => {
144+
const paramName = `p${idx}`;
145+
const paramKey = `param_${paramName}`;
146+
const preparedValue = formatQueryParams(value);
147+
return [paramKey, preparedValue];
148+
}));
149+
// TODO drop this
150+
console.log("ClickHOuse testQueries prepared", query, preparedValues);
151+
requests.push(clickHouse.querying(query, {
140152
dataObjects: true,
141153
queryOptions: {
142154
session_id: clickHouse.sessionId,
143155
join_use_nulls: '1',
144-
...extendedDateTimeResultsOptions
156+
...extendedDateTimeResultsOptions,
157+
// Add parameter values to query string
158+
...preparedValues,
145159
}
160+
}).catch((err) => {
161+
// TODO remove message, or make it shorter. ort not, it's just tests
162+
throw new Error(`Failed during query; query: ${query}; params: ${params}`, { cause: err });
146163
}));
147164
}
148165

0 commit comments

Comments
 (0)