Skip to content

Commit 1fe185c

Browse files
committed
Merge pull request #40 from AnyFetch/tasks-on-redis
Tasks on redis
2 parents 83b77be + bc2e15e commit 1fe185c

File tree

15 files changed

+234
-111
lines changed

15 files changed

+234
-111
lines changed

.jshintrc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
"describe",
6767
"it",
6868
"after",
69+
"afterEach",
6970
"before",
7071
"beforeEach"
7172
],

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
language: node_js
22
node_js:
33
- '0.10'
4+
services:
5+
- redis-server
46
before_script:
57
- npm install -g istanbul
68
- npm install coveralls

lib/handlers/hydrater.js

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
*/
88

99
var restify = require('restify');
10-
1110
/**
1211
* This handler receives a document to hydrate on a POST request and processes it.
1312
*
@@ -16,28 +15,36 @@ var restify = require('restify');
1615
* @param {Object} server Current server. See implementation in index.js
1716
* @param {Function} next Callback to call once res has been populated.
1817
*/
19-
module.exports = function(req, res, server, logger, next) {
20-
if(!req.params.callback && !req.params.long_poll) {
18+
module.exports = function(req, res, server, logger, errLogger, next) {
19+
if(!req.params.callback) {
2120
return next(new restify.MissingParameterError('No specified callback'));
2221
}
2322

23+
2424
res.send(202);
2525
next();
2626

2727
// Prepare the new task
28+
2829
var task = {};
2930
task.file_path = (req.params.file_path) ? req.params.file_path : null;
3031
task.callback = req.params.callback;
3132

32-
3333
task.document = req.params.document || {};
3434
task.document.metadata = task.document ? task.document.metadata || {} : {};
3535
task.document.data = task.document ? task.document.data || {} : {};
3636

3737
task.priority = (req.params.priority) ? parseInt(req.params.priority) : 0;
3838

3939
// Push it to the queue
40-
server.queue.push(task, -task.priority);
40+
var job = server.queue.createJob(task, {priority: task.priority});
41+
job.save(function(err) {
42+
if(err) {
43+
errLogger("Error while queuing: " + ((task.file_path) ? task.file_path : task.document.id) + "\nError :" + err.toString());
44+
}
45+
else {
46+
logger("Queuing: " + ((task.file_path) ? task.file_path : task.document.id));
47+
}
48+
});
4149

42-
logger("Queuing: " + ((task.file_path) ? task.file_path : task.document.id));
4350
};

lib/handlers/status.js

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313
* @param {Function} next Callback to call once res has been populated.
1414
*/
1515
module.exports = function(req, res, server, logger, next) {
16-
var ret = {
17-
'status': 'ok',
18-
'queued_items': server.queue.length()
19-
};
20-
21-
res.send(ret);
22-
next();
16+
server.queue.stats(function(err, stats) {
17+
if(err) {
18+
return next(err);
19+
}
20+
else {
21+
stats.status = 'ok';
22+
res.send(stats);
23+
next();
24+
}
25+
});
2326
};

lib/helpers/Childs.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ var Childs = function(concurrency, tasksPerProcess) {
5555
return child;
5656
}
5757
};
58+
59+
this.stopAllChilds = function()  {
60+
this.childs.forEach(function(child) {
61+
child.process.kill('SIGTERM');
62+
});
63+
};
5864
};
5965

6066

lib/helpers/child-process.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"use strict";
22
/*
3-
* A new process is spawned with this file for each task.
3+
* A new process is spawned with this file for some tasks.
44
* Inter process comunication is used to send and receive message from the parent process regarding the task.
55
* This let us finely control the behavior of the hydration function.
66
* If it happens to get stuck in a subprocess or in an HTTP blackhole,

lib/helpers/hydrater.js

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,16 @@ var rarity = require('rarity');
1414
var util = require('util');
1515

1616
var lib = require('../index.js');
17-
var Childs = require('./Childs');
1817

1918
var HydrationError = lib.HydrationError;
2019

2120

2221

23-
module.exports = function(hydraterFunction, concurrency, logger, errLogger) {
22+
module.exports = function(hydraterFunction, childs, logger, errLogger) {
2423
if(!errLogger) {
2524
errLogger = logger;
2625
}
2726

28-
var tasksPerProcess = process.env.TASKS_PER_PROCESS || 100;
29-
30-
var childs = new Childs(concurrency, tasksPerProcess);
31-
32-
3327
/**
3428
* Handle a hydration task:
3529
* - Download the file
@@ -40,7 +34,8 @@ module.exports = function(hydraterFunction, concurrency, logger, errLogger) {
4034
* @param {Object} task Task object, keys must be `file_path` (file URL) and `callback` (URL)
4135
* @param {Function} done(err)
4236
*/
43-
return function(task, done) {
37+
return function(job, done) {
38+
var task = job.data;
4439
logger("Starting task: " + ((task.file_path) ? task.file_path : task.document.id));
4540

4641
async.waterfall([
@@ -222,7 +217,6 @@ module.exports = function(hydraterFunction, concurrency, logger, errLogger) {
222217
if(internalErr) {
223218
errLogger("INTERNAL ERR", internalErr);
224219
}
225-
226220
done(err || internalErr, changes);
227221
});
228222
});

lib/index.js

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
var util = require('util');
44
var restify = require('restify');
5-
var async = require('async');
5+
var yaqs = require('yaqs');
6+
var Childs = require('./helpers/Childs');
67

78
/**
89
* Create a new hydration server.
@@ -23,24 +24,49 @@ module.exports.createServer = function(config) {
2324
config.errLogger = config.errLogger || console.error;
2425

2526
var concurrency = config.concurrency || 1;
27+
var tasksPerProcess = config.tasksPerProcess || 100;
2628

2729
// Load configuration and initialize server
2830
var hydraterEndpoint = require('./handlers/hydrater.js');
2931
var statusEndpoint = require('./handlers/status.js');
30-
var hydraterHelper = require('./helpers/hydrater.js')(config.hydrater_function, concurrency, config.logger, config.errLogger);
31-
var server = restify.createServer();
3232

33+
var childs = new Childs(concurrency, tasksPerProcess);
34+
35+
var hydraterHelper = require('./helpers/hydrater.js')(config.hydrater_function, childs, config.logger, config.errLogger);
36+
37+
var server = restify.createServer();
3338
// Middleware Goes Here
3439
server.use(restify.acceptParser(server.acceptable));
3540
server.use(restify.queryParser());
3641
server.use(restify.bodyParser());
3742
server.use(require('./middlewares/logger.js'));
3843

39-
server.queue = async.priorityQueue(hydraterHelper, concurrency);
44+
server.yaqsClient = yaqs({
45+
prefix: config.hydraterUrl,
46+
redis: config.redisUrl,
47+
defaultConcurrency: concurrency
48+
});
49+
50+
server.queue = server.yaqsClient.createQueue('hydration', {});
51+
server.queue.setWorker(hydraterHelper).start();
52+
53+
54+
function sigtermYaqs() {
55+
server.yaqsClient.stopAllQueues(function(err) {
56+
if(err) {
57+
config.errLogger(err);
58+
}
59+
childs.stopAllChilds();
60+
config.logger('YAQS has stopped.');
61+
process.exit(0);
62+
});
63+
}
64+
65+
process.once('SIGTERM', sigtermYaqs);
4066

4167
// Load routes
4268
server.post('/hydrate', function(req, res, next) {
43-
hydraterEndpoint(req, res, server, config.logger, next);
69+
hydraterEndpoint(req, res, server, config.logger, config.errLogger, next);
4470
});
4571
server.get('/status', function(req, res, next) {
4672
statusEndpoint(req, res, server, config.logger, next);

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
"dependencies": {
2020
"async": "^0.9.0",
2121
"rarity": "^2.1.1",
22+
"redis": "^0.12.1",
2223
"restify": "^2.8.3",
2324
"restify-logger": "^2.0.1",
24-
"supertest": "^0.14.0"
25+
"supertest": "^0.14.0",
26+
"yaqs": "^1.0.3"
2527
},
2628
"devDependencies": {
2729
"should": "^4.1.0",

test/cleaning.js

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ var path = require("path");
44
var async = require("async");
55
var rarity = require("rarity");
66
var shellExec = require('child_process').exec;
7+
8+
var Childs = require('../lib/helpers/Childs');
79
var createFakeApi = require('./helpers/fake-api.js');
810

911
var concurrencies = [1, 2];
@@ -14,17 +16,24 @@ concurrencies.forEach(function(concurrency) {
1416
tasksPerProcess.forEach(function(_tasksPerProcess) {
1517
describe('Hydration should be cleaned every time with concurrency = ' + concurrency + ' & tasksPerProcess = ' + _tasksPerProcess , function() {
1618
var fakeApi = createFakeApi();
19+
var childs;
1720

1821
fakeApi.patch('/result', function(req, res, next) {
1922
res.send(204);
2023
next();
2124
});
25+
2226
before(function() {
2327
fakeApi.listen(4243);
2428
});
2529

26-
after(function() {
27-
fakeApi.close();
30+
afterEach(function(done) {
31+
childs.stopAllChilds();
32+
done();
33+
});
34+
35+
after(function(done) {
36+
fakeApi.close(done);
2837
});
2938

3039
it('on normal workflow', function(done) {
@@ -35,11 +44,12 @@ concurrencies.forEach(function(concurrency) {
3544
concurrency: concurrency,
3645
logger: function() {},
3746
};
47+
childs = new Childs(config.concurrency, _tasksPerProcess);
3848

39-
process.env.TASKS_PER_PROCESS = _tasksPerProcess;
40-
var hydrate = require('../lib/helpers/hydrater.js')(config.hydrater_function, config.concurrency, config.logger);
49+
var hydrate = require('../lib/helpers/hydrater.js')(config.hydrater_function, childs, config.logger);
4150

42-
var task = {
51+
var task = {};
52+
task.data = {
4353
file_path: "http://127.0.0.1:4243/afile",
4454
callback: "http://127.0.0.1:4243/result",
4555
document: {
@@ -82,11 +92,12 @@ concurrencies.forEach(function(concurrency) {
8292
concurrency: concurrency,
8393
logger: function() {},
8494
};
85-
process.env.TASKS_PER_PROCESS = _tasksPerProcess;
95+
var childs = new Childs(config.concurrency, _tasksPerProcess);
8696

87-
var hydrate = require('../lib/helpers/hydrater.js')(config.hydrater_function, config.concurrency, config.logger);
97+
var hydrate = require('../lib/helpers/hydrater.js')(config.hydrater_function, childs, config.logger);
8898

89-
var task = {
99+
var task = {};
100+
task.data = {
90101
file_path: "http://127.0.0.1:4243/afile",
91102
callback: "http://127.0.0.1:4243/result",
92103
document: {
@@ -129,11 +140,12 @@ concurrencies.forEach(function(concurrency) {
129140
concurrency: concurrency,
130141
logger: function() {},
131142
};
132-
process.env.TASKS_PER_PROCESS = _tasksPerProcess;
143+
var childs = new Childs(config.concurrency, _tasksPerProcess);
133144

134-
var hydrate = require('../lib/helpers/hydrater.js')(config.hydrater_function, config.concurrency, config.logger);
145+
var hydrate = require('../lib/helpers/hydrater.js')(config.hydrater_function, childs, config.logger);
135146

136-
var task = {
147+
var task = {};
148+
task.data = {
137149
file_path: "http://127.0.0.1:4243/afile",
138150
callback: "http://127.0.0.1:4243/result",
139151
document: {

0 commit comments

Comments
 (0)