@@ -21,14 +21,14 @@ import {
2121 TableStructure ,
2222 UnloadOptions ,
2323} from '@cubejs-backend/base-driver' ;
24+
25+ import { ClickHouseClient , createClient } from '@clickhouse/client' ;
2426import genericPool , { Pool } from 'generic-pool' ;
2527import { v4 as uuidv4 } from 'uuid' ;
2628import sqlstring from 'sqlstring' ;
2729
2830import { HydrationStream , transformRow } from './HydrationStream' ;
2931
30- const ClickHouse = require ( '@cubejs-backend/apla-clickhouse' ) ;
31-
3232const ClickhouseTypeToGeneric : Record < string , string > = {
3333 enum : 'text' ,
3434 string : 'text' ,
@@ -86,7 +86,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
8686 return 5 ;
8787 }
8888
89- protected readonly pool : Pool < any > ;
89+ protected readonly pool : Pool < ClickHouseClient > ;
9090
9191 protected readonly readOnlyMode : boolean ;
9292
@@ -122,19 +122,33 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
122122 config . dataSource ||
123123 assertDataSource ( 'default' ) ;
124124
125+ // TODO recheck everything in config for new driver
126+ const host = getEnv ( 'dbHost' , { dataSource } ) ;
127+ const port = getEnv ( 'dbPort' , { dataSource } ) ;
128+ const protocol = getEnv ( 'dbSsl' , { dataSource } ) ? 'https:' : 'http:' ;
129+ // TODO proper value here, with proper back compat, and treating protocol
130+ const url = `${ protocol } //${ host } :${ port } ` ;
131+ // TODO drop this
132+ console . log ( 'ClickHouseDriver will use url' , url ) ;
133+
134+ const username = getEnv ( 'dbUser' , { dataSource } ) ;
135+ const password = getEnv ( 'dbPass' , { dataSource } ) ;
125136 this . config = {
126- host : getEnv ( 'dbHost' , { dataSource } ) ,
127- port : getEnv ( 'dbPort' , { dataSource } ) ,
128- auth :
129- getEnv ( 'dbUser' , { dataSource } ) ||
130- getEnv ( 'dbPass' , { dataSource } )
131- ? `${
132- getEnv ( 'dbUser' , { dataSource } )
133- } :${
134- getEnv ( 'dbPass' , { dataSource } )
135- } `
136- : '' ,
137- protocol : getEnv ( 'dbSsl' , { dataSource } ) ? 'https:' : 'http:' ,
137+ // host: getEnv('dbHost', { dataSource }),
138+ // port: getEnv('dbPort', { dataSource }),
139+ url,
140+ // auth:
141+ // getEnv('dbUser', { dataSource }) ||
142+ // getEnv('dbPass', { dataSource })
143+ // ? `${
144+ // getEnv('dbUser', { dataSource })
145+ // }:${
146+ // getEnv('dbPass', { dataSource })
147+ // }`
148+ // : '',
149+ username,
150+ password,
151+ // protocol: getEnv('dbSsl', { dataSource }) ? 'https:' : 'http:',
138152 queryOptions : {
139153 database :
140154 getEnv ( 'dbName' , { dataSource } ) ||
@@ -148,10 +162,17 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
148162 this . readOnlyMode =
149163 getEnv ( 'clickhouseReadOnly' , { dataSource } ) === 'true' ;
150164
165+ // TODO @clickhouse /client have internal pool, that does NOT guarantee same connection, and can break with temp tables. Disable it?
151166 this . pool = genericPool . createPool ( {
152- create : async ( ) => new ClickHouse ( {
167+ create : async ( ) => createClient ( {
153168 ...this . config ,
154- queryOptions : {
169+
170+ username : getEnv ( 'dbUser' , { dataSource } ) ,
171+ password : getEnv ( 'dbPass' , { dataSource } ) ,
172+
173+ database : this . config . queryOptions . database ,
174+ session_id : uuidv4 ( ) ,
175+ clickhouse_settings : {
155176 //
156177 //
157178 // If ClickHouse user's permissions are restricted with "readonly = 1",
@@ -160,9 +181,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
160181 //
161182 //
162183 ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
163- session_id : uuidv4 ( ) ,
164- ...this . config . queryOptions ,
165- }
184+ } ,
166185 } ) ,
167186 destroy : ( ) => Promise . resolve ( )
168187 } , {
@@ -176,36 +195,43 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
176195 idleTimeoutMillis : 30000 ,
177196 acquireTimeoutMillis : 20000
178197 } ) ;
198+
199+ // https://github.com/coopernurse/node-pool/blob/ee5db9ddb54ce3a142fde3500116b393d4f2f755/README.md#L220-L226
200+ this . pool . on ( 'factoryCreateError' , ( err ) => {
201+ this . databasePoolError ( err ) ;
202+ } ) ;
203+ this . pool . on ( 'factoryDestroyError' , ( err ) => {
204+ this . databasePoolError ( err ) ;
205+ } ) ;
179206 }
180207
181- protected withConnection ( fn : ( con : any , queryId : string ) => Promise < any > ) {
208+ protected withConnection ( fn : ( con : ClickHouseClient , queryId : string ) => Promise < any > ) {
209+ console . log ( "withConnection call" ) ;
182210 const self = this ;
183211 const connectionPromise = this . pool . acquire ( ) ;
184212 const queryId = uuidv4 ( ) ;
185213
186214 let cancelled = false ;
187215 const cancelObj : any = { } ;
188216
189- const promise : any = connectionPromise . then ( ( connection : any ) => {
217+ const promise : any = connectionPromise . then ( ( connection : ClickHouseClient ) => {
218+ console . log ( "withConnection got connection" ) ;
190219 cancelObj . cancel = async ( ) => {
191220 cancelled = true ;
192221 await self . withConnection ( async conn => {
193- await conn . querying ( `KILL QUERY WHERE query_id = '${ queryId } '` ) ;
222+ await conn . command ( {
223+ query : `KILL QUERY WHERE query_id = '${ queryId } '` ,
224+ } ) ;
194225 } ) ;
195226 } ;
196227 return fn ( connection , queryId )
197- . then ( res => this . pool . release ( connection ) . then ( ( ) => {
228+ . finally ( ( ) => this . pool . release ( connection ) )
229+ . then ( ( res ) => {
198230 if ( cancelled ) {
199231 throw new Error ( 'Query cancelled' ) ;
200232 }
201233 return res ;
202- } ) )
203- . catch ( ( err ) => this . pool . release ( connection ) . then ( ( ) => {
204- if ( cancelled ) {
205- throw new Error ( 'Query cancelled' ) ;
206- }
207- throw err ;
208- } ) ) ;
234+ } ) ;
209235 } ) ;
210236 promise . cancel = ( ) => cancelObj . cancel ( ) ;
211237
@@ -227,22 +253,48 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
227253 }
228254
229255 protected queryResponse ( query : string , values : unknown [ ] ) {
256+ console . log ( "queryResponse call" , query ) ;
257+
230258 const formattedQuery = sqlstring . format ( query , values ) ;
231259
232- return this . withConnection ( ( connection , queryId ) => connection . querying ( formattedQuery , {
233- dataObjects : true ,
234- queryOptions : {
260+ return this . withConnection ( async ( connection , queryId ) => {
261+ // if (formattedQuery.startsWith("CREATE TABLE")) {
262+ //
263+ // }
264+
265+ const resultSet = await connection . query ( {
266+ query : formattedQuery ,
235267 query_id : queryId ,
236- //
237- //
238- // If ClickHouse user's permissions are restricted with "readonly = 1",
239- // change settings queries are not allowed. Thus, "join_use_nulls" setting
240- // can not be changed
241- //
242- //
243- ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
268+ clickhouse_settings : {
269+ //
270+ //
271+ // If ClickHouse user's permissions are restricted with "readonly = 1",
272+ // change settings queries are not allowed. Thus, "join_use_nulls" setting
273+ // can not be changed
274+ //
275+ //
276+ ...( this . readOnlyMode ? { } : { join_use_nulls : 1 } ) ,
277+ } ,
278+ } ) ;
279+ console . log ( "queryResponse resultSet" , query , resultSet . query_id , resultSet . response_headers ) ;
280+
281+ if ( resultSet . response_headers [ 'x-clickhouse-format' ] !== 'JSON' ) {
282+ const results = await resultSet . text ( ) ;
283+ console . log ( "queryResponse text results" , query , results ) ;
284+ console . log ( "queryResponse text results JSON" , JSON . stringify ( results ) ) ;
285+ return [ ] ;
286+ } else {
287+ const results = await resultSet . json ( ) ;
288+ console . log ( "queryResponse json results" , query , results ) ;
289+ console . log ( "queryResponse json results JSON" , JSON . stringify ( results ) ) ;
290+ return results ;
244291 }
245- } ) ) ;
292+
293+ // 'content-type': 'text/plain; charset=UTF-8',
294+ // vs
295+ // 'content-type': 'application/json; charset=UTF-8',
296+ // 'x-clickhouse-format': 'JSON',
297+ } ) ;
246298 }
247299
248300 protected normaliseResponse ( res : any ) {
0 commit comments