diff --git a/config/epp-config-example.json b/config/epp-config-example.json index a885bbe..282dff8 100644 --- a/config/epp-config-example.json +++ b/config/epp-config-example.json @@ -27,7 +27,8 @@ "DNSSEC": { "xmlns": "urn:ietf:params:xml:ns:secDNS-1.1" } - } + }, + "connections":[10, 100] }, "registry-test2": { "ssl": true, @@ -57,7 +58,8 @@ "keyvalue": { "xmlns": "http://schema.ispapi.net/epp/xml/keyvalue-1.0" } - } + }, + "connections":[10, 100] } }, "rabbitmq": { diff --git a/lib/dispatcher.js b/lib/dispatcher.js index cabc2e0..d476e61 100644 --- a/lib/dispatcher.js +++ b/lib/dispatcher.js @@ -65,9 +65,11 @@ Dispatcher.prototype.processMessage = function processMessage(m) { loginTransactionId).then( function(data) { logger.log("Got login data: ", data.toString()); + process.send(currentState.loggedIn); }, function(error) { logger.error("Unable to login: ", error); + process.send(currentState.loggedIn); }); }, 2000); diff --git a/lib/epp-factory.js b/lib/epp-factory.js index d50ee0b..b674174 100644 --- a/lib/epp-factory.js +++ b/lib/epp-factory.js @@ -20,6 +20,9 @@ var logger = new (winston.Logger)({ function EppFactory() {} EppFactory.generate = function(registry, config) { + if (!registry) { + return; + } var epp = new EPP(registry, config); config.extensionClasses.forEach(function(extensionClass) { var extension = extensionClass.extension; diff --git a/lib/event-dispatcher.js b/lib/event-dispatcher.js deleted file mode 100644 index 3f17cf1..0000000 --- a/lib/event-dispatcher.js +++ /dev/null @@ -1,18 +0,0 @@ - -var events = require('events'); -var util = require('util'); - -EventDispatcher = function (){ - events.EventEmitter.call(this); - this.childFree = function (registry) { - this.emit('childFree', registry); - }; - this.queueChild = function (registry) { - this.emit('queueChild', registry); - }; - -}; -util.inherits(EventDispatcher, events.EventEmitter); - - -module.exports = EventDispatcher; diff --git a/lib/listener.js b/lib/listener.js deleted file mode 100644 index cdb68a1..0000000 --- a/lib/listener.js +++ /dev/null @@ -1,43 +0,0 @@ -var winston = require('winston'); -var nconf = require('nconf'); - -nconf.env() -var log_level = (nconf.get('LOG_LEVEL') || 'debug').toLowerCase(); -var logger = new (winston.Logger)({ - transports: [ - new (winston.transports.Console)({ level: log_level }) - ] -}); - -var available; -var busy; -var childQueue = []; -var eventer; -function Listener(eventDispatcher, availableProcesses) { - eventer = eventDispatcher; - available = availableProcesses; - busy = {}; -} -Listener.prototype.pushChildQueue = function (child) { - childQueue.push(child); - logger.info("Items in queue: ", childQueue.length); -}; -Listener.prototype.childFree = function(registry) { - logger.info(registry + " free "); - var childProc = busy[registry]; - delete busy[registry]; - available[registry] = childProc; - eventer.queueChild(registry); -}; - -Listener.prototype.queueChild = function (registry) { - var childProc = available[registry]; - if (childProc && childQueue.length > 0) { - delete available[registry]; - busy[registry] = childProc; - var callToChild = childQueue.shift(); - callToChild(childProc); - } -}; - -module.exports = Listener; diff --git a/lib/node-epp-server.js b/lib/node-epp-server.js index 98f10a2..f3a6a79 100644 --- a/lib/node-epp-server.js +++ b/lib/node-epp-server.js @@ -4,11 +4,10 @@ var cp = require('child_process'); var program = require('commander'); var ipfilter = require('ipfilter'); var moment = require('moment'); -var Listener = require('./listener.js'); -var EventDispatcher = require('./event-dispatcher.js'); var path = require('path'); var nconf = require('nconf'); var winston = require('winston'); +var Pool = require('generic-pool').Pool; nconf.env().file({ "file": path.resolve(__dirname, '..', 'config/epp-config.json') @@ -35,36 +34,51 @@ logger.log("Initialised with registries: ", registries); var availableProcesses = {}; for (var accred in registries) { var registry = registries[accred]; - var eppProcess = cp.fork(__dirname + '/worker.js'); - eppProcess.send({ - "registry": registry + var registryConfig = nconf.get('registries')[registry]; + var pool = new Pool({ + name : registry, + create : function(callback) { + var client = cp.fork(__dirname + '/worker.js'); + client.once('message', function(loggedIn) { + if (!loggedIn) { + client.kill(); + return callback("Login failed.", null); + } + // parameter order: err, resource + callback(null, client); + }); + client.send({ + "registry": registry + }); + }, + destroy: function(client) { + setTimeout(function() { + client.kill(); + }, 2000); + client.send({ + "command": "logout", + "data": {"kill": true} + }); + }, + validateAsync: function(client, callback) { + if (!client.connected) { + return callback(false); + } + return callback(true); + }, + returnToHead: true, + max : registryConfig['connections'][1], + min : registryConfig['connections'][0], + // specifies how long a resource can stay idle in pool before being removed + idleTimeoutMillis : 30000, + // if true, logs via console.log - can also be a function + log : true }); - availableProcesses[registry] = eppProcess; + availableProcesses[registry] = pool; } -process.on('SIGINT', function() { - var logoutResponse = function(data) { - logger.log("Got reply from logout: ", data); - }; - for (var registry in availableProcesses) { - var childProc = availableProcesses[registry]; - childProc.send({ - "command": "logout", - "data": {"kill": true} - }); - childProc.once('message', logoutResponse); - } - process.exit(0); -}); - // Wire up event/listener to keep track of available worker process. This is // to avoid individual connection workers from getting swamped. -var eventDispatch = new EventDispatcher(); -var listener = new Listener(eventDispatch, availableProcesses); -eventDispatch.on('queueChild', listener.queueChild); -eventDispatch.on('childFree', listener.childFree); - - var app = express(); app.use(bodyParser.json()); var ips = nconf.get('whitelisted_ips'); @@ -72,31 +86,72 @@ app.use(ipfilter(ips, {"mode": "allow"})); app.post('/command/:registry/:command', function(req, res) { var registry = req.params.registry; - if (! (registry in availableProcesses)) { - res.send(400, { + if (!(registry in availableProcesses)) { + res.status(400).send({ "error": "Unknown registry" }); return; } + var queryData = req.body; + var pool = availableProcesses[registry]; + pool.acquire(function(err, client) { + if (err) { + logger.error(err); + res.status(400).send({ + "error": "Session overload" + }); + return; + } + + var registryConfig = nconf.get('registries')[registry]; + var t = setTimeout(function() { + pool.destroy(client); + res.status(400).send({ + "error": "Timeout" + }); + return; + }, registryConfig['max_runtime']); - var a = moment(); - var processChild = function (childProcess) { - childProcess.once('message', function(m) { + var a = moment(); + client.once('message', function(m) { + clearTimeout(t); var b = moment(); var diff = b.diff(a, 'milliseconds'); logger.info('Request elapsed time: '+ diff.toString() + ' ms'); + m['runtime'] = diff/1000; + pool.release(client); res.send(m); - eventDispatch.childFree(registry); }); - childProcess.send({ + + client.send({ "command": req.params.command, "data": queryData }); - }; - listener.pushChildQueue(processChild); - eventDispatch.queueChild(registry); + }); +}); + +process.on('SIGINT', function(err) { + if (err) { + console.log(err.stack); + } + + for (var registry in availableProcesses) { + var pool = availableProcesses[registry]; + pool.drain(function() { + pool.destroyAllNow(); + }); + } + + setTimeout(function() { + process.exit(0); + }, 5000); }); -app.listen(program.listen); +process.on('uncaughtException', function(err) { + console.error(err); + console.error(err.stack); +}); +process.stdin.resume(); //so the program will not close instantly +app.listen(program.listen); diff --git a/lib/worker.js b/lib/worker.js index a2042cd..4dbb639 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -2,3 +2,14 @@ var Dispatcher = require('./dispatcher.js'); var dispatch = new Dispatcher(); process.on('message', dispatch.processMessage); +process.on('SIGINT', function(err) { + console.log("Try to logout."); + setTimeout(function() { + process.exit(0); + }, 2000); + process.send({ + "command": "logout", + "data": {"kill": true} + }); +}); + diff --git a/package.json b/package.json index a3764fb..95eaba2 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "q": "*", "xml2json": "*", "winston": "*", + "generic-pool": "*", "crypto": "*" }, "devDependencies": {