Skip to content

Commit e1e4377

Browse files
authored
refactor(NODE-2993): align internal cmap implementation with spec (#3248)
1 parent 48e0e6e commit e1e4377

File tree

7 files changed

+378
-123
lines changed

7 files changed

+378
-123
lines changed

src/cmap/connection_pool.ts

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ const kLogger = Symbol('logger');
3939
/** @internal */
4040
const kConnections = Symbol('connections');
4141
/** @internal */
42-
const kPermits = Symbol('permits');
42+
const kPending = Symbol('pending');
43+
/** @internal */
44+
const kCheckedOut = Symbol('checkedOut');
4345
/** @internal */
4446
const kMinPoolSizeTimer = Symbol('minPoolSizeTimer');
4547
/** @internal */
@@ -57,8 +59,6 @@ const kCancelled = Symbol('cancelled');
5759
/** @internal */
5860
const kMetrics = Symbol('metrics');
5961
/** @internal */
60-
const kCheckedOut = Symbol('checkedOut');
61-
/** @internal */
6262
const kProcessingWaitQueue = Symbol('processingWaitQueue');
6363

6464
/** @public */
@@ -112,11 +112,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
112112
[kLogger]: Logger;
113113
/** @internal */
114114
[kConnections]: Denque<Connection>;
115-
/**
116-
* An integer expressing how many total connections are permitted
117-
* @internal
118-
*/
119-
[kPermits]: number;
115+
/** @internal */
116+
[kPending]: number;
117+
/** @internal */
118+
[kCheckedOut]: number;
120119
/** @internal */
121120
[kMinPoolSizeTimer]?: NodeJS.Timeout;
122121
/**
@@ -137,8 +136,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
137136
/** @internal */
138137
[kMetrics]: ConnectionPoolMetrics;
139138
/** @internal */
140-
[kCheckedOut]: number;
141-
/** @internal */
142139
[kProcessingWaitQueue]: boolean;
143140

144141
/**
@@ -216,7 +213,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
216213

217214
this[kLogger] = new Logger('ConnectionPool');
218215
this[kConnections] = new Denque();
219-
this[kPermits] = this.options.maxPoolSize;
216+
this[kPending] = 0;
217+
this[kCheckedOut] = 0;
220218
this[kMinPoolSizeTimer] = undefined;
221219
this[kGeneration] = 0;
222220
this[kServiceGenerations] = new Map();
@@ -225,7 +223,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
225223
this[kCancellationToken].setMaxListeners(Infinity);
226224
this[kWaitQueue] = new Denque();
227225
this[kMetrics] = new ConnectionPoolMetrics();
228-
this[kCheckedOut] = 0;
229226
this[kProcessingWaitQueue] = false;
230227

231228
process.nextTick(() => {
@@ -244,16 +241,26 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
244241
return this[kGeneration];
245242
}
246243

247-
/** An integer expressing how many total connections (active + in use) the pool currently has */
244+
/** An integer expressing how many total connections (available + pending + in use) the pool currently has */
248245
get totalConnectionCount(): number {
249-
return this[kConnections].length + (this.options.maxPoolSize - this[kPermits]);
246+
return (
247+
this.availableConnectionCount + this.pendingConnectionCount + this.currentCheckedOutCount
248+
);
250249
}
251250

252251
/** An integer expressing how many connections are currently available in the pool. */
253252
get availableConnectionCount(): number {
254253
return this[kConnections].length;
255254
}
256255

256+
get pendingConnectionCount(): number {
257+
return this[kPending];
258+
}
259+
260+
get currentCheckedOutCount(): number {
261+
return this[kCheckedOut];
262+
}
263+
257264
get waitQueueSize(): number {
258265
return this[kWaitQueue].length;
259266
}
@@ -266,10 +273,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
266273
return this[kServiceGenerations];
267274
}
268275

269-
get currentCheckedOutCount(): number {
270-
return this[kCheckedOut];
271-
}
272-
273276
/**
274277
* Get the metrics information for the pool when a wait queue timeout occurs.
275278
*/
@@ -319,7 +322,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
319322
}, waitQueueTimeoutMS);
320323
}
321324

322-
this[kCheckedOut] = this[kCheckedOut] + 1;
323325
this[kWaitQueue].push(waitQueueMember);
324326
process.nextTick(processWaitQueue, this);
325327
}
@@ -339,7 +341,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
339341
this[kConnections].unshift(connection);
340342
}
341343

342-
this[kCheckedOut] = this[kCheckedOut] - 1;
344+
this[kCheckedOut]--;
343345
this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection));
344346

345347
if (willDestroy) {
@@ -527,10 +529,10 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
527529
cancellationToken: pool[kCancellationToken]
528530
};
529531

530-
pool[kPermits]--;
532+
pool[kPending]++;
531533
connect(connectOptions, (err, connection) => {
532534
if (err || !connection) {
533-
pool[kPermits]++;
535+
pool[kPending]--;
534536
pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
535537
if (typeof callback === 'function') {
536538
callback(err);
@@ -541,6 +543,7 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
541543

542544
// The pool might have closed since we started trying to create a connection
543545
if (pool.closed) {
546+
pool[kPending]--;
544547
connection.destroy({ force: true });
545548
return;
546549
}
@@ -572,24 +575,22 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
572575
connection.markAvailable();
573576
pool.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(pool, connection));
574577

575-
// if a callback has been provided, check out the connection immediately
578+
// if a callback has been provided, hand off the connection immediately
576579
if (typeof callback === 'function') {
577580
callback(undefined, connection);
578581
return;
579582
}
580583

581584
// otherwise add it to the pool for later acquisition, and try to process the wait queue
582585
pool[kConnections].push(connection);
586+
pool[kPending]--;
583587
process.nextTick(processWaitQueue, pool);
584588
});
585589
}
586590

587591
function destroyConnection(pool: ConnectionPool, connection: Connection, reason: string) {
588592
pool.emit(ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(pool, connection, reason));
589593

590-
// allow more connections to be created
591-
pool[kPermits]++;
592-
593594
// destroy the connection
594595
process.nextTick(() => connection.destroy());
595596
}
@@ -624,6 +625,7 @@ function processWaitQueue(pool: ConnectionPool) {
624625
const isStale = connectionIsStale(pool, connection);
625626
const isIdle = connectionIsIdle(pool, connection);
626627
if (!isStale && !isIdle && !connection.closed) {
628+
pool[kCheckedOut]++;
627629
pool.emit(
628630
ConnectionPool.CONNECTION_CHECKED_OUT,
629631
new ConnectionCheckedOutEvent(pool, connection)
@@ -647,6 +649,7 @@ function processWaitQueue(pool: ConnectionPool) {
647649
if (!waitQueueMember || waitQueueMember[kCancelled]) {
648650
if (!err && connection) {
649651
pool[kConnections].push(connection);
652+
pool[kPending]--;
650653
}
651654

652655
pool[kProcessingWaitQueue] = false;
@@ -659,6 +662,8 @@ function processWaitQueue(pool: ConnectionPool) {
659662
new ConnectionCheckOutFailedEvent(pool, err)
660663
);
661664
} else if (connection) {
665+
pool[kCheckedOut]++;
666+
pool[kPending]--;
662667
pool.emit(
663668
ConnectionPool.CONNECTION_CHECKED_OUT,
664669
new ConnectionCheckedOutEvent(pool, connection)
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
{
2+
"version": 1,
3+
"style": "unit",
4+
"description": "must correctly update pool stats when checking in a connection",
5+
"poolOptions": {
6+
"minPoolSize": 3
7+
},
8+
"operations": [
9+
{
10+
"name": "waitForEvent",
11+
"event": "ConnectionCreated",
12+
"count": 3
13+
},
14+
{
15+
"name": "waitForEvent",
16+
"event": "ConnectionReady",
17+
"count": 3
18+
},
19+
{
20+
"name": "checkOut",
21+
"label": "conn"
22+
},
23+
{
24+
"name": "checkIn",
25+
"connection": "conn"
26+
}
27+
],
28+
"events": [
29+
{
30+
"type": "ConnectionPoolCreated",
31+
"address": 42,
32+
"options": 42,
33+
"totalConnectionCount": 0,
34+
"availableConnectionCount": 0,
35+
"pendingConnectionCount": 0,
36+
"currentCheckedOutCount": 0
37+
},
38+
{
39+
"type": "ConnectionCreated",
40+
"connectionId": 42,
41+
"address": 42
42+
},
43+
{
44+
"type": "ConnectionCreated",
45+
"connectionId": 42,
46+
"address": 42
47+
},
48+
{
49+
"type": "ConnectionCreated",
50+
"connectionId": 42,
51+
"address": 42
52+
},
53+
{
54+
"type": "ConnectionCheckedOut",
55+
"connectionId": 42,
56+
"address": 42
57+
},
58+
{
59+
"type": "ConnectionCheckedIn",
60+
"connectionId": 42,
61+
"address": 42,
62+
"currentCheckedOutCount": 0,
63+
"availableConnectionCount": 3,
64+
"pendingConnectionCount": 0,
65+
"totalConnectionCount": 3
66+
}
67+
],
68+
"ignore": [
69+
"ConnectionReady",
70+
"ConnectionClosed",
71+
"ConnectionCheckOutStarted"
72+
]
73+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
{
2+
"version": 1,
3+
"style": "unit",
4+
"description": "must correctly update pool stats when checking out a connection",
5+
"poolOptions": {
6+
"minPoolSize": 3
7+
},
8+
"operations": [
9+
{
10+
"name": "waitForEvent",
11+
"event": "ConnectionCreated",
12+
"count": 3
13+
},
14+
{
15+
"name": "waitForEvent",
16+
"event": "ConnectionReady",
17+
"count": 3
18+
},
19+
{
20+
"name": "checkOut"
21+
}
22+
],
23+
"events": [
24+
{
25+
"type": "ConnectionPoolCreated",
26+
"address": 42,
27+
"options": 42,
28+
"currentCheckedOutCount": 0,
29+
"availableConnectionCount": 0,
30+
"pendingConnectionCount": 0,
31+
"totalConnectionCount": 0
32+
},
33+
{
34+
"type": "ConnectionCreated",
35+
"connectionId": 42,
36+
"address": 42
37+
},
38+
{
39+
"type": "ConnectionCreated",
40+
"connectionId": 42,
41+
"address": 42
42+
},
43+
{
44+
"type": "ConnectionCreated",
45+
"connectionId": 42,
46+
"address": 42
47+
},
48+
{
49+
"type": "ConnectionCheckedOut",
50+
"connectionId": 42,
51+
"address": 42,
52+
"currentCheckedOutCount": 1,
53+
"availableConnectionCount": 2,
54+
"pendingConnectionCount": 0,
55+
"totalConnectionCount": 3
56+
}
57+
],
58+
"ignore": [
59+
"ConnectionReady",
60+
"ConnectionClosed",
61+
"ConnectionCheckOutStarted"
62+
]
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
{
2+
"version": 1,
3+
"style": "unit",
4+
"description": "must correctly update pool stats when populating the pool up to minPoolSize",
5+
"poolOptions": {
6+
"minPoolSize": 3
7+
},
8+
"operations": [
9+
{
10+
"name": "waitForEvent",
11+
"event": "ConnectionCreated",
12+
"count": 3
13+
},
14+
{
15+
"name": "waitForEvent",
16+
"event": "ConnectionReady",
17+
"count": 3
18+
}
19+
],
20+
"events": [
21+
{
22+
"type": "ConnectionPoolCreated",
23+
"address": 42,
24+
"options": 42,
25+
"currentCheckedOutCount": 0,
26+
"availableConnectionCount": 0,
27+
"pendingConnectionCount": 0,
28+
"totalConnectionCount": 0
29+
},
30+
{
31+
"type": "ConnectionCreated",
32+
"connectionId": 42,
33+
"address": 42,
34+
"currentCheckedOutCount": 0,
35+
"availableConnectionCount": 0,
36+
"pendingConnectionCount": 3,
37+
"totalConnectionCount": 3
38+
},
39+
{
40+
"type": "ConnectionCreated",
41+
"connectionId": 42,
42+
"address": 42,
43+
"availableConnectionCount": 1,
44+
"pendingConnectionCount": 2,
45+
"totalConnectionCount": 3
46+
},
47+
{
48+
"type": "ConnectionCreated",
49+
"connectionId": 42,
50+
"address": 42,
51+
"availableConnectionCount": 2,
52+
"pendingConnectionCount": 1,
53+
"totalConnectionCount": 3
54+
}
55+
],
56+
"ignore": [
57+
"ConnectionReady",
58+
"ConnectionClosed"
59+
]
60+
}

0 commit comments

Comments
 (0)