Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
/* eslint-disable security/detect-object-injection */
import { createClient, ClickHouseClient } from '@clickhouse/client';
import * as csv from 'csv';
import getPort from 'get-port';
import { Readable, Stream } from 'stream';
import { LRUStorage } from '../../caching/lru-storage.js';
import { getTunnel } from '../../helpers/get-ssh-tunnel.js';
import { DAO_CONSTANTS } from '../../helpers/data-access-objects-constants.js';
import { ERROR_MESSAGES } from '../../helpers/errors/error-messages.js';
import { tableSettingsFieldValidator } from '../../helpers/validation/table-settings-validator.js';
Expand All @@ -29,7 +31,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
}

public async addRowInTable(tableName: string, row: Record<string, unknown>): Promise<Record<string, unknown>> {
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
await client.insert({
table: tableName,
Expand All @@ -46,7 +48,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
tableName: string,
primaryKey: Record<string, unknown>,
): Promise<Record<string, unknown>> {
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const whereClause = this.buildWhereClause(primaryKey);
const query = `ALTER TABLE ${this.escapeIdentifier(tableName)} DELETE WHERE ${whereClause}`;
Expand All @@ -66,7 +68,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
if (!referencedFieldName || !fieldValues.length) {
return [];
}
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const columnsToSelect = identityColumnName
? `${this.escapeIdentifier(referencedFieldName)}, ${this.escapeIdentifier(identityColumnName)}`
Expand All @@ -92,7 +94,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
primaryKey: Record<string, unknown>,
settings: TableSettingsDS,
): Promise<Record<string, unknown>> {
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
let availableFields: string[] = [];
if (settings) {
Expand Down Expand Up @@ -127,7 +129,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
return [];
}

const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
let availableFields: string[] = [];
if (settings) {
Expand Down Expand Up @@ -172,7 +174,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
: DAO_CONSTANTS.DEFAULT_PAGINATION.perPage;

const offset = (page - 1) * perPage;
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();

try {
if (!tableStructure) {
Expand Down Expand Up @@ -318,7 +320,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
return cachedPrimaryColumns;
}

const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const query = `
SELECT name, type
Expand Down Expand Up @@ -349,7 +351,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
}

public async getTablesFromDB(): Promise<Array<TableDS>> {
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const database = this.connection.database || 'default';
const query = `
Expand Down Expand Up @@ -382,7 +384,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
return cachedTableStructure;
}

const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const database = this.connection.database || 'default';
const query = `
Expand Down Expand Up @@ -430,7 +432,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
}

public async testConnect(): Promise<TestConnectionResultDS> {
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const result = await client.query({
query: 'SELECT 1',
Expand All @@ -457,7 +459,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
row: Record<string, unknown>,
primaryKey: Record<string, unknown>,
): Promise<Record<string, unknown>> {
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const setClause = Object.entries(row)
.map(([key, value]) => {
Expand Down Expand Up @@ -486,7 +488,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
return [];
}

const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const setClause = Object.entries(newValues)
.map(([key, value]) => {
Expand All @@ -511,7 +513,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
return 0;
}

const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const whereConditions = primaryKeys.map((pk) => `(${this.buildWhereClause(pk)})`).join(' OR ');

Expand All @@ -537,7 +539,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
}

public async isView(tableName: string): Promise<boolean> {
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const database = this.connection.database || 'default';
const query = `
Expand Down Expand Up @@ -597,7 +599,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
}

public async importCSVInTable(file: Express.Multer.File, tableName: string): Promise<void> {
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const stream = new Readable();
stream.push(file.buffer);
Expand All @@ -623,7 +625,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
}

public async executeRawQuery(query: string, _tableName?: string): Promise<Array<Record<string, unknown>>> {
const client = this.getClickHouseClient();
const client = await this.getClickHouseClient();
try {
const result = await client.query({
query,
Expand Down Expand Up @@ -811,8 +813,15 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
};
}

private getClickHouseClient(): ClickHouseClient {
const { host, port, username, password, database, ssl, cert } = this.connection;
private async getClickHouseClient(): Promise<ClickHouseClient> {
if (this.connection.ssh) {
return this.createTunneledClickHouseClient();
}
return this.createDirectClickHouseClient(this.connection.host, this.connection.port);
}

private createDirectClickHouseClient(host: string, port: number): ClickHouseClient {
const { username, password, database, ssl, cert } = this.connection;
const protocol = ssl ? 'https' : 'http';
const url = `${protocol}://${host}:${port}`;

Expand All @@ -831,4 +840,44 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements

return createClient(clientConfig);
}

private async createTunneledClickHouseClient(): Promise<ClickHouseClient> {
const connectionCopy = { ...this.connection };

const cachedTnl = LRUStorage.getTunnelCache(connectionCopy);
if (cachedTnl && cachedTnl.clickhouse && cachedTnl.server && cachedTnl.client) {
return cachedTnl.clickhouse;
}

const freePort = await getPort();

return new Promise<ClickHouseClient>(async (resolve, reject): Promise<void> => {
try {
const [server, client] = await getTunnel(connectionCopy, freePort);
const clickhouseClient = this.createDirectClickHouseClient('127.0.0.1', freePort);

const tnlCachedObj = {
server: server,
client: client,
Comment on lines +860 to +861
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The object property shorthand can be used here to improve code readability and reduce redundancy. Instead of explicitly assigning 'server: server' and 'client: client', these can be simplified using shorthand property syntax.

Suggested change
server: server,
client: client,
server,
client,

Copilot uses AI. Check for mistakes.
clickhouse: clickhouseClient,
};

LRUStorage.setTunnelCache(connectionCopy, tnlCachedObj);
resolve(clickhouseClient);

client.on('error', (e) => {
LRUStorage.delTunnelCache(connectionCopy);
reject(e);
});

server.on('error', (e) => {
LRUStorage.delTunnelCache(connectionCopy);
reject(e);
});
} catch (error) {
LRUStorage.delTunnelCache(connectionCopy);
reject(error);
}
});
Comment on lines +854 to +881
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a race condition in the error handler registration. The Promise is resolved on line 866 before error handlers are registered on lines 868-876. If an error occurs immediately after the tunnel is established but before error handlers are attached, it will go unhandled. Additionally, if an error occurs after the promise has already been resolved, calling reject will have no effect since promises can only be settled once.

The error handlers should be registered before resolving the promise, and they should not call reject after the promise is resolved. Consider restructuring to match the pattern used in similar implementations where error handlers either don't reject after resolving, or the logic is structured to prevent this race condition.

Copilot uses AI. Check for mistakes.
Comment on lines +854 to +881
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the Promise constructor with an async executor function is an anti-pattern. The executor function is marked as async but this is unnecessary and can lead to subtle bugs. The async keyword on the executor means it returns a Promise, but the Promise constructor doesn't handle this returned promise. If an error is thrown in an async executor before any await, it will create an unhandled promise rejection instead of being caught by the try-catch.

The async keyword should be removed from the executor function, or the code should be restructured to avoid the Promise constructor entirely by using async/await directly.

Suggested change
return new Promise<ClickHouseClient>(async (resolve, reject): Promise<void> => {
try {
const [server, client] = await getTunnel(connectionCopy, freePort);
const clickhouseClient = this.createDirectClickHouseClient('127.0.0.1', freePort);
const tnlCachedObj = {
server: server,
client: client,
clickhouse: clickhouseClient,
};
LRUStorage.setTunnelCache(connectionCopy, tnlCachedObj);
resolve(clickhouseClient);
client.on('error', (e) => {
LRUStorage.delTunnelCache(connectionCopy);
reject(e);
});
server.on('error', (e) => {
LRUStorage.delTunnelCache(connectionCopy);
reject(e);
});
} catch (error) {
LRUStorage.delTunnelCache(connectionCopy);
reject(error);
}
});
try {
const [server, client] = await getTunnel(connectionCopy, freePort);
const clickhouseClient = this.createDirectClickHouseClient('127.0.0.1', freePort);
const tnlCachedObj = {
server: server,
client: client,
clickhouse: clickhouseClient,
};
LRUStorage.setTunnelCache(connectionCopy, tnlCachedObj);
client.on('error', () => {
LRUStorage.delTunnelCache(connectionCopy);
});
server.on('error', () => {
LRUStorage.delTunnelCache(connectionCopy);
});
return clickhouseClient;
} catch (error) {
LRUStorage.delTunnelCache(connectionCopy);
throw error;
}

Copilot uses AI. Check for mistakes.
}
}
Loading