Skip to content

Commit b9b5208

Browse files
committed
refactor(pool): use cancellation token for connection establishment
1 parent 2014b7b commit b9b5208

File tree

3 files changed

+26
-5
lines changed

3 files changed

+26
-5
lines changed

lib/core/connection/connect.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,16 +266,25 @@ function makeConnection(family, options, cancellationToken, _callback) {
266266
socket.setTimeout(connectionTimeout);
267267
socket.setNoDelay(noDelay);
268268

269+
let cancellationHandler;
269270
function errorHandler(eventName) {
270271
return err => {
271272
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
273+
if (cancellationHandler) {
274+
cancellationToken.removeListener('cancel', cancellationHandler);
275+
}
276+
272277
socket.removeListener('connect', connectHandler);
273278
callback(connectionFailureError(eventName, err));
274279
};
275280
}
276281

277282
function connectHandler() {
278283
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
284+
if (cancellationHandler) {
285+
cancellationToken.removeListener('cancel', cancellationHandler);
286+
}
287+
279288
if (socket.authorizationError && rejectUnauthorized) {
280289
return callback(socket.authorizationError);
281290
}
@@ -286,7 +295,8 @@ function makeConnection(family, options, cancellationToken, _callback) {
286295

287296
SOCKET_ERROR_EVENTS.forEach(event => socket.once(event, errorHandler(event)));
288297
if (cancellationToken) {
289-
cancellationToken.once('cancel', errorHandler('cancel'));
298+
cancellationHandler = errorHandler('cancel');
299+
cancellationToken.once('cancel', cancellationHandler);
290300
}
291301

292302
socket.once('connect', connectHandler);

lib/core/connection/pool.js

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,13 @@ var Pool = function(topology, options) {
9191
this.topology = topology;
9292

9393
this.s = {
94-
state: DISCONNECTED
94+
state: DISCONNECTED,
95+
cancellationToken: new EventEmitter()
9596
};
9697

98+
// we don't care how many connections are listening for cancellation
99+
this.s.cancellationToken.setMaxListeners(Infinity);
100+
97101
// Add the options
98102
this.options = Object.assign(
99103
{
@@ -629,6 +633,9 @@ Pool.prototype.unref = function() {
629633
function destroy(self, connections, options, callback) {
630634
stateTransition(self, DESTROYING);
631635

636+
// indicate that in-flight connections should cancel
637+
self.s.cancellationToken.emit('cancel');
638+
632639
eachAsync(
633640
connections,
634641
(conn, cb) => {
@@ -748,6 +755,10 @@ Pool.prototype.reset = function(callback) {
748755
return;
749756
}
750757

758+
// signal in-flight connections should be cancelled
759+
this.s.cancellationToken.emit('cancel');
760+
761+
// destroy existing connections
751762
const connections = this.availableConnections.concat(this.inUseConnections);
752763
eachAsync(
753764
connections,
@@ -984,7 +995,7 @@ function createConnection(pool, callback) {
984995
}
985996

986997
pool.connectingConnections++;
987-
connect(pool.options, (err, connection) => {
998+
connect(pool.options, pool.s.cancellationToken, (err, connection) => {
988999
pool.connectingConnections--;
9891000

9901001
if (err) {

test/unit/core/connect.test.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,10 @@ describe('Connect Tests', function() {
131131

132132
it('should allow a cancellaton token', function(done) {
133133
const cancellationToken = new EventEmitter();
134-
setTimeout(() => cancellationToken.emit('cancel'), 100);
134+
setTimeout(() => cancellationToken.emit('cancel'), 500);
135135
// set no response handler for mock server, effecively blackhole requests
136136

137-
connect(test.connectOptions, cancellationToken, (err, conn) => {
137+
connect({ host: '240.0.0.1' }, cancellationToken, (err, conn) => {
138138
expect(err).to.exist;
139139
expect(err).to.match(/connection establishment was cancelled/);
140140
expect(conn).to.not.exist;

0 commit comments

Comments
 (0)