@@ -21,14 +21,14 @@ import {
2121 TableStructure ,
2222 UnloadOptions ,
2323} from '@cubejs-backend/base-driver' ;
24+
25+ import { ClickHouseClient , createClient } from '@clickhouse/client' ;
2426import genericPool , { Pool } from 'generic-pool' ;
2527import { v4 as uuidv4 } from 'uuid' ;
2628import sqlstring from 'sqlstring' ;
2729
2830import { HydrationStream , transformRow } from './HydrationStream' ;
2931
30- const ClickHouse = require ( '@cubejs-backend/apla-clickhouse' ) ;
31-
3232const ClickhouseTypeToGeneric : Record < string , string > = {
3333 enum : 'text' ,
3434 string : 'text' ,
@@ -86,7 +86,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
8686 return 5 ;
8787 }
8888
89- protected readonly pool : Pool < any > ;
89+ protected readonly pool : Pool < ClickHouseClient > ;
9090
9191 protected readonly readOnlyMode : boolean ;
9292
@@ -122,19 +122,33 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
122122 config . dataSource ||
123123 assertDataSource ( 'default' ) ;
124124
125+ // TODO recheck everything in config for new driver
126+ const host = getEnv ( 'dbHost' , { dataSource } ) ;
127+ const port = getEnv ( 'dbPort' , { dataSource } ) ;
128+ const protocol = getEnv ( 'dbSsl' , { dataSource } ) ? 'https:' : 'http:' ;
129+ // TODO proper value here, with proper back compat, and treating protocol
130+ const url = `${ protocol } //${ host } :${ port } ` ;
131+ // TODO drop this
132+ console . log ( 'ClickHouseDriver will use url' , url ) ;
133+
134+ const username = getEnv ( 'dbUser' , { dataSource } ) ;
135+ const password = getEnv ( 'dbPass' , { dataSource } ) ;
125136 this . config = {
126- host : getEnv ( 'dbHost' , { dataSource } ) ,
127- port : getEnv ( 'dbPort' , { dataSource } ) ,
128- auth :
129- getEnv ( 'dbUser' , { dataSource } ) ||
130- getEnv ( 'dbPass' , { dataSource } )
131- ? `${
132- getEnv ( 'dbUser' , { dataSource } )
133- } :${
134- getEnv ( 'dbPass' , { dataSource } )
135- } `
136- : '' ,
137- protocol : getEnv ( 'dbSsl' , { dataSource } ) ? 'https:' : 'http:' ,
137+ // host: getEnv('dbHost', { dataSource }),
138+ // port: getEnv('dbPort', { dataSource }),
139+ url,
140+ // auth:
141+ // getEnv('dbUser', { dataSource }) ||
142+ // getEnv('dbPass', { dataSource })
143+ // ? `${
144+ // getEnv('dbUser', { dataSource })
145+ // }:${
146+ // getEnv('dbPass', { dataSource })
147+ // }`
148+ // : '',
149+ username,
150+ password,
151+ // protocol: getEnv('dbSsl', { dataSource }) ? 'https:' : 'http:',
138152 queryOptions : {
139153 database :
140154 getEnv ( 'dbName' , { dataSource } ) ||
@@ -148,10 +162,17 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
148162 this . readOnlyMode =
149163 getEnv ( 'clickhouseReadOnly' , { dataSource } ) === 'true' ;
150164
165+ // TODO @clickhouse /client have internal pool, that does NOT guarantee same connection, and can break with temp tables. Disable it?
151166 this . pool = genericPool . createPool ( {
152- create : async ( ) => new ClickHouse ( {
167+ create : async ( ) => createClient ( {
153168 ...this . config ,
154- queryOptions : {
169+
170+ username : getEnv ( 'dbUser' , { dataSource } ) ,
171+ password : getEnv ( 'dbPass' , { dataSource } ) ,
172+
173+ database : this . config . queryOptions . database ,
174+ session_id : uuidv4 ( ) ,
175+ clickhouse_settings : {
155176 //
156177 //
157178 // If ClickHouse user's permissions are restricted with "readonly = 1",
@@ -160,9 +181,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
160181 //
161182 //
162183 ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
163- session_id : uuidv4 ( ) ,
164- ...this . config . queryOptions ,
165- }
184+ } ,
166185 } ) ,
167186 destroy : ( ) => Promise . resolve ( )
168187 } , {
@@ -176,36 +195,41 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
176195 idleTimeoutMillis : 30000 ,
177196 acquireTimeoutMillis : 20000
178197 } ) ;
198+
199+ // https://github.com/coopernurse/node-pool/blob/ee5db9ddb54ce3a142fde3500116b393d4f2f755/README.md#L220-L226
200+ this . pool . on ( 'factoryCreateError' , ( err ) => {
201+ this . databasePoolError ( err ) ;
202+ } ) ;
203+ this . pool . on ( 'factoryDestroyError' , ( err ) => {
204+ this . databasePoolError ( err ) ;
205+ } ) ;
179206 }
180207
181- protected withConnection ( fn : ( con : any , queryId : string ) => Promise < any > ) {
208+ protected withConnection ( fn : ( con : ClickHouseClient , queryId : string ) => Promise < any > ) {
182209 const self = this ;
183210 const connectionPromise = this . pool . acquire ( ) ;
184211 const queryId = uuidv4 ( ) ;
185212
186213 let cancelled = false ;
187214 const cancelObj : any = { } ;
188215
189- const promise : any = connectionPromise . then ( ( connection : any ) => {
216+ const promise : any = connectionPromise . then ( ( connection : ClickHouseClient ) => {
190217 cancelObj . cancel = async ( ) => {
191218 cancelled = true ;
192219 await self . withConnection ( async conn => {
193- await conn . querying ( `KILL QUERY WHERE query_id = '${ queryId } '` ) ;
220+ await conn . command ( {
221+ query : `KILL QUERY WHERE query_id = '${ queryId } '` ,
222+ } ) ;
194223 } ) ;
195224 } ;
196225 return fn ( connection , queryId )
197- . then ( res => this . pool . release ( connection ) . then ( ( ) => {
226+ . finally ( ( ) => this . pool . release ( connection ) )
227+ . then ( ( res ) => {
198228 if ( cancelled ) {
199229 throw new Error ( 'Query cancelled' ) ;
200230 }
201231 return res ;
202- } ) )
203- . catch ( ( err ) => this . pool . release ( connection ) . then ( ( ) => {
204- if ( cancelled ) {
205- throw new Error ( 'Query cancelled' ) ;
206- }
207- throw err ;
208- } ) ) ;
232+ } ) ;
209233 } ) ;
210234 promise . cancel = ( ) => cancelObj . cancel ( ) ;
211235
@@ -229,10 +253,10 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
229253 protected queryResponse ( query : string , values : unknown [ ] ) {
230254 const formattedQuery = sqlstring . format ( query , values ) ;
231255
232- return this . withConnection ( ( connection , queryId ) => connection . querying ( formattedQuery , {
233- dataObjects : true ,
234- queryOptions : {
235- query_id : queryId ,
256+ return this . withConnection ( ( connection , queryId ) => connection . query ( {
257+ query : formattedQuery ,
258+ query_id : queryId ,
259+ clickhouse_settings : {
236260 //
237261 //
238262 // If ClickHouse user's permissions are restricted with "readonly = 1",
@@ -241,7 +265,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
241265 //
242266 //
243267 ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
244- }
268+ } ,
245269 } ) ) ;
246270 }
247271
0 commit comments