@@ -29,8 +29,6 @@ import * as process from 'node:process';
2929import { Readable } from 'node:stream' ;
3030import { ClickHouseClient , createClient } from '@clickhouse/client' ;
3131import type { ClickHouseSettings , ResponseJSON } from '@clickhouse/client' ;
32- import genericPool from 'generic-pool' ;
33- import type { Factory , Pool } from 'generic-pool' ;
3432import { v4 as uuidv4 } from 'uuid' ;
3533import sqlstring from 'sqlstring' ;
3634
@@ -119,9 +117,8 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
119117 return 5 ;
120118 }
121119
122- protected readonly connectionFactory : Factory < ClickHouseClient > ;
123-
124- protected readonly pool : Pool < ClickHouseClient > ;
120+ // ClickHouseClient has internal pool of several sockets, no need for generic-pool
121+ protected readonly client : ClickHouseClient ;
125122
126123 protected readonly readOnlyMode : boolean ;
127124
@@ -168,81 +165,54 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
168165 } ,
169166 } ;
170167
171- this . connectionFactory = {
172- create : async ( ) => createClient ( {
173- url : this . config . url ,
174- username : this . config . username ,
175- password : this . config . password ,
176- database : this . config . database ,
177- clickhouse_settings : this . config . clickhouseSettings ,
178- // TODO max_open_connections vs generic pool
179- max_open_connections : 1 ,
180- } ) ,
181- validate : async ( client ) => {
182- const result = await client . ping ( ) ;
183- if ( ! result . success ) {
184- this . databasePoolError ( result . error ) ;
185- }
186- return result . success ;
187- } ,
188- destroy : ( client ) => client . close ( ) ,
189- } ;
168+ const maxPoolSize = config . maxPoolSize || getEnv ( "dbMaxPoolSize" , { dataSource } ) || 8 ;
190169
191- // TODO @clickhouse /client have internal pool, that does NOT guarantee same connection, and can break with temp tables. Disable it?
192- this . pool = genericPool . createPool (
193- this . connectionFactory ,
194- {
195- min : 0 ,
196- max : config . maxPoolSize || getEnv ( "dbMaxPoolSize" , { dataSource } ) || 8 ,
197- evictionRunIntervalMillis : 10000 ,
198- softIdleTimeoutMillis : 30000 ,
199- idleTimeoutMillis : 30000 ,
200- acquireTimeoutMillis : 20000 ,
201- }
202- ) ;
203-
204- // https://github.com/coopernurse/node-pool/blob/ee5db9ddb54ce3a142fde3500116b393d4f2f755/README.md#L220-L226
205- this . pool . on ( 'factoryCreateError' , ( err ) => {
206- this . databasePoolError ( err ) ;
207- } ) ;
208- this . pool . on ( 'factoryDestroyError' , ( err ) => {
209- this . databasePoolError ( err ) ;
210- } ) ;
170+ this . client = this . createClient ( maxPoolSize ) ;
211171 }
212172
213- protected withConnection < T > ( fn : ( con : ClickHouseClient , queryId : string ) => Promise < T > ) : Promise < T > {
214- console . log ( "withConnection call" ) ;
173+ protected withCancel < T > ( fn : ( con : ClickHouseClient , queryId : string ) => Promise < T > ) : Promise < T > {
174+ console . log ( "withCancel call" ) ;
215175 const queryId = uuidv4 ( ) ;
216176
217177 const abortController = new AbortController ( ) ;
218178 const { signal } = abortController ;
219179
220180 const promise = ( async ( ) => {
221- const connection = await this . pool . acquire ( ) ;
222- try {
223- signal . throwIfAborted ( ) ;
224- // TODO pass signal deeper, new driver supports abort signal, but does not do autokill
225- const result = await fn ( connection , queryId ) ;
226- signal . throwIfAborted ( ) ;
227- return result ;
228- } finally {
229- await this . pool . release ( connection ) ;
230- }
181+ await this . client . ping ( ) ;
182+ signal . throwIfAborted ( ) ;
183+ // TODO pass signal deeper, new driver supports abort signal, but does not do autokill
184+ const result = await fn ( this . client , queryId ) ;
185+ signal . throwIfAborted ( ) ;
186+ return result ;
231187 } ) ( ) ;
232188 // TODO why do we need this?
233189 ( promise as any ) . cancel = async ( ) => {
234190 abortController . abort ( ) ;
235- // TODO kill is sent thru same pool, which can be busy, use separate pool/connection.
236- await this . withConnection ( async conn => {
237- await conn . command ( {
191+ // Use separate client for kill query, usual pool may be busy
192+ const killClient = this . createClient ( 1 ) ;
193+ try {
194+ await killClient . command ( {
238195 query : `KILL QUERY WHERE query_id = '${ queryId } '` ,
239196 } ) ;
240- } ) ;
197+ } finally {
198+ await killClient . close ( ) ;
199+ }
241200 } ;
242201
243202 return promise ;
244203 }
245204
205+ protected createClient ( maxPoolSize : number ) : ClickHouseClient {
206+ return createClient ( {
207+ url : this . config . url ,
208+ username : this . config . username ,
209+ password : this . config . password ,
210+ database : this . config . database ,
211+ clickhouse_settings : this . config . clickhouseSettings ,
212+ max_open_connections : maxPoolSize ,
213+ } ) ;
214+ }
215+
246216 public async testConnection ( ) {
247217 await this . query ( 'SELECT 1' , [ ] ) ;
248218 }
@@ -274,7 +244,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
274244
275245 console . log ( 'ClickHouse queryResponse prepared' , formattedQuery ) ;
276246
277- return this . withConnection ( async ( connection , queryId ) => {
247+ return this . withCancel ( async ( connection , queryId ) => {
278248 // if (formattedQuery.startsWith("CREATE TABLE")) {
279249 //
280250 // }
@@ -343,8 +313,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
343313 }
344314
345315 public async release ( ) {
346- await this . pool . drain ( ) ;
347- await this . pool . clear ( ) ;
316+ await this . client . close ( ) ;
348317 }
349318
350319 public informationSchemaQuery ( ) {
@@ -400,14 +369,15 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
400369 ) : Promise < StreamTableDataWithTypes > {
401370 console . log ( 'ClickHouse stream call' , query , values ) ;
402371
403- const conn = await this . connectionFactory . create ( ) ;
372+ // Use separate client for this long-living query
373+ const client = this . createClient ( 1 ) ;
404374
405375 try {
406376 const formattedQuery = sqlstring . format ( query , values ) ;
407377
408378 console . log ( 'ClickHouse stream prepared' , formattedQuery ) ;
409379
410- const resultSet = await conn . query ( {
380+ const resultSet = await client . query ( {
411381 query : formattedQuery ,
412382 query_id : uuidv4 ( ) ,
413383 format : 'JSONCompactEachRowWithNamesAndTypes' ,
@@ -462,11 +432,11 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
462432 } ;
463433 } ) ,
464434 release : async ( ) => {
465- await this . connectionFactory . destroy ( conn ) ;
435+ await client . close ( ) ;
466436 }
467437 } ;
468438 } catch ( e ) {
469- await this . connectionFactory . destroy ( conn ) ;
439+ await client . close ( ) ;
470440
471441 throw e ;
472442 }
@@ -682,7 +652,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
682652
683653 // This is not part of a driver interface, and marked public only for testing
684654 public async command ( query : string ) : Promise < void > {
685- await this . withConnection ( async ( connection ) => {
655+ await this . withCancel ( async ( connection ) => {
686656 await connection . command ( {
687657 query,
688658 } ) ;
@@ -691,7 +661,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
691661
692662 // This is not part of a driver interface, and marked public only for testing
693663 public async insert ( table : string , values : Array < Array < unknown > > ) : Promise < void > {
694- await this . withConnection ( async ( connection ) => {
664+ await this . withCancel ( async ( connection ) => {
695665 await connection . insert ( {
696666 table,
697667 values,
0 commit comments