@@ -21,14 +21,14 @@ import {
2121 TableStructure ,
2222 UnloadOptions ,
2323} from '@cubejs-backend/base-driver' ;
24+
25+ import { ClickHouseClient , createClient } from '@clickhouse/client' ;
2426import genericPool , { Pool } from 'generic-pool' ;
2527import { v4 as uuidv4 } from 'uuid' ;
2628import sqlstring from 'sqlstring' ;
2729
2830import { HydrationStream , transformRow } from './HydrationStream' ;
2931
30- const ClickHouse = require ( '@cubejs-backend/apla-clickhouse' ) ;
31-
3232const ClickhouseTypeToGeneric : Record < string , string > = {
3333 enum : 'text' ,
3434 string : 'text' ,
@@ -86,7 +86,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
8686 return 5 ;
8787 }
8888
89- protected readonly pool : Pool < any > ;
89+ protected readonly pool : Pool < ClickHouseClient > ;
9090
9191 protected readonly readOnlyMode : boolean ;
9292
@@ -122,19 +122,31 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
122122 config . dataSource ||
123123 assertDataSource ( 'default' ) ;
124124
125+ // TODO recheck everything in config for new driver
126+ const host = getEnv ( 'dbHost' , { dataSource } ) ;
127+ const port = getEnv ( 'dbPort' , { dataSource } ) ;
128+ const protocol = getEnv ( 'dbSsl' , { dataSource } ) ? 'https:' : 'http:' ;
129+ // TODO proper value here, with proper back compat, and treating protocol
130+ const url = `${ protocol } ://${ host } :${ port } ` ;
131+
132+ const username = getEnv ( 'dbUser' , { dataSource } ) ;
133+ const password = getEnv ( 'dbPass' , { dataSource } ) ;
125134 this . config = {
126- host : getEnv ( 'dbHost' , { dataSource } ) ,
127- port : getEnv ( 'dbPort' , { dataSource } ) ,
128- auth :
129- getEnv ( 'dbUser' , { dataSource } ) ||
130- getEnv ( 'dbPass' , { dataSource } )
131- ? `${
132- getEnv ( 'dbUser' , { dataSource } )
133- } :${
134- getEnv ( 'dbPass' , { dataSource } )
135- } `
136- : '' ,
137- protocol : getEnv ( 'dbSsl' , { dataSource } ) ? 'https:' : 'http:' ,
135+ // host: getEnv('dbHost', { dataSource }),
136+ // port: getEnv('dbPort', { dataSource }),
137+ url,
138+ // auth:
139+ // getEnv('dbUser', { dataSource }) ||
140+ // getEnv('dbPass', { dataSource })
141+ // ? `${
142+ // getEnv('dbUser', { dataSource })
143+ // }:${
144+ // getEnv('dbPass', { dataSource })
145+ // }`
146+ // : '',
147+ username,
148+ password,
149+ // protocol: getEnv('dbSsl', { dataSource }) ? 'https:' : 'http:',
138150 queryOptions : {
139151 database :
140152 getEnv ( 'dbName' , { dataSource } ) ||
@@ -148,10 +160,17 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
148160 this . readOnlyMode =
149161 getEnv ( 'clickhouseReadOnly' , { dataSource } ) === 'true' ;
150162
163+ // TODO @clickhouse /client have internal pool, that does NOT guarantee same connection, and can break with temp tables. Disable it?
151164 this . pool = genericPool . createPool ( {
152- create : async ( ) => new ClickHouse ( {
165+ create : async ( ) => createClient ( {
153166 ...this . config ,
154- queryOptions : {
167+
168+ username : getEnv ( 'dbUser' , { dataSource } ) ,
169+ password : getEnv ( 'dbPass' , { dataSource } ) ,
170+
171+ database : this . config . queryOptions . database ,
172+ session_id : uuidv4 ( ) ,
173+ clickhouse_settings : {
155174 //
156175 //
157176 // If ClickHouse user's permissions are restricted with "readonly = 1",
@@ -160,9 +179,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
160179 //
161180 //
162181 ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
163- session_id : uuidv4 ( ) ,
164- ...this . config . queryOptions ,
165- }
182+ } ,
166183 } ) ,
167184 destroy : ( ) => Promise . resolve ( )
168185 } , {
@@ -176,36 +193,41 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
176193 idleTimeoutMillis : 30000 ,
177194 acquireTimeoutMillis : 20000
178195 } ) ;
196+
197+ // https://github.com/coopernurse/node-pool/blob/ee5db9ddb54ce3a142fde3500116b393d4f2f755/README.md#L220-L226
198+ this . pool . on ( 'factoryCreateError' , ( err ) => {
199+ this . databasePoolError ( err ) ;
200+ } ) ;
201+ this . pool . on ( 'factoryDestroyError' , ( err ) => {
202+ this . databasePoolError ( err ) ;
203+ } ) ;
179204 }
180205
181- protected withConnection ( fn : ( con : any , queryId : string ) => Promise < any > ) {
206+ protected withConnection ( fn : ( con : ClickHouseClient , queryId : string ) => Promise < any > ) {
182207 const self = this ;
183208 const connectionPromise = this . pool . acquire ( ) ;
184209 const queryId = uuidv4 ( ) ;
185210
186211 let cancelled = false ;
187212 const cancelObj : any = { } ;
188213
189- const promise : any = connectionPromise . then ( ( connection : any ) => {
214+ const promise : any = connectionPromise . then ( ( connection : ClickHouseClient ) => {
190215 cancelObj . cancel = async ( ) => {
191216 cancelled = true ;
192217 await self . withConnection ( async conn => {
193- await conn . querying ( `KILL QUERY WHERE query_id = '${ queryId } '` ) ;
218+ await conn . command ( {
219+ query : `KILL QUERY WHERE query_id = '${ queryId } '` ,
220+ } ) ;
194221 } ) ;
195222 } ;
196223 return fn ( connection , queryId )
197- . then ( res => this . pool . release ( connection ) . then ( ( ) => {
224+ . finally ( ( ) => this . pool . release ( connection ) )
225+ . then ( ( res ) => {
198226 if ( cancelled ) {
199227 throw new Error ( 'Query cancelled' ) ;
200228 }
201229 return res ;
202- } ) )
203- . catch ( ( err ) => this . pool . release ( connection ) . then ( ( ) => {
204- if ( cancelled ) {
205- throw new Error ( 'Query cancelled' ) ;
206- }
207- throw err ;
208- } ) ) ;
230+ } ) ;
209231 } ) ;
210232 promise . cancel = ( ) => cancelObj . cancel ( ) ;
211233
@@ -229,10 +251,10 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
229251 protected queryResponse ( query : string , values : unknown [ ] ) {
230252 const formattedQuery = sqlstring . format ( query , values ) ;
231253
232- return this . withConnection ( ( connection , queryId ) => connection . querying ( formattedQuery , {
233- dataObjects : true ,
234- queryOptions : {
235- query_id : queryId ,
254+ return this . withConnection ( ( connection , queryId ) => connection . query ( {
255+ query : formattedQuery ,
256+ query_id : queryId ,
257+ clickhouse_settings : {
236258 //
237259 //
238260 // If ClickHouse user's permissions are restricted with "readonly = 1",
@@ -241,7 +263,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
241263 //
242264 //
243265 ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
244- }
266+ } ,
245267 } ) ) ;
246268 }
247269
0 commit comments