Skip to content

Commit 08f62bc

Browse files
committed
Handle parallel pool connection requests in Thin mode for Issue #1591
1 parent bc06be2 commit 08f62bc

File tree

5 files changed

+269
-99
lines changed

5 files changed

+269
-99
lines changed

doc/src/release_notes.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ Common Changes
2323
Thin Mode Changes
2424
+++++++++++++++++
2525

26+
#) Fixed bug in handling unexpected pool growth which is crossing pool max limit
27+
due to improper handling of parallel connection request.
28+
`Issue #1591 <https://github.com/oracle/node-oracledb/issues/1591>`__.
29+
2630
#) Fixed Bug to return proper error when invalid service name is
2731
configured in listener.
2832

lib/thin/pool.js

Lines changed: 130 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const thinUtil = require('./util.js');
3636
const {getConnectionInfo} = require('./sqlnet/networkSession.js');
3737
const crypto = require('crypto');
3838
const EventEmitter = require('events');
39+
const Timers = require('timers');
3940

4041
class ThinPoolImpl extends PoolImpl {
4142

@@ -69,6 +70,9 @@ class ThinPoolImpl extends PoolImpl {
6970
this._privateKey = params.privateKey;
7071
this._obfuscatedPrivateKey = [];
7172
this._schedulerJob = null;
73+
this._poolCloseWaiter = null;
74+
this._pendingRequests = [];
75+
7276
// password obfuscation
7377
if (this._password !== undefined) {
7478
const obj = protocolUtil.setObfuscatedValue(this._password);
@@ -121,8 +125,9 @@ class ThinPoolImpl extends PoolImpl {
121125
this._generateConnectionClass();
122126
}
123127

124-
// create minimum connections
125-
await this._growPool(this._poolMin);
128+
// create a background task. It will create minimum connections in the pool
129+
// and expand the pool as required.
130+
this.bgThreadFunc();
126131
}
127132

128133
//---------------------------------------------------------------------------
@@ -151,15 +156,21 @@ class ThinPoolImpl extends PoolImpl {
151156
async _getConnAttrs() {
152157
let accessToken;
153158
const clonedAttrs = Object.assign({}, this._userConfig);
159+
// deobfuscate password
154160
if (clonedAttrs.password === null) {
155161
clonedAttrs.password = protocolUtil.getDeobfuscatedValue(this._password,
156162
this._obfuscatedPassword);
157163
}
164+
165+
// deobfuscate wallet password
158166
if (clonedAttrs.walletPassword === null) {
159167
clonedAttrs.walletPassword =
160168
protocolUtil.getDeobfuscatedValue(this._walletPassword,
161169
this._obfuscatedWalletPassword);
162170
}
171+
172+
// deobfuscate token and private key
173+
// check for token expiry
163174
if (clonedAttrs.token === null) {
164175
clonedAttrs.token =
165176
protocolUtil.getDeobfuscatedValue(this._token, this._obfuscatedToken);
@@ -230,6 +241,15 @@ class ThinPoolImpl extends PoolImpl {
230241
//---------------------------------------------------------------------------
231242
async close() {
232243

244+
// wait till background task for pool expansion is finished; if it is not
245+
// currently running, wake it up!
246+
await new Promise((resolve) => {
247+
this._poolCloseWaiter = resolve;
248+
if (this.bgWaiter) {
249+
this.bgWaiter();
250+
}
251+
});
252+
233253
// clear scheduled job
234254
if (this._schedulerJob) {
235255
clearTimeout(this._schedulerJob);
@@ -319,25 +339,6 @@ class ThinPoolImpl extends PoolImpl {
319339
return this._stmtCacheSize;
320340
}
321341

322-
//---------------------------------------------------------------------------
323-
// _growPool()
324-
//
325-
// Grows the pool to include the specified number of connections.
326-
//---------------------------------------------------------------------------
327-
async _growPool(numConns) {
328-
const clonedAttrs = await this._getConnAttrs();
329-
while (numConns > 0) {
330-
const conn = new ThinConnectionImpl();
331-
conn._pool = this;
332-
await conn.connect(clonedAttrs);
333-
conn._newSession = true;
334-
conn._dropSess = false;
335-
conn._lastTimeUsed = Date.now();
336-
this._freeConnectionList.push(conn);
337-
numConns--;
338-
}
339-
}
340-
341342
//---------------------------------------------------------------------------
342343
// _setScheduler()
343344
//
@@ -375,6 +376,94 @@ class ThinPoolImpl extends PoolImpl {
375376
this._setScheduler();
376377
}
377378

379+
//---------------------------------------------------------------------------
380+
// _getNumConns()
381+
//
382+
// get number of connections need to be created
383+
//---------------------------------------------------------------------------
384+
_getNumConns() {
385+
const usedConns = this._freeConnectionList.length + this._usedConnectionList.size;
386+
// less connections in the pool than poolMin? restore to poolMin
387+
if (usedConns < this._poolMin) {
388+
return this._poolMin - usedConns;
389+
// connections need to be created? create up to poolIncrement without exceeding poolMax
390+
} else if (this._pendingRequests.length > 0) {
391+
return Math.min(this._poolIncrement, this._poolMax - usedConns);
392+
// no pending requests and we are already at poolMin so nothing to do!
393+
} else {
394+
return 0;
395+
}
396+
}
397+
398+
//---------------------------------------------------------------------------
399+
// bgThreadFunc()
400+
//
401+
// method which runs in a background thread and is used to create connections.
402+
// When first started, it creates poolMin connections. After that, it creates
403+
// poolIncrement connections up to the value of poolMax when needed.
404+
// The thread terminates automatically when the pool is closed.
405+
//---------------------------------------------------------------------------
406+
async bgThreadFunc() {
407+
408+
// continue until a close request is received
409+
while (!this._poolCloseWaiter) {
410+
// get count for connections to be created
411+
const numConns = this._getNumConns();
412+
413+
// connection creation is going on serially and not concurrently
414+
for (let i = 0; i < numConns; i++) {
415+
try {
416+
// get deobfuscated value
417+
const config = await this._getConnAttrs();
418+
const conn = new ThinConnectionImpl();
419+
conn._pool = this;
420+
await conn.connect(config);
421+
conn._newSession = true;
422+
conn._dropSess = false;
423+
conn._lastTimeUsed = Date.now();
424+
this._freeConnectionList.push(conn);
425+
} catch (err) {
426+
this._bgErr = err;
427+
}
428+
429+
if (this._poolIncrement > 1 && (this._poolMax - this._usedConnectionList.size
430+
- this._freeConnectionList.length) > 1) {
431+
this._setScheduler();
432+
}
433+
434+
// resolve pending request
435+
if (this._pendingRequests.length > 0) {
436+
const payload = this._pendingRequests.shift();
437+
payload.resolve();
438+
}
439+
440+
// give an opportunity for other "threads" to do their work.
441+
await new Promise((resolve) => Timers.setImmediate(resolve));
442+
443+
// break loop when pool is closing
444+
if (this._poolCloseWaiter) {
445+
break;
446+
}
447+
}
448+
449+
// when pool is closing, break from while loop
450+
if (this._poolCloseWaiter) {
451+
break;
452+
}
453+
454+
// if no pending requests, wait for pending requests to appear!
455+
if (this._pendingRequests.length == 0 || this._bgErr) {
456+
await new Promise((resolve) => {
457+
this.bgWaiter = resolve;
458+
});
459+
this.bgWaiter = null;
460+
}
461+
}
462+
463+
// notify the closer that the close can actually take place
464+
this._poolCloseWaiter();
465+
}
466+
378467
//---------------------------------------------------------------------------
379468
// acquire()
380469
//
@@ -422,17 +511,28 @@ class ThinPoolImpl extends PoolImpl {
422511

423512
// no free connections exist at this point; if less than poolMin
424513
// connections exist, grow the pool to poolMin again; otherwise, increase
425-
// the pool by poolIncrement up to poolMax
426-
if (this._usedConnectionList.size < this._poolMin) {
427-
await this._growPool(this._poolMin - this._usedConnectionList.size);
428-
} else {
429-
const sizeAvailable = this._poolMax - this._usedConnectionList.size;
430-
await this._growPool(Math.min(this._poolIncrement, sizeAvailable));
431-
if (this._poolIncrement > 1 && sizeAvailable > 1) {
432-
this._setScheduler();
514+
// the pool by poolIncrement up to poolMax. We are deferring this
515+
// to the background thread function!
516+
await new Promise((resolve) => {
517+
this._pendingRequests.push({resolve: resolve});
518+
if (this.bgWaiter) {
519+
// this wakes up the function to do some more work
520+
this.bgWaiter();
433521
}
434-
}
522+
});
435523

524+
if (this._bgErr) {
525+
const err = this._bgErr;
526+
this._bgErr = null;
527+
528+
// if an error has occurred in the background thread we clear it and then,
529+
// if there are more pending requests we request the background thread
530+
// function to try again.
531+
if (this._pendingRequests.length > 0 && this.bgWaiter) {
532+
this.bgWaiter();
533+
}
534+
throw err;
535+
}
436536
// return a connection from the ones that were just built
437537
const conn = this._freeConnectionList.pop();
438538
this._usedConnectionList.add(conn);

test/poolExpansion.js

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,71 +35,107 @@
3535
const oracledb = require('oracledb');
3636
const assert = require('assert');
3737
const dbConfig = require('./dbconfig.js');
38+
const testUtil = require('../test/testsUtil.js');
3839

3940
describe('278. Pool expansion', function() {
4041

42+
function sleep(ms) {
43+
return new Promise((resolve) => {
44+
setTimeout(resolve, ms);
45+
});
46+
}
47+
4148
it('278.1 pool expansion when new connection created and within pool max limit', async function() {
42-
if (dbConfig.test.drcp) this.skip();
49+
if (!oracledb.thin || dbConfig.test.drcp) this.skip();
50+
// thick driver is creating total connections exceeding poolMax
4351

4452
const pool = await oracledb.createPool({
4553
...dbConfig,
4654
poolMin : 0,
47-
poolMax : 10,
55+
poolMax : 2,
4856
poolIncrement : 4,
4957
homogeneous : true
5058
});
5159
const conn = await pool.getConnection();
52-
assert.deepStrictEqual(pool.connectionsOpen, 4);
53-
assert.deepStrictEqual(pool.connectionsInUse, 1);
60+
assert.strictEqual(pool.connectionsInUse, 1);
61+
62+
await testUtil.checkAndWait(100, 50, () => pool.connectionsOpen === 2);
63+
// total connections created won't exceed pool max limit.
64+
assert.strictEqual(pool.connectionsOpen, 2);
65+
assert.strictEqual(pool.connectionsInUse, 1);
5466
await conn.close();
5567
await pool.close(0);
5668
});
5769

58-
(oracledb.thin ? it : it.skip)('278.2 pool expansion when new connection created and exceeding pool max limit', async function() {
70+
it('278.2 pool expansion not done on creating minimum connection', async function() {
71+
if (dbConfig.test.drcp) this.skip();
72+
5973
const pool = await oracledb.createPool({
6074
...dbConfig,
61-
poolMin : 0,
62-
poolMax : 4,
63-
poolIncrement : 7,
75+
poolMin : 2,
76+
poolMax : 10,
77+
poolIncrement : 4,
6478
homogeneous : true
6579
});
66-
const conn = await pool.getConnection();
67-
assert.deepStrictEqual(pool.connectionsOpen, 4);
68-
assert.deepStrictEqual(pool.connectionsInUse, 1);
69-
await conn.close();
80+
81+
await testUtil.checkAndWait(100, 50, () => pool.connectionsOpen === 2);
82+
assert.strictEqual(pool.connectionsOpen, 2);
83+
assert.strictEqual(pool.connectionsInUse, 0);
84+
await testUtil.checkAndWait(100, 50, () => pool.connectionsOpen === 2);
85+
await sleep(1000);
86+
assert.strictEqual(pool.connectionsOpen, 2);
87+
assert.strictEqual(pool.connectionsInUse, 0);
7088
await pool.close(0);
7189
});
7290

73-
it('278.3 pool expansion not done on creating minimum connection', async function() {
91+
it('278.3 no pool expansion while acquiring connection already present in pool', async function() {
7492
if (dbConfig.test.drcp) this.skip();
7593

7694
const pool = await oracledb.createPool({
7795
...dbConfig,
78-
poolMin : 5,
96+
poolMin : 2,
7997
poolMax : 10,
80-
poolIncrement : 2,
98+
poolIncrement : 3,
8199
homogeneous : true
82100
});
83-
assert.deepStrictEqual(pool.connectionsOpen, 5);
84-
assert.deepStrictEqual(pool.connectionsInUse, 0);
101+
const conn = await pool.getConnection();
102+
assert.strictEqual(pool.connectionsInUse, 1);
103+
await testUtil.checkAndWait(100, 50, () => pool.connectionsOpen === 2);
104+
await sleep(1000);
105+
assert.strictEqual(pool.connectionsOpen, 2);
106+
assert.strictEqual(pool.connectionsInUse, 1);
107+
await conn.close();
85108
await pool.close(0);
86109
});
87110

88-
it('278.4 no pool expansion while acquiring connection already present in pool', async function() {
111+
it('278.4 pool connection count not crossing pool max limit on parallel execution', async function() {
89112
if (dbConfig.test.drcp) this.skip();
90113

91114
const pool = await oracledb.createPool({
92115
...dbConfig,
93-
poolMin : 3,
116+
poolMin : 2,
94117
poolMax : 10,
95-
poolIncrement : 3,
118+
poolIncrement : 2,
96119
homogeneous : true
97120
});
98-
const conn = await pool.getConnection();
99-
assert.deepStrictEqual(pool.connectionsOpen, 3);
100-
assert.deepStrictEqual(pool.connectionsInUse, 1);
101-
await conn.close();
121+
let conn1, conn2, conn3;
122+
const routine1 = async function() {
123+
conn1 = await pool.getConnection();
124+
};
125+
126+
const routine3 = async function() {
127+
conn3 = await pool.getConnection();
128+
};
129+
130+
const routine2 = async function() {
131+
conn2 = await pool.getConnection();
132+
};
133+
await Promise.all([routine1(), routine2(), routine3()]);
134+
assert.strictEqual(pool.connectionsInUse, 3);
135+
await testUtil.checkAndWait(100, 50, () => pool.connectionsOpen === 4);
136+
await conn1.close();
137+
await conn3.close();
138+
await conn2.close();
102139
await pool.close(0);
103140
});
104-
105141
});

0 commit comments

Comments
 (0)