Skip to content

Commit 4e22e33

Browse files
authored
Merge pull request #1363 from yuxizhe/feat-poolcluster-promise
feat(poolCluster): add promise version
2 parents 27ab4e7 + 2cfecd9 commit 4e22e33

File tree

3 files changed

+102
-8
lines changed

3 files changed

+102
-8
lines changed

index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ exports.connect = exports.createConnection;
1414
exports.Connection = Connection;
1515

1616
const Pool = require('./lib/pool.js');
17+
const PoolCluster = require('./lib/pool_cluster.js');
1718

1819
exports.createPool = function(config) {
1920
const PoolConfig = require('./lib/pool_config.js');
@@ -29,6 +30,8 @@ exports.createQuery = Connection.createQuery;
2930

3031
exports.Pool = Pool;
3132

33+
exports.PoolCluster = PoolCluster;
34+
3235
exports.createServer = function(handler) {
3336
const Server = require('./lib/server.js');
3437
const s = new Server();

lib/pool_cluster.js

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,8 @@ class PoolNamespace {
5555
* @returns query
5656
*/
5757
query(sql, values, cb) {
58-
const clusterNode = this._getClusterNode();
5958
const query = Connection.createQuery(sql, values, cb, {});
60-
if (clusterNode === null) {
61-
return cb(new Error('Pool does Not exists.'));
62-
}
63-
this._cluster._getConnection(clusterNode, (err, conn) => {
59+
this.getConnection((err, conn) => {
6460
if (err) {
6561
if (typeof query.onResult === 'function') {
6662
query.onResult(err);
@@ -69,9 +65,6 @@ class PoolNamespace {
6965
}
7066
return;
7167
}
72-
if (conn === 'retry') {
73-
return this.query(sql, values, cb);
74-
}
7568
try {
7669
conn.query(query).once('end', () => {
7770
conn.release();

promise.js

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,8 +428,106 @@ function createPool(opts) {
428428
'format'
429429
]);
430430

431+
class PromisePoolCluster extends EventEmitter {
432+
constructor(poolCluster, thePromise) {
433+
super();
434+
this.poolCluster = poolCluster;
435+
this.Promise = thePromise || Promise;
436+
inheritEvents(poolCluster, this, ['acquire', 'connection', 'enqueue', 'release']);
437+
}
438+
439+
getConnection() {
440+
const corePoolCluster = this.poolCluster;
441+
return new this.Promise((resolve, reject) => {
442+
corePoolCluster.getConnection((err, coreConnection) => {
443+
if (err) {
444+
reject(err);
445+
} else {
446+
resolve(new PromisePoolConnection(coreConnection, this.Promise));
447+
}
448+
});
449+
});
450+
}
451+
452+
query(sql, args) {
453+
const corePoolCluster = this.poolCluster;
454+
const localErr = new Error();
455+
if (typeof args === 'function') {
456+
throw new Error(
457+
'Callback function is not available with promise clients.'
458+
);
459+
}
460+
return new this.Promise((resolve, reject) => {
461+
const done = makeDoneCb(resolve, reject, localErr);
462+
corePoolCluster.query(sql, args, done);
463+
});
464+
}
465+
466+
of(pattern, selector) {
467+
return new PromisePoolCluster(
468+
this.poolCluster.of(pattern, selector),
469+
this.Promise
470+
);
471+
}
472+
473+
end() {
474+
const corePoolCluster = this.poolCluster;
475+
const localErr = new Error();
476+
return new this.Promise((resolve, reject) => {
477+
corePoolCluster.end(err => {
478+
if (err) {
479+
localErr.message = err.message;
480+
localErr.code = err.code;
481+
localErr.errno = err.errno;
482+
localErr.sqlState = err.sqlState;
483+
localErr.sqlMessage = err.sqlMessage;
484+
reject(localErr);
485+
} else {
486+
resolve();
487+
}
488+
});
489+
});
490+
}
491+
}
492+
493+
/**
494+
* proxy poolCluster synchronous functions
495+
*/
496+
(function (functionsToWrap) {
497+
for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) {
498+
const func = functionsToWrap[i];
499+
500+
if (
501+
typeof core.PoolCluster.prototype[func] === 'function' &&
502+
PromisePoolCluster.prototype[func] === undefined
503+
) {
504+
PromisePoolCluster.prototype[func] = (function factory(funcName) {
505+
return function () {
506+
return core.PoolCluster.prototype[funcName].apply(this.poolCluster, arguments);
507+
};
508+
})(func);
509+
}
510+
}
511+
})([
512+
'add'
513+
]);
514+
515+
function createPoolCluster(opts) {
516+
const corePoolCluster = core.createPoolCluster(opts);
517+
const thePromise = (opts && opts.Promise) || Promise;
518+
if (!thePromise) {
519+
throw new Error(
520+
'no Promise implementation available.' +
521+
'Use promise-enabled node version or pass userland Promise' +
522+
" implementation as parameter, for example: { Promise: require('bluebird') }"
523+
);
524+
}
525+
return new PromisePoolCluster(corePoolCluster, thePromise);
526+
}
527+
431528
exports.createConnection = createConnection;
432529
exports.createPool = createPool;
530+
exports.createPoolCluster = createPoolCluster;
433531
exports.escape = core.escape;
434532
exports.escapeId = core.escapeId;
435533
exports.format = core.format;

0 commit comments

Comments
 (0)