Skip to content

Commit dc35f01

Browse files
committed
added pooling
1 parent 7bbf59c commit dc35f01

File tree

5 files changed

+159
-41
lines changed

5 files changed

+159
-41
lines changed

src/IRISDriver.ts

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import { TableForeignKey } from "typeorm/schema-builder/table/TableForeignKey"
2323
import { InstanceChecker } from "typeorm/util/InstanceChecker"
2424
import { UpsertType } from "typeorm/driver/types/UpsertType"
2525
import { TypeORMError } from "typeorm/error/TypeORMError"
26-
import { IRISNative } from "./IRISNative"
26+
import { IRISConnectionPool } from "./IRISNative"
2727
import { ConnectionIsNotSetError } from "typeorm"
2828

2929
/**
@@ -41,7 +41,7 @@ export class IRISDriver implements Driver {
4141

4242
iris: any
4343

44-
master: any
44+
master: IRISConnectionPool
4545

4646
// -------------------------------------------------------------------------
4747
// Public Implemented Properties
@@ -302,8 +302,7 @@ export class IRISDriver implements Driver {
302302
options: IRISConnectionOptions,
303303
credentials: IRISConnectionCredentialsOptions,
304304
): Promise<any> {
305-
const { logger } = this.connection
306-
logger.log("info", "Creating connection...")
305+
// const { logger } = this.connection
307306
credentials = Object.assign({}, credentials)
308307
const connectionOptions = Object.assign(
309308
{},
@@ -318,7 +317,10 @@ export class IRISDriver implements Driver {
318317
},
319318
options.extra || {},
320319
)
321-
const conn = IRISNative.createConnection(connectionOptions)
320+
const conn = new IRISConnectionPool(
321+
connectionOptions,
322+
options.poolSize || 5,
323+
)
322324
return Promise.resolve(conn)
323325
}
324326

@@ -337,8 +339,8 @@ export class IRISDriver implements Driver {
337339
throw new ConnectionIsNotSetError("iris")
338340
}
339341

340-
this.master.close()
341-
this.master = undefined
342+
// this.master.close()
343+
// this.master = undefined
342344
}
343345

344346
/**
@@ -470,13 +472,6 @@ export class IRISDriver implements Driver {
470472
* Prepares given value to a value to be persisted, based on its column type and metadata.
471473
*/
472474
preparePersistentValue(value: any, columnMetadata: ColumnMetadata): any {
473-
// console.log(
474-
// "IRISDriver.preparePersistentValue:",[
475-
// columnMetadata.databaseName,
476-
// columnMetadata.type,
477-
// typeof value,
478-
// value,
479-
// ])
480475
if (columnMetadata.transformer)
481476
value = ApplyValueTransformers.transformTo(
482477
columnMetadata.transformer,
@@ -513,12 +508,6 @@ export class IRISDriver implements Driver {
513508
* Prepares given value to a value to be persisted, based on its column type or metadata.
514509
*/
515510
prepareHydratedValue(value: any, columnMetadata: ColumnMetadata): any {
516-
// console.log("IRISDriver.prepareHydratedValue:", [
517-
// columnMetadata.databaseName,
518-
// columnMetadata.type,
519-
// typeof value,
520-
// value,
521-
// ])
522511
if (value === null || value === undefined)
523512
return columnMetadata.transformer
524513
? ApplyValueTransformers.transformFrom(
@@ -696,12 +685,11 @@ export class IRISDriver implements Driver {
696685
* Used for replication.
697686
* If replication is not setup then returns default connection's database connection.
698687
*/
699-
obtainMasterConnection(): Promise<any> {
688+
async obtainMasterConnection(): Promise<any> {
700689
if (!this.master) {
701690
throw new TypeORMError("Driver not Connected")
702691
}
703-
// return this.createConnection(this.options, this.options)
704-
return Promise.resolve(this.master)
692+
return this.master.getConnection()
705693
}
706694

707695
/**

src/IRISNative.ts

Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ const IRISNative = require("@intersystems/intersystems-iris-native")
22
import { Connection } from "@intersystems/intersystems-iris-native"
33

44
export interface IRISConnection extends Connection {
5-
query: (queryString: string, argSets?: any[]) => any
5+
connectionId: string
6+
query: (queryString: string, argSets?: any[]) => Promise<any>
7+
release: () => Promise<void>
68
}
79

810
type IRISConnectionOptions = {
@@ -228,7 +230,6 @@ class ResultSet {
228230
})
229231
this.rows.push(row)
230232
}
231-
// console.log("ResultSet rows fetched:", this.rows.length);
232233
}
233234
}
234235

@@ -271,7 +272,7 @@ function normalizeArgs(args: any[]) {
271272

272273
function createQuery(connection: any) {
273274
const iris = connection.createIris()
274-
return function (queryString: string, argSets: any[]): any {
275+
return async function (queryString: string, argSets: any[]): Promise<any> {
275276
argSets =
276277
argSets && argSets.length && Array.isArray(argSets[0])
277278
? argSets
@@ -304,10 +305,10 @@ function createQuery(connection: any) {
304305
}
305306
resultSet = st.execute(...args)
306307
if (resultSet.Message) {
307-
return resultSet
308+
return Promise.resolve(resultSet)
308309
}
309310
}
310-
return resultSet
311+
return Promise.resolve(resultSet)
311312
}
312313
}
313314

@@ -322,4 +323,114 @@ IRISNative.createConnection = (
322323
}
323324
IRISNative.connect = IRISNative.createConnection
324325

325-
export { IRISNative }
326+
class IRISConnectionPool {
327+
private options: IRISConnectionOptions
328+
private maxConnections: number
329+
private pool: IRISConnection[] = []
330+
private activeConnections: Set<IRISConnection> = new Set()
331+
private connectionCount: number = 0
332+
333+
constructor(options: IRISConnectionOptions, maxConnections: number = 5) {
334+
this.options = options
335+
this.maxConnections = maxConnections
336+
}
337+
338+
async getConnection(): Promise<IRISConnection> {
339+
// Try to reuse an existing connection from pool
340+
if (this.pool.length > 0) {
341+
const connection = this.pool.pop()!
342+
this.activeConnections.add(connection)
343+
return connection
344+
}
345+
346+
// Create new connection if under limit
347+
if (this.connectionCount < this.maxConnections) {
348+
try {
349+
const connection = await new Promise<IRISConnection>(
350+
(resolve, reject) => {
351+
try {
352+
const conn = IRISNative.createConnection(
353+
this.options,
354+
)
355+
this.connectionCount++
356+
conn.connectionId = `conn-${this.connectionCount}`
357+
conn.release = () => {
358+
this.releaseConnection(conn)
359+
return Promise.resolve()
360+
}
361+
resolve(conn)
362+
} catch (error) {
363+
reject(error)
364+
}
365+
},
366+
)
367+
this.activeConnections.add(connection)
368+
return connection
369+
} catch (error) {
370+
console.error("Failed to create connection:", error)
371+
throw error
372+
}
373+
}
374+
375+
// Wait for a connection to become available
376+
// throw new Error("Max connections reached, please try again later.")
377+
return new Promise<IRISConnection>((resolve) => {
378+
const checkPool = (): void => {
379+
if (this.pool.length > 0) {
380+
const connection = this.pool.pop()!
381+
this.activeConnections.add(connection)
382+
resolve(connection)
383+
} else {
384+
setTimeout(checkPool, 100)
385+
}
386+
}
387+
checkPool()
388+
})
389+
}
390+
391+
releaseConnection(connection: IRISConnection): void {
392+
if (this.activeConnections.has(connection)) {
393+
this.activeConnections.delete(connection)
394+
this.pool.push(connection)
395+
}
396+
}
397+
398+
async closeAll(): Promise<void> {
399+
// Close active connections
400+
for (const connection of this.activeConnections) {
401+
try {
402+
connection.close()
403+
} catch (error) {
404+
console.error("Error closing active connection:", error)
405+
}
406+
}
407+
this.activeConnections.clear()
408+
409+
// Close pooled connections
410+
for (const connection of this.pool) {
411+
try {
412+
connection.close()
413+
} catch (error) {
414+
console.error("Error closing pooled connection:", error)
415+
}
416+
}
417+
this.pool.length = 0
418+
this.connectionCount = 0
419+
420+
// Force garbage collection if available
421+
if ((global as any).gc) {
422+
;(global as any).gc()
423+
console.log("Forced garbage collection")
424+
}
425+
}
426+
427+
getPoolStats(): { active: number; pooled: number; total: number } {
428+
return {
429+
active: this.activeConnections.size,
430+
pooled: this.pool.length,
431+
total: this.connectionCount,
432+
}
433+
}
434+
}
435+
436+
export { IRISNative, IRISConnectionPool }

src/IRISQueryRunner.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,16 @@ export class IRISQueryRunner extends BaseQueryRunner implements QueryRunner {
4040
*/
4141
driver: IRISDriver
4242

43+
protected releaseCallback?: () => void
44+
4345
// -------------------------------------------------------------------------
4446
// Protected Properties
4547
// -------------------------------------------------------------------------
4648

4749
/**
4850
* Promise used to obtain a database connection from a pool for a first time.
4951
*/
50-
// protected databaseConnectionPromise: Promise<any>
52+
protected databaseConnectionPromise: Promise<any>
5153

5254
// -------------------------------------------------------------------------
5355
// Constructor
@@ -70,15 +72,31 @@ export class IRISQueryRunner extends BaseQueryRunner implements QueryRunner {
7072
* Returns obtained database connection.
7173
*/
7274
async connect(): Promise<any> {
73-
return this.driver.obtainMasterConnection()
75+
if (this.databaseConnection) {
76+
return Promise.resolve(this.databaseConnection)
77+
}
78+
79+
if (this.databaseConnectionPromise)
80+
return this.databaseConnectionPromise
81+
82+
this.databaseConnectionPromise = this.driver
83+
.obtainMasterConnection()
84+
.then((connection) => {
85+
this.databaseConnection = connection
86+
return this.databaseConnection
87+
})
88+
89+
return this.databaseConnectionPromise
7490
}
7591

7692
/**
7793
* Releases used database connection.
7894
* You cannot use query runner methods once its released.
7995
*/
8096
async release(): Promise<void> {
81-
// releasing connection are not supported by mongodb driver, so simply don't do anything here
97+
this.isReleased = true
98+
if (this.databaseConnection) this.databaseConnection.release()
99+
return Promise.resolve()
82100
}
83101

84102
/**
@@ -160,7 +178,6 @@ export class IRISQueryRunner extends BaseQueryRunner implements QueryRunner {
160178
useStructuredResult = false,
161179
): Promise<any> {
162180
if (this.isReleased) throw new QueryRunnerAlreadyReleasedError()
163-
// const isInsertQuery = query.startsWith("INSERT INTO ")
164181

165182
const databaseConnection = await this.connect()
166183

@@ -208,6 +225,7 @@ export class IRISQueryRunner extends BaseQueryRunner implements QueryRunner {
208225
return Promise.reject(err)
209226
} finally {
210227
await broadcasterResult.wait()
228+
databaseConnection.release()
211229
}
212230
}
213231

test/iris.spec.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ describe("IRISNative test", () => {
1515
const res = await connection.query(
1616
"SELECT 1 AS test1, '2' AS test2",
1717
[],
18-
).rows
19-
res.should.be.an("array")
20-
res.should.have.lengthOf(1)
21-
res[0].should.be.an("object")
22-
res[0].should.have.property("test1")
23-
res[0].should.have.property("test2")
24-
res[0].should.have.property("test1", 1)
25-
res[0].should.have.property("test2", "2")
18+
)
19+
res.rows.should.be.an("array")
20+
res.rows.should.have.lengthOf(1)
21+
res.rows[0].should.be.an("object")
22+
res.rows[0].should.have.property("test1")
23+
res.rows[0].should.have.property("test2")
24+
res.rows[0].should.have.property("test1", 1)
25+
res.rows[0].should.have.property("test2", "2")
2626
})
2727
})

test/utils/test-utils.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export function getTypeOrmConfig(): IRISConnectionOptions[] {
2020
namespace: globalOptions.ns,
2121
username: globalOptions.user,
2222
password: globalOptions.pwd,
23+
poolSize: 5,
2324
},
2425
]
2526
}

0 commit comments

Comments
 (0)