Skip to content

Commit 9c54b7d

Browse files
committed
Merge pull request #25 from AnyFetch/restore-stream
Restore streams and download in child process
2 parents 938fb16 + c876910 commit 9c54b7d

File tree

4 files changed

+64
-46
lines changed

4 files changed

+64
-46
lines changed

lib/helpers/child-process.js

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,65 @@
11
"use strict";
2+
/*
3+
* A new process is spawned with this file for each task.
4+
* Inter process comunication is used to send and receive message from the parent process regarding the task.
5+
* This let us finely control the behavior of the hydration function.
6+
* If it happens to get stuck in a subprocess or in an HTTP blackhole,
7+
* the master process can simply kill the child and resume its normal operation.
8+
* Without the sub-process isolation, we get stuck quite fast and get leaks everywhere.
9+
*/
10+
11+
var crypto = require('crypto');
212
var async = require("async");
13+
var fs = require('fs');
14+
var request = require('supertest');
15+
var url = require('url');
16+
var rarity = require('rarity');
17+
318

419
process.on('message', function(task) {
520
var hydrate = require(task.functionPath);
21+
var path = '/tmp/AFH-' + crypto.randomBytes(20).toString('hex');
622
async.waterfall([
23+
/**
24+
* Download the file from task.file_path, store it in a temporary file if there is file_path
25+
*/
26+
function downloadFile(cb) {
27+
if(task.file_path) {
28+
// Download the file
29+
var stream = fs.createWriteStream(path);
30+
31+
// Store error if statusCode !== 200
32+
var err;
33+
stream.on("finish", function() {
34+
cb(err);
35+
});
36+
37+
var apiUrl = url.parse(task.file_path, false, true);
38+
var req = request(apiUrl.protocol + "//" + apiUrl.host)
39+
.get(apiUrl.path);
40+
41+
req.end().req.once('response', function(res) {
42+
if(res.statusCode !== 200) {
43+
err = 'Error when downloading file ' + task.file_path + ': ' + res.statusCode;
44+
stream.end();
45+
this.abort();
46+
}
47+
});
48+
req.pipe(stream);
49+
}
50+
else {
51+
cb(null);
52+
}
53+
},
754
function startHydration(cb) {
855
cb.urlCallback = task.options.urlCallback;
956
cb.apiUrl = task.options.apiUrl;
10-
hydrate(task.path, task.document, task.changes, cb);
57+
hydrate(path, task.document, task.changes, cb);
58+
},
59+
function cleanFile(changes, cb) {
60+
if(task.file_path) {
61+
fs.unlink(path, rarity.carry([changes], cb));
62+
}
1163
}
1264
],
1365
function(err, changes) {

lib/helpers/hydrater.js

Lines changed: 9 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010
var async = require('async');
1111
var shellFork = require('child_process').fork;
1212
var request = require('supertest');
13-
var crypto = require('crypto');
1413
var restify = require('restify');
15-
var fs = require('fs');
1614
var url = require('url');
1715
var rarity = require('rarity');
1816
var util = require('util');
@@ -39,36 +37,7 @@ module.exports = function(hydraterFunction, logger, errLogger) {
3937
return function(task, done) {
4038
logger("Starting task: " + ((task.file_path) ? task.file_path : task.document.id));
4139

42-
var path = '/tmp/AFH-' + crypto.randomBytes(20).toString('hex');
4340
async.waterfall([
44-
/**
45-
* Download the file from task.file_path, store it in a temporary file if there is file_path
46-
*/
47-
function downloadFile(cb) {
48-
if(task.file_path) {
49-
var apiUrl = url.parse(task.file_path, false, true);
50-
request(apiUrl.protocol + "//" + apiUrl.host)
51-
.get(apiUrl.path)
52-
.expect(200)
53-
.end(function(err, res) {
54-
if(err) {
55-
err = new restify.BadGatewayError('Error when downloading file ' + task.file_path + ': ' + err);
56-
}
57-
cb(err, res && res.text);
58-
});
59-
}
60-
else {
61-
cb(null);
62-
}
63-
},
64-
function saveFile(res, cb) {
65-
if(res) {
66-
fs.writeFile(path, res, cb);
67-
}
68-
else {
69-
cb();
70-
}
71-
},
7241
function performHydration(cb) {
7342
var child = shellFork(__dirname + '/child-process.js', {silent: true});
7443
var stderr = "";
@@ -83,6 +52,12 @@ module.exports = function(hydraterFunction, logger, errLogger) {
8352
cleaner.called = true;
8453
cb(err, changes);
8554
}
55+
if(stdout !== "") {
56+
logger(stdout);
57+
}
58+
if(stderr !== "") {
59+
errLogger(stderr);
60+
}
8661
clearTimeout(timeout);
8762
};
8863
cleaner.called = false;
@@ -122,7 +97,7 @@ module.exports = function(hydraterFunction, logger, errLogger) {
12297

12398
child.send({
12499
functionPath: hydraterFunction,
125-
path: (task.file_path) ? path : null,
100+
file_path: task.file_path,
126101
document: task.document,
127102
changes: lib.defaultChanges(),
128103
options: options,
@@ -139,7 +114,6 @@ module.exports = function(hydraterFunction, logger, errLogger) {
139114
err = null;
140115
}
141116
cleaner(err, res.changes);
142-
143117
});
144118

145119
timeout = setTimeout(function() {
@@ -212,21 +186,13 @@ module.exports = function(hydraterFunction, logger, errLogger) {
212186
if(task.next) {
213187
task.next(new restify.InvalidContentError("ERR hydrating " + ((task.file_path) ? task.file_path : task.document.id) + err.toString()));
214188
}
215-
errLogger("ERR hydrating " + ((task.file_path) ? task.file_path : task.document.id), err);
189+
errLogger("ERR hydrating " + ((task.file_path) ? task.file_path : task.document.id), err.toString());
216190
}
217191

218192
if(res && res.statusCode && res.statusCode !== 204) {
219193
errLogger("ERR hydrating: server refused data! Code:" + res.statusCode);
220194
}
221-
222-
if(task.file_path) {
223-
fs.unlink(path, function(_err) {
224-
done(err || _err, changes);
225-
});
226-
}
227-
else {
228-
done(changes);
229-
}
195+
done(err, changes);
230196
});
231197
};
232198
};

lib/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ module.exports.createServer = function(config) {
2020
throw new Error("Specify `hydrater_function`");
2121
}
2222
config.logger = config.logger || console.log;
23-
config.errLogger = config.errLogger || console.warn;
23+
config.errLogger = config.errLogger || console.error;
2424

2525
// Load configuration and initialize server
2626
var hydraterEndpoint = require('./handlers/hydrater.js');

test/helpers/hydrater.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ describe("hydrate()", function() {
3636

3737
hydrate(task, function(err, changes) {
3838
changes.should.have.keys(["metadata"]);
39-
done();
39+
done(err);
4040
});
4141
});
4242

0 commit comments

Comments
 (0)