Skip to content

Commit 0a43cd4

Browse files
authored
Merge pull request #1477 from rocket-admin/backend_clickhouse_ssh
refactor: update getClickHouseClient to be asynchronous and implement SSH tunneling
2 parents 9affeb8 + e27e537 commit 0a43cd4

File tree

1 file changed

+67
-18
lines changed

1 file changed

+67
-18
lines changed

shared-code/src/data-access-layer/data-access-objects/data-access-object-clickhouse.ts

Lines changed: 67 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
/* eslint-disable security/detect-object-injection */
22
import { createClient, ClickHouseClient } from '@clickhouse/client';
33
import * as csv from 'csv';
4+
import getPort from 'get-port';
45
import { Readable, Stream } from 'stream';
56
import { LRUStorage } from '../../caching/lru-storage.js';
7+
import { getTunnel } from '../../helpers/get-ssh-tunnel.js';
68
import { DAO_CONSTANTS } from '../../helpers/data-access-objects-constants.js';
79
import { ERROR_MESSAGES } from '../../helpers/errors/error-messages.js';
810
import { tableSettingsFieldValidator } from '../../helpers/validation/table-settings-validator.js';
@@ -29,7 +31,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
2931
}
3032

3133
public async addRowInTable(tableName: string, row: Record<string, unknown>): Promise<Record<string, unknown>> {
32-
const client = this.getClickHouseClient();
34+
const client = await this.getClickHouseClient();
3335
try {
3436
await client.insert({
3537
table: tableName,
@@ -46,7 +48,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
4648
tableName: string,
4749
primaryKey: Record<string, unknown>,
4850
): Promise<Record<string, unknown>> {
49-
const client = this.getClickHouseClient();
51+
const client = await this.getClickHouseClient();
5052
try {
5153
const whereClause = this.buildWhereClause(primaryKey);
5254
const query = `ALTER TABLE ${this.escapeIdentifier(tableName)} DELETE WHERE ${whereClause}`;
@@ -66,7 +68,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
6668
if (!referencedFieldName || !fieldValues.length) {
6769
return [];
6870
}
69-
const client = this.getClickHouseClient();
71+
const client = await this.getClickHouseClient();
7072
try {
7173
const columnsToSelect = identityColumnName
7274
? `${this.escapeIdentifier(referencedFieldName)}, ${this.escapeIdentifier(identityColumnName)}`
@@ -92,7 +94,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
9294
primaryKey: Record<string, unknown>,
9395
settings: TableSettingsDS,
9496
): Promise<Record<string, unknown>> {
95-
const client = this.getClickHouseClient();
97+
const client = await this.getClickHouseClient();
9698
try {
9799
let availableFields: string[] = [];
98100
if (settings) {
@@ -127,7 +129,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
127129
return [];
128130
}
129131

130-
const client = this.getClickHouseClient();
132+
const client = await this.getClickHouseClient();
131133
try {
132134
let availableFields: string[] = [];
133135
if (settings) {
@@ -172,7 +174,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
172174
: DAO_CONSTANTS.DEFAULT_PAGINATION.perPage;
173175

174176
const offset = (page - 1) * perPage;
175-
const client = this.getClickHouseClient();
177+
const client = await this.getClickHouseClient();
176178

177179
try {
178180
if (!tableStructure) {
@@ -318,7 +320,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
318320
return cachedPrimaryColumns;
319321
}
320322

321-
const client = this.getClickHouseClient();
323+
const client = await this.getClickHouseClient();
322324
try {
323325
const query = `
324326
SELECT name, type
@@ -349,7 +351,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
349351
}
350352

351353
public async getTablesFromDB(): Promise<Array<TableDS>> {
352-
const client = this.getClickHouseClient();
354+
const client = await this.getClickHouseClient();
353355
try {
354356
const database = this.connection.database || 'default';
355357
const query = `
@@ -382,7 +384,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
382384
return cachedTableStructure;
383385
}
384386

385-
const client = this.getClickHouseClient();
387+
const client = await this.getClickHouseClient();
386388
try {
387389
const database = this.connection.database || 'default';
388390
const query = `
@@ -430,7 +432,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
430432
}
431433

432434
public async testConnect(): Promise<TestConnectionResultDS> {
433-
const client = this.getClickHouseClient();
435+
const client = await this.getClickHouseClient();
434436
try {
435437
const result = await client.query({
436438
query: 'SELECT 1',
@@ -457,7 +459,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
457459
row: Record<string, unknown>,
458460
primaryKey: Record<string, unknown>,
459461
): Promise<Record<string, unknown>> {
460-
const client = this.getClickHouseClient();
462+
const client = await this.getClickHouseClient();
461463
try {
462464
const setClause = Object.entries(row)
463465
.map(([key, value]) => {
@@ -486,7 +488,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
486488
return [];
487489
}
488490

489-
const client = this.getClickHouseClient();
491+
const client = await this.getClickHouseClient();
490492
try {
491493
const setClause = Object.entries(newValues)
492494
.map(([key, value]) => {
@@ -511,7 +513,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
511513
return 0;
512514
}
513515

514-
const client = this.getClickHouseClient();
516+
const client = await this.getClickHouseClient();
515517
try {
516518
const whereConditions = primaryKeys.map((pk) => `(${this.buildWhereClause(pk)})`).join(' OR ');
517519

@@ -537,7 +539,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
537539
}
538540

539541
public async isView(tableName: string): Promise<boolean> {
540-
const client = this.getClickHouseClient();
542+
const client = await this.getClickHouseClient();
541543
try {
542544
const database = this.connection.database || 'default';
543545
const query = `
@@ -597,7 +599,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
597599
}
598600

599601
public async importCSVInTable(file: Express.Multer.File, tableName: string): Promise<void> {
600-
const client = this.getClickHouseClient();
602+
const client = await this.getClickHouseClient();
601603
try {
602604
const stream = new Readable();
603605
stream.push(file.buffer);
@@ -623,7 +625,7 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
623625
}
624626

625627
public async executeRawQuery(query: string, _tableName?: string): Promise<Array<Record<string, unknown>>> {
626-
const client = this.getClickHouseClient();
628+
const client = await this.getClickHouseClient();
627629
try {
628630
const result = await client.query({
629631
query,
@@ -811,8 +813,15 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
811813
};
812814
}
813815

814-
private getClickHouseClient(): ClickHouseClient {
815-
const { host, port, username, password, database, ssl, cert } = this.connection;
816+
private async getClickHouseClient(): Promise<ClickHouseClient> {
817+
if (this.connection.ssh) {
818+
return this.createTunneledClickHouseClient();
819+
}
820+
return this.createDirectClickHouseClient(this.connection.host, this.connection.port);
821+
}
822+
823+
private createDirectClickHouseClient(host: string, port: number): ClickHouseClient {
824+
const { username, password, database, ssl, cert } = this.connection;
816825
const protocol = ssl ? 'https' : 'http';
817826
const url = `${protocol}://${host}:${port}`;
818827

@@ -831,4 +840,44 @@ export class DataAccessObjectClickHouse extends BasicDataAccessObject implements
831840

832841
return createClient(clientConfig);
833842
}
843+
844+
private async createTunneledClickHouseClient(): Promise<ClickHouseClient> {
845+
const connectionCopy = { ...this.connection };
846+
847+
const cachedTnl = LRUStorage.getTunnelCache(connectionCopy);
848+
if (cachedTnl && cachedTnl.clickhouse && cachedTnl.server && cachedTnl.client) {
849+
return cachedTnl.clickhouse;
850+
}
851+
852+
const freePort = await getPort();
853+
854+
return new Promise<ClickHouseClient>(async (resolve, reject): Promise<void> => {
855+
try {
856+
const [server, client] = await getTunnel(connectionCopy, freePort);
857+
const clickhouseClient = this.createDirectClickHouseClient('127.0.0.1', freePort);
858+
859+
const tnlCachedObj = {
860+
server: server,
861+
client: client,
862+
clickhouse: clickhouseClient,
863+
};
864+
865+
LRUStorage.setTunnelCache(connectionCopy, tnlCachedObj);
866+
resolve(clickhouseClient);
867+
868+
client.on('error', (e) => {
869+
LRUStorage.delTunnelCache(connectionCopy);
870+
reject(e);
871+
});
872+
873+
server.on('error', (e) => {
874+
LRUStorage.delTunnelCache(connectionCopy);
875+
reject(e);
876+
});
877+
} catch (error) {
878+
LRUStorage.delTunnelCache(connectionCopy);
879+
reject(error);
880+
}
881+
});
882+
}
834883
}

0 commit comments

Comments
 (0)