diff --git a/shared-code/src/data-access-layer/data-access-objects/data-access-object-clickhouse.ts b/shared-code/src/data-access-layer/data-access-objects/data-access-object-clickhouse.ts index e52ba08e..d05e21b0 100644 --- a/shared-code/src/data-access-layer/data-access-objects/data-access-object-clickhouse.ts +++ b/shared-code/src/data-access-layer/data-access-objects/data-access-object-clickhouse.ts @@ -1,8 +1,10 @@ /* eslint-disable security/detect-object-injection */ import { createClient, ClickHouseClient } from '@clickhouse/client'; import * as csv from 'csv'; +import getPort from 'get-port'; import { Readable, Stream } from 'stream'; import { LRUStorage } from '../../caching/lru-storage.js'; +import { getTunnel } from '../../helpers/get-ssh-tunnel.js'; import { DAO_CONSTANTS } from '../../helpers/data-access-objects-constants.js'; import { ERROR_MESSAGES } from '../../helpers/errors/error-messages.js'; import { tableSettingsFieldValidator } from '../../helpers/validation/table-settings-validator.js'; @@ -29,7 +31,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements } public async addRowInTable(tableName: string, row: Record): Promise> { - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { await client.insert({ table: tableName, @@ -46,7 +48,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements tableName: string, primaryKey: Record, ): Promise> { - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const whereClause = this.buildWhereClause(primaryKey); const query = `ALTER TABLE ${this.escapeIdentifier(tableName)} DELETE WHERE ${whereClause}`; @@ -66,7 +68,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements if (!referencedFieldName || !fieldValues.length) { return []; } - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const columnsToSelect = identityColumnName ? `${this.escapeIdentifier(referencedFieldName)}, ${this.escapeIdentifier(identityColumnName)}` @@ -92,7 +94,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements primaryKey: Record, settings: TableSettingsDS, ): Promise> { - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { let availableFields: string[] = []; if (settings) { @@ -127,7 +129,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements return []; } - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { let availableFields: string[] = []; if (settings) { @@ -172,7 +174,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements : DAO_CONSTANTS.DEFAULT_PAGINATION.perPage; const offset = (page - 1) * perPage; - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { if (!tableStructure) { @@ -318,7 +320,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements return cachedPrimaryColumns; } - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const query = ` SELECT name, type @@ -349,7 +351,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements } public async getTablesFromDB(): Promise> { - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const database = this.connection.database || 'default'; const query = ` @@ -382,7 +384,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements return cachedTableStructure; } - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const database = this.connection.database || 'default'; const query = ` @@ -430,7 +432,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements } public async testConnect(): Promise { - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const result = await client.query({ query: 'SELECT 1', @@ -457,7 +459,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements row: Record, primaryKey: Record, ): Promise> { - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const setClause = Object.entries(row) .map(([key, value]) => { @@ -486,7 +488,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements return []; } - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const setClause = Object.entries(newValues) .map(([key, value]) => { @@ -511,7 +513,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements return 0; } - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const whereConditions = primaryKeys.map((pk) => `(${this.buildWhereClause(pk)})`).join(' OR '); @@ -537,7 +539,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements } public async isView(tableName: string): Promise { - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const database = this.connection.database || 'default'; const query = ` @@ -597,7 +599,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements } public async importCSVInTable(file: Express.Multer.File, tableName: string): Promise { - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const stream = new Readable(); stream.push(file.buffer); @@ -623,7 +625,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements } public async executeRawQuery(query: string, _tableName?: string): Promise>> { - const client = this.getClickHouseClient(); + const client = await this.getClickHouseClient(); try { const result = await client.query({ query, @@ -811,8 +813,15 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements }; } - private getClickHouseClient(): ClickHouseClient { - const { host, port, username, password, database, ssl, cert } = this.connection; + private async getClickHouseClient(): Promise { + if (this.connection.ssh) { + return this.createTunneledClickHouseClient(); + } + return this.createDirectClickHouseClient(this.connection.host, this.connection.port); + } + + private createDirectClickHouseClient(host: string, port: number): ClickHouseClient { + const { username, password, database, ssl, cert } = this.connection; const protocol = ssl ? 'https' : 'http'; const url = `${protocol}://${host}:${port}`; @@ -831,4 +840,44 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements return createClient(clientConfig); } + + private async createTunneledClickHouseClient(): Promise { + const connectionCopy = { ...this.connection }; + + const cachedTnl = LRUStorage.getTunnelCache(connectionCopy); + if (cachedTnl && cachedTnl.clickhouse && cachedTnl.server && cachedTnl.client) { + return cachedTnl.clickhouse; + } + + const freePort = await getPort(); + + return new Promise(async (resolve, reject): Promise => { + try { + const [server, client] = await getTunnel(connectionCopy, freePort); + const clickhouseClient = this.createDirectClickHouseClient('127.0.0.1', freePort); + + const tnlCachedObj = { + server: server, + client: client, + clickhouse: clickhouseClient, + }; + + LRUStorage.setTunnelCache(connectionCopy, tnlCachedObj); + resolve(clickhouseClient); + + client.on('error', (e) => { + LRUStorage.delTunnelCache(connectionCopy); + reject(e); + }); + + server.on('error', (e) => { + LRUStorage.delTunnelCache(connectionCopy); + reject(e); + }); + } catch (error) { + LRUStorage.delTunnelCache(connectionCopy); + reject(error); + } + }); + } }