Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/integration/clickhouse.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -eo pipefail

# Debug log for test containers
export DEBUG=testcontainers
export DEBUG=testcontainers,testcontainers:containers

export TEST_CLICKHOUSE_VERSION=23.11

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/drivers-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,6 @@ jobs:
timeout_minutes: 20
command: |
cd ./packages/cubejs-testing-drivers
export DEBUG=testcontainers
export DEBUG=testcontainers,testcontainers:containers
yarn ${{ matrix.database }}-full

2 changes: 1 addition & 1 deletion packages/cubejs-clickhouse-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
"integration:clickhouse": "jest dist/test"
},
"dependencies": {
"@clickhouse/client-common": "^1.8.0",
"@cubejs-backend/apla-clickhouse": "^1.7",
"@cubejs-backend/base-driver": "1.1.3",
"@cubejs-backend/shared": "1.1.3",
"generic-pool": "^3.6.0",
"moment": "^2.24.0",
"sqlstring": "^2.3.1",
"uuid": "^8.3.2"
},
"license": "Apache-2.0",
Expand Down
78 changes: 68 additions & 10 deletions packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import {
} from '@cubejs-backend/base-driver';
import genericPool, { Pool } from 'generic-pool';
import { v4 as uuidv4 } from 'uuid';
import sqlstring from 'sqlstring';
import { formatQueryParams } from '@clickhouse/client-common';

import { HydrationStream, transformRow } from './HydrationStream';

// TODO migrate to `@clickhouse/client`, upstream clickhouse client
const ClickHouse = require('@cubejs-backend/apla-clickhouse');

const ClickhouseTypeToGeneric: Record<string, string> = {
Expand Down Expand Up @@ -222,14 +223,47 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
true;
}

public async query(query: string, values: unknown[]) {
return this.queryResponse(query, values).then((res: any) => this.normaliseResponse(res));
/**
 * Runs `query` with the optional positional `values` and resolves with the
 * response after it has been passed through `normaliseResponse`.
 *
 * A rejection from `queryResponse` or from the normalisation step is rethrown
 * as a new `Error` whose message embeds the query text and the values; the
 * original error is attached as `cause`.
 */
public async query(query: string, values?: unknown[]) {
  const rawResponse = this.queryResponse(query, values);
  const normalised = rawResponse.then((res: any) => this.normaliseResponse(res));

  return normalised.catch((err: unknown) => {
    // TODO drop this log, or make it shorter
    throw new Error(`Failed during query; query: ${query}; values: ${values}`, { cause: err });
  });
}

// TODO make static, use somewhere in tests
/**
 * Turns positional parameter values into an object keyed by `param_<name>`,
 * where `<name>` comes from `paramName(index)` and each value is serialised
 * with `formatQueryParams`. The result is meant to be spread into the
 * `queryOptions` object handed to the client.
 */
protected prepareParams(values: unknown[]): Record<string, string> {
  // apla-clickhouse lets us add query params through the `queryOptions` object,
  // which it serialises with querystring.stringify:
  // https://github.com/cube-js/apla-node-clickhouse/blob/5a6577fc97ba6911171753fc65b2cd2f6170f2f7/src/clickhouse.js#L347-L348
  // https://github.com/cube-js/apla-node-clickhouse/blob/5a6577fc97ba6911171753fc65b2cd2f6170f2f7/src/clickhouse.js#L265-L266
  // https://github.com/cube-js/apla-node-clickhouse/blob/5a6577fc97ba6911171753fc65b2cd2f6170f2f7/src/clickhouse.js#L336-L338
  // https://github.com/cube-js/apla-node-clickhouse/blob/5a6577fc97ba6911171753fc65b2cd2f6170f2f7/src/clickhouse.js#L173-L175

  // `toSearchParams` or `formatQueryParams` from `@clickhouse/client-common` can prepare the params.
  // Beware: these functions are marked "For implementations usage only - should not be re-exported",
  // so they could be moved or disappear completely:
  // https://github.com/ClickHouse/clickhouse-js/blob/a15cce93545c792852e34c05ce31954c75d11486/packages/client-common/src/utils/url.ts#L57-L61

  // The HTTP interface itself is documented, so it should be mostly fine:
  // https://clickhouse.com/docs/en/interfaces/cli#cli-queries-with-parameters
  // https://clickhouse.com/docs/en/interfaces/http#cli-queries-with-parameters

  const entries = values.map((value, idx): [string, string] => [
    `param_${this.paramName(idx)}`,
    formatQueryParams(value),
  ]);

  return Object.fromEntries(entries);
}

protected queryResponse(query: string, values: unknown[]) {
const formattedQuery = sqlstring.format(query, values);
protected queryResponse(query: string, values?: unknown[]) {
// todo drop this
console.log('queryResponse call', query, values);

return this.withConnection((connection, queryId) => connection.querying(formattedQuery, {
const paramsValues = this.prepareParams(values ?? []);

// todo drop this
console.log('queryResponse prepared', query, paramsValues);

return this.withConnection((connection, queryId) => connection.querying(query, {
dataObjects: true,
queryOptions: {
query_id: queryId,
Expand All @@ -241,6 +275,9 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
//
//
...(this.readOnlyMode ? {} : { join_use_nulls: 1 }),

// Add parameter values to query string
...paramsValues,
}
}));
}
Expand Down Expand Up @@ -309,17 +346,34 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
return [{ schema_name: this.config.queryOptions.database }];
}

// TODO make static
/**
 * Returns the name used for the positional parameter at `paramIndex`:
 * the letter `p` followed by the index (e.g. `p0`).
 */
protected paramName(paramIndex: number): string {
  const prefix = 'p';
  return `${prefix}${paramIndex}`;
}

// TODO make static
/**
 * Returns the SQL placeholder for the positional parameter at `paramIndex`,
 * in the form `{<name>:String}` where `<name>` comes from `paramName`.
 */
public param(paramIndex: number): string {
  // TODO not always string
  const name = this.paramName(paramIndex);
  return `{${name}:String}`;
}

public async stream(
query: string,
values: unknown[],
// eslint-disable-next-line @typescript-eslint/no-unused-vars
{ highWaterMark }: StreamOptions
): Promise<StreamTableDataWithTypes> {
// todo drop this
console.log('stream call', query, values);

// eslint-disable-next-line no-underscore-dangle
const conn = await (<any> this.pool)._factory.create();

try {
const formattedQuery = sqlstring.format(query, values);
const paramsValues = this.prepareParams(values ?? []);

// todo drop this
console.log('stream prepared', query, paramsValues);

return await new Promise((resolve, reject) => {
const options = {
Expand All @@ -333,12 +387,16 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
//
//
...(this.readOnlyMode ? {} : { join_use_nulls: 1 }),

// Add parameter values to query string
...paramsValues,
}
};

const originalStream = conn.query(formattedQuery, options, (err: Error | null, result: any) => {
const originalStream = conn.query(query, options, (err: Error | null, result: any) => {
if (err) {
reject(err);
// TODO remove message, or make it shorter
reject(new Error(`Failed during stream createion; query: ${query}; values: ${values}`, { cause: err }));
} else {
const rowStream = new HydrationStream(result.meta);
originalStream.pipe(rowStream);
Expand Down Expand Up @@ -416,7 +474,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
}

public getTablesQuery(schemaName: string) {
return this.query('SELECT name as table_name FROM system.tables WHERE database = ?', [schemaName]);
return this.query('SELECT name as table_name FROM system.tables WHERE database = {p0:String}', [schemaName]);
}

protected getExportBucket(
Expand Down
26 changes: 18 additions & 8 deletions packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,23 @@ describe('ClickHouseDriver', () => {
[]
);

await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
// Builds a parenthesised, comma-separated tuple of `len` placeholders,
// asking the driver for the placeholder text of each index in order.
function mkPlaceholdersTuple(len: number): string {
  const indices = [...new Array(len).keys()];
  const placeholders = indices.map((idx) => driver.param(idx));
  const joined = placeholders.join(',');
  return `(${joined})`;
}

// Inserts a single row into `table`, using one placeholder per value.
async function insert(table: string, values: Array<unknown>): Promise<void> {
  const tuple = mkPlaceholdersTuple(values.length);
  const sql = `INSERT INTO ${table} VALUES ${tuple}`;
  await driver.query(sql, values);
}

await insert('test.types_test', [
'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00.000', '2020-01-01 00:00:00.000000', '2020-01-01 00:00:00.000000000', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.01, 1.01, 1.01, 'hello', 'world'
]);
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
await insert('test.types_test', [
'2020-01-02', '2020-01-02 00:00:00', '2020-01-02 00:00:00.123', '2020-01-02 00:00:00.123456', '2020-01-02 00:00:00.123456789', 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2.02, 2.02, 2.02, 'hello', 'world'
]);
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
await insert('test.types_test', [
'2020-01-03', '2020-01-03 00:00:00', '2020-01-03 00:00:00.234', '2020-01-03 00:00:00.234567', '2020-01-03 00:00:00.234567890', 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3.03, 3.03, 3.03, 'hello', 'world'
]);
});
Expand Down Expand Up @@ -205,8 +215,8 @@ describe('ClickHouseDriver', () => {
try {
await driver.createSchemaIfNotExists(name);
await driver.query(`CREATE TABLE ${name}.test (x Int32, s String) ENGINE Log`, []);
await driver.query(`INSERT INTO ${name}.test VALUES (?, ?), (?, ?), (?, ?)`, [1, 'str1', 2, 'str2', 3, 'str3']);
const values = await driver.query(`SELECT * FROM ${name}.test WHERE x = ?`, [2]);
await driver.query(`INSERT INTO ${name}.test VALUES ({p0:Int32}, {p1:String}), ({p2:Int32}, {p3:String}), ({p4:Int32}, {p5:String})`, [1, 'str1', 2, 'str2', 3, 'str3']);
const values = await driver.query(`SELECT * FROM ${name}.test WHERE x = {p0:Int32}`, [2]);
expect(values).toEqual([{ x: '2', s: 'str2' }]);
} finally {
await driver.query(`DROP DATABASE ${name}`, []);
Expand All @@ -220,10 +230,10 @@ describe('ClickHouseDriver', () => {
try {
await driver.createSchemaIfNotExists(name);
await driver.query(`CREATE TABLE ${name}.a (x Int32, s String) ENGINE Log`, []);
await driver.query(`INSERT INTO ${name}.a VALUES (?, ?), (?, ?), (?, ?)`, [1, 'str1', 2, 'str2', 3, 'str3']);
await driver.query(`INSERT INTO ${name}.a VALUES ({p0:Int32}, {p1:String}), ({p2:Int32}, {p3:String}), ({p4:Int32}, {p5:String})`, [1, 'str1', 2, 'str2', 3, 'str3']);

await driver.query(`CREATE TABLE ${name}.b (x Int32, s String) ENGINE Log`, []);
await driver.query(`INSERT INTO ${name}.b VALUES (?, ?), (?, ?), (?, ?)`, [2, 'str2', 3, 'str3', 4, 'str4']);
await driver.query(`INSERT INTO ${name}.b VALUES ({p0:Int32}, {p1:String}), ({p2:Int32}, {p3:String}), ({p4:Int32}, {p5:String})`, [2, 'str2', 3, 'str3', 4, 'str4']);

const values = await driver.query(`SELECT * FROM ${name}.a LEFT OUTER JOIN ${name}.b ON a.x = b.x`, []);
expect(values).toEqual([
Expand All @@ -245,7 +255,7 @@ describe('ClickHouseDriver', () => {

it('datetime with specific timezone', async () => {
await doWithDriver(async (driver) => {
const rows = await driver.query('SELECT toDateTime(?, \'Asia/Istanbul\') as dt', [
const rows = await driver.query('SELECT toDateTime({p0:String}, \'Asia/Istanbul\') as dt', [
'2020-01-01 00:00:00'
]);
expect(rows).toEqual([{
Expand Down
3 changes: 1 addition & 2 deletions packages/cubejs-schema-compiler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"uuid": "^8.3.2"
},
"devDependencies": {
"@clickhouse/client-common": "^1.8.0",
"@cubejs-backend/apla-clickhouse": "^1.7.0",
"@cubejs-backend/linter": "^1.0.0",
"@cubejs-backend/query-orchestrator": "1.1.3",
Expand All @@ -68,7 +69,6 @@
"@types/lru-cache": "^5.1.0",
"@types/node": "^18",
"@types/ramda": "^0.27.34",
"@types/sqlstring": "^2.3.0",
"@types/syntax-error": "^1.4.1",
"@types/uuid": "^8.3.0",
"antlr4ts-cli": "^0.5.0-alpha.4",
Expand All @@ -79,7 +79,6 @@
"request": "^2.88.0",
"request-promise": "^4.2.4",
"source-map-support": "^0.5.19",
"sqlstring": "^2.3.1",
"testcontainers": "^10.10.4",
"typescript": "~5.2.2",
"uuid": "^8.3.2"
Expand Down
14 changes: 14 additions & 0 deletions packages/cubejs-schema-compiler/src/adapter/ClickHouseQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { BaseQuery } from './BaseQuery';
import { BaseFilter } from './BaseFilter';
import { UserError } from '../compiler/UserError';
import { BaseTimeDimension } from './BaseTimeDimension';
import { ParamAllocator } from './ParamAllocator';

const GRANULARITY_TO_INTERVAL = {
day: 'Day',
Expand All @@ -14,6 +15,13 @@ const GRANULARITY_TO_INTERVAL = {
year: 'Year',
};

/**
 * Param allocator that renders placeholders in the `{p<index>:String}` form.
 */
class ClickHouseParamAllocator extends ParamAllocator {
  public paramPlaceHolder(paramIndex: number): string {
    // TODO not always string
    const name = `p${paramIndex}`;
    return `{${name}:String}`;
  }
}

class ClickHouseFilter extends BaseFilter {
public likeIgnoreCase(column, not, param, type) {
const p = (!type || type === 'contains' || type === 'ends') ? '%' : '';
Expand All @@ -31,6 +39,10 @@ class ClickHouseFilter extends BaseFilter {
}

export class ClickHouseQuery extends BaseQuery {
/**
 * Creates the param allocator for this query: a `ClickHouseParamAllocator`
 * constructed with the given `expressionParams`.
 */
public newParamAllocator(expressionParams) {
  const allocator = new ClickHouseParamAllocator(expressionParams);
  return allocator;
}

/**
 * Creates a `ClickHouseFilter` bound to this query for the given `filter`.
 */
public newFilter(filter) {
  const clickHouseFilter = new ClickHouseFilter(this, filter);
  return clickHouseFilter;
}
Expand Down Expand Up @@ -268,6 +280,8 @@ export class ClickHouseQuery extends BaseQuery {

public sqlTemplates() {
const templates = super.sqlTemplates();
// TODO not every param is a string
templates.params.param = '{p{{ param_index }}:String}';
templates.functions.DATETRUNC = 'DATE_TRUNC({{ args_concat }})';
// TODO: Introduce additional filter in jinja? or parseDateTimeBestEffort?
// https://github.com/ClickHouse/ClickHouse/issues/19351
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { formatQueryParams } from '@clickhouse/client-common';
// TODO migrate to `@clickhouse/client`, upstream clickhouse client
/* eslint-disable */
import ClickHouse from '@cubejs-backend/apla-clickhouse';
import { GenericContainer } from 'testcontainers';
import { format as formatSql } from 'sqlstring';
import { v4 as uuidv4 } from 'uuid';
import { ClickHouseQuery } from '../../../src/adapter/ClickHouseQuery';
import { BaseDbRunner } from "../utils/BaseDbRunner";
Expand Down Expand Up @@ -136,13 +137,29 @@ export class ClickHouseDbRunner extends BaseDbRunner {
} : {};

for (const [query, params] of queries) {
requests.push(clickHouse.querying(formatSql(query, params), {
// ClickHouse has named and typed parameters
// But our interface for parameters between query builder and driver is Array<unknown>
// We don't have access to driver here, so instead this replicates logic from driver
const preparedValues = Object.fromEntries(params.map((value, idx) => {
const paramName = `p${idx}`;
const paramKey = `param_${paramName}`;
const preparedValue = formatQueryParams(value);
return [paramKey, preparedValue];
}));
// TODO drop this
console.log("ClickHOuse testQueries prepared", query, preparedValues);
requests.push(clickHouse.querying(query, {
dataObjects: true,
queryOptions: {
session_id: clickHouse.sessionId,
join_use_nulls: '1',
...extendedDateTimeResultsOptions
...extendedDateTimeResultsOptions,
// Add parameter values to query string
...preparedValues,
}
}).catch((err) => {
// TODO remove message, or make it shorter. Or not, it's just tests
throw new Error(`Failed during query; query: ${query}; params: ${params}`, { cause: err });
}));
}

Expand Down
Loading
Loading