@@ -18,17 +18,19 @@ import {
1818 QuerySchemasResult ,
1919 StreamOptions ,
2020 StreamTableDataWithTypes ,
21+ TableQueryResult ,
2122 TableStructure ,
2223 UnloadOptions ,
2324} from '@cubejs-backend/base-driver' ;
25+
26+ import { ClickHouseClient , createClient } from '@clickhouse/client' ;
27+ import type { ResponseJSON } from '@clickhouse/client' ;
2428import genericPool , { Pool } from 'generic-pool' ;
2529import { v4 as uuidv4 } from 'uuid' ;
2630import sqlstring from 'sqlstring' ;
2731
2832import { HydrationStream , transformRow } from './HydrationStream' ;
2933
30- const ClickHouse = require ( '@cubejs-backend/apla-clickhouse' ) ;
31-
3234const ClickhouseTypeToGeneric : Record < string , string > = {
3335 enum : 'text' ,
3436 string : 'text' ,
@@ -86,7 +88,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
8688 return 5 ;
8789 }
8890
89- protected readonly pool : Pool < any > ;
91+ protected readonly pool : Pool < ClickHouseClient > ;
9092
9193 protected readonly readOnlyMode : boolean ;
9294
@@ -122,19 +124,33 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
122124 config . dataSource ||
123125 assertDataSource ( 'default' ) ;
124126
127+ // TODO recheck everything in config for new driver
128+ const host = getEnv ( 'dbHost' , { dataSource } ) ;
129+ const port = getEnv ( 'dbPort' , { dataSource } ) ;
130+ const protocol = getEnv ( 'dbSsl' , { dataSource } ) ? 'https:' : 'http:' ;
131+ // TODO proper value here, with proper back compat, and treating protocol
132+ const url = `${ protocol } //${ host } :${ port } ` ;
133+ // TODO drop this
134+ console . log ( 'ClickHouseDriver will use url' , url ) ;
135+
136+ const username = getEnv ( 'dbUser' , { dataSource } ) ;
137+ const password = getEnv ( 'dbPass' , { dataSource } ) ;
125138 this . config = {
126- host : getEnv ( 'dbHost' , { dataSource } ) ,
127- port : getEnv ( 'dbPort' , { dataSource } ) ,
128- auth :
129- getEnv ( 'dbUser' , { dataSource } ) ||
130- getEnv ( 'dbPass' , { dataSource } )
131- ? `${
132- getEnv ( 'dbUser' , { dataSource } )
133- } :${
134- getEnv ( 'dbPass' , { dataSource } )
135- } `
136- : '' ,
137- protocol : getEnv ( 'dbSsl' , { dataSource } ) ? 'https:' : 'http:' ,
139+ // host: getEnv('dbHost', { dataSource }),
140+ // port: getEnv('dbPort', { dataSource }),
141+ url,
142+ // auth:
143+ // getEnv('dbUser', { dataSource }) ||
144+ // getEnv('dbPass', { dataSource })
145+ // ? `${
146+ // getEnv('dbUser', { dataSource })
147+ // }:${
148+ // getEnv('dbPass', { dataSource })
149+ // }`
150+ // : '',
151+ username,
152+ password,
153+ // protocol: getEnv('dbSsl', { dataSource }) ? 'https:' : 'http:',
138154 queryOptions : {
139155 database :
140156 getEnv ( 'dbName' , { dataSource } ) ||
@@ -148,64 +164,95 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
148164 this . readOnlyMode =
149165 getEnv ( 'clickhouseReadOnly' , { dataSource } ) === 'true' ;
150166
151- this . pool = genericPool . createPool ( {
152- create : async ( ) => new ClickHouse ( {
153- ...this . config ,
154- queryOptions : {
155- //
156- //
157- // If ClickHouse user's permissions are restricted with "readonly = 1",
158- // change settings queries are not allowed. Thus, "join_use_nulls" setting
159- // can not be changed
160- //
161- //
162- ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
163- session_id : uuidv4 ( ) ,
164- ...this . config . queryOptions ,
165- }
166- } ) ,
167- destroy : ( ) => Promise . resolve ( )
168- } , {
169- min : 0 ,
170- max :
171- config . maxPoolSize ||
172- getEnv ( 'dbMaxPoolSize' , { dataSource } ) ||
173- 8 ,
174- evictionRunIntervalMillis : 10000 ,
175- softIdleTimeoutMillis : 30000 ,
176- idleTimeoutMillis : 30000 ,
177- acquireTimeoutMillis : 20000
167+ // TODO @clickhouse /client have internal pool, that does NOT guarantee same connection, and can break with temp tables. Disable it?
168+ this . pool = genericPool . createPool (
169+ {
170+ create : async ( ) =>
171+ createClient ( {
172+ ...this . config ,
173+
174+ username : getEnv ( "dbUser" , { dataSource } ) ,
175+ password : getEnv ( "dbPass" , { dataSource } ) ,
176+
177+ database : this . config . queryOptions . database ,
178+ session_id : uuidv4 ( ) ,
179+ clickhouse_settings : {
180+ //
181+ //
182+ // If ClickHouse user's permissions are restricted with "readonly = 1",
183+ // change settings queries are not allowed. Thus, "join_use_nulls" setting
184+ // can not be changed
185+ //
186+ //
187+ ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
188+ } ,
189+
190+ // TODO max_open_connections vs generic pool
191+ max_open_connections : 1 ,
192+ // TODO debugging keepalive in CI
193+ keep_alive : {
194+ enabled : false ,
195+ } ,
196+ } ) ,
197+ validate : async ( client ) => {
198+ const result = await client . ping ( ) ;
199+ if ( ! result . success ) {
200+ this . databasePoolError ( result . error ) ;
201+ }
202+ return result . success ;
203+ } ,
204+ destroy : ( client ) => client . close ( ) ,
205+ } ,
206+ {
207+ min : 0 ,
208+ max : config . maxPoolSize || getEnv ( "dbMaxPoolSize" , { dataSource } ) || 8 ,
209+ evictionRunIntervalMillis : 10000 ,
210+ softIdleTimeoutMillis : 30000 ,
211+ idleTimeoutMillis : 30000 ,
212+ acquireTimeoutMillis : 20000 ,
213+ }
214+ ) ;
215+
216+ // https://github.com/coopernurse/node-pool/blob/ee5db9ddb54ce3a142fde3500116b393d4f2f755/README.md#L220-L226
217+ this . pool . on ( 'factoryCreateError' , ( err ) => {
218+ this . databasePoolError ( err ) ;
219+ } ) ;
220+ this . pool . on ( 'factoryDestroyError' , ( err ) => {
221+ this . databasePoolError ( err ) ;
178222 } ) ;
179223 }
180224
181- protected withConnection ( fn : ( con : any , queryId : string ) => Promise < any > ) {
225+ protected withConnection ( fn : ( con : ClickHouseClient , queryId : string ) => Promise < any > ) {
226+ console . log ( "withConnection call" ) ;
182227 const self = this ;
183228 const connectionPromise = this . pool . acquire ( ) ;
184229 const queryId = uuidv4 ( ) ;
185230
186231 let cancelled = false ;
187232 const cancelObj : any = { } ;
188233
189- const promise : any = connectionPromise . then ( ( connection : any ) => {
234+ const promise : any = connectionPromise . then ( ( connection : ClickHouseClient ) => {
235+ console . log ( "withConnection got connection" ) ;
190236 cancelObj . cancel = async ( ) => {
191237 cancelled = true ;
238+
239+ // TODO new driver supports abort signal, but does not do autokill
240+ // TODO kill is sent thru same pool, which can be busy, use separate pool/connection.
241+
192242 await self . withConnection ( async conn => {
193- await conn . querying ( `KILL QUERY WHERE query_id = '${ queryId } '` ) ;
243+ await conn . command ( {
244+ query : `KILL QUERY WHERE query_id = '${ queryId } '` ,
245+ } ) ;
194246 } ) ;
195247 } ;
196248 return fn ( connection , queryId )
197- . then ( res => this . pool . release ( connection ) . then ( ( ) => {
249+ . finally ( ( ) => this . pool . release ( connection ) )
250+ . then ( ( res ) => {
198251 if ( cancelled ) {
199252 throw new Error ( 'Query cancelled' ) ;
200253 }
201254 return res ;
202- } ) )
203- . catch ( ( err ) => this . pool . release ( connection ) . then ( ( ) => {
204- if ( cancelled ) {
205- throw new Error ( 'Query cancelled' ) ;
206- }
207- throw err ;
208- } ) ) ;
255+ } ) ;
209256 } ) ;
210257 promise . cancel = ( ) => cancelObj . cancel ( ) ;
211258
@@ -222,32 +269,58 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
222269 true ;
223270 }
224271
225- public async query ( query : string , values : unknown [ ] ) {
272+ public async query < R = unknown > ( query : string , values : unknown [ ] ) : Promise < R [ ] > {
226273 return this . queryResponse ( query , values ) . then ( ( res : any ) => this . normaliseResponse ( res ) ) ;
227274 }
228275
229- protected queryResponse ( query : string , values : unknown [ ] ) {
276+ protected queryResponse ( query : string , values : unknown [ ] ) : Promise < ResponseJSON > {
277+ console . log ( "queryResponse call" , query ) ;
278+
230279 const formattedQuery = sqlstring . format ( query , values ) ;
231280
232- return this . withConnection ( ( connection , queryId ) => connection . querying ( formattedQuery , {
233- dataObjects : true ,
234- queryOptions : {
281+ return this . withConnection ( async ( connection , queryId ) => {
282+ // if (formattedQuery.startsWith("CREATE TABLE")) {
283+ //
284+ // }
285+
286+ const resultSet = await connection . query ( {
287+ query : formattedQuery ,
235288 query_id : queryId ,
236- //
237- //
238- // If ClickHouse user's permissions are restricted with "readonly = 1",
239- // change settings queries are not allowed. Thus, "join_use_nulls" setting
240- // can not be changed
241- //
242- //
243- ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
289+ clickhouse_settings : {
290+ //
291+ //
292+ // If ClickHouse user's permissions are restricted with "readonly = 1",
293+ // change settings queries are not allowed. Thus, "join_use_nulls" setting
294+ // can not be changed
295+ //
296+ //
297+ ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
298+ } ,
299+ } ) ;
300+ console . log ( "queryResponse resultSet" , query , resultSet . query_id , resultSet . response_headers ) ;
301+
302+ if ( resultSet . response_headers [ 'x-clickhouse-format' ] !== 'JSON' ) {
303+ const results = await resultSet . text ( ) ;
304+ console . log ( "queryResponse text results" , query , results ) ;
305+ console . log ( "queryResponse text results JSON" , JSON . stringify ( results ) ) ;
306+ return [ ] ;
307+ } else {
308+ const results = await resultSet . json ( ) ;
309+ console . log ( "queryResponse json results" , query , results ) ;
310+ console . log ( "queryResponse json results JSON" , JSON . stringify ( results ) ) ;
311+ return results ;
244312 }
245- } ) ) ;
313+
314+ // 'content-type': 'text/plain; charset=UTF-8',
315+ // vs
316+ // 'content-type': 'application/json; charset=UTF-8',
317+ // 'x-clickhouse-format': 'JSON',
318+ } ) ;
246319 }
247320
248- protected normaliseResponse ( res : any ) {
321+ protected normaliseResponse < R = unknown > ( res : ResponseJSON ) : Array < R > {
249322 if ( res . data ) {
250- const meta = res . meta . reduce (
323+ const meta = ( res . meta ?? [ ] ) . reduce (
251324 ( state : any , element : any ) => ( { [ element . name ] : element , ...state } ) ,
252325 { }
253326 ) ;
@@ -256,7 +329,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
256329 transformRow ( row , meta ) ;
257330 } ) ;
258331 }
259- return res . data ;
332+ return res . data as Array < R > ;
260333 }
261334
262335 public async release ( ) {
@@ -378,7 +451,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
378451
379452 return {
380453 rows : this . normaliseResponse ( response ) ,
381- types : response . meta . map ( ( field : any ) => ( {
454+ types : ( response . meta ?? [ ] ) . map ( ( field : any ) => ( {
382455 name : field . name ,
383456 type : this . toGenericType ( field . type ) ,
384457 } ) ) ,
@@ -415,7 +488,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
415488 await this . query ( `CREATE DATABASE IF NOT EXISTS ${ schemaName } ` , [ ] ) ;
416489 }
417490
418- public getTablesQuery ( schemaName : string ) {
491+ public getTablesQuery ( schemaName : string ) : Promise < TableQueryResult [ ] > {
419492 return this . query ( 'SELECT name as table_name FROM system.tables WHERE database = ?' , [ schemaName ] ) ;
420493 }
421494
0 commit comments