Skip to content

Commit 001f39d

Browse files
committed
notifySubscribers should use Promise.all so notifications occur in parallel
1 parent 762fe67 commit 001f39d

File tree

3 files changed

+124
-60
lines changed

3 files changed

+124
-60
lines changed

services/notify-subscribers.js

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,44 +29,44 @@
2929
);
3030
}
3131

32+
async function notifyOneSubscriber(resourceUrl, subscription) {
33+
const apiurl = subscription.url,
34+
startticks = moment().format('x'),
35+
parts = url.parse(apiurl),
36+
notifyProcedure = subscription.notifyProcedure,
37+
protocol = subscription.protocol;
38+
39+
try {
40+
await notifyOne(notifyProcedure, apiurl, protocol, resourceUrl);
41+
42+
subscription.ctUpdates += 1;
43+
subscription.ctConsecutiveErrors = 0;
44+
subscription.whenLastUpdate = moment().utc().format();
45+
46+
await logEvent(
47+
'Notify',
48+
sprintf(appMessages.log.notify, apiurl, parts.host, resourceUrl, parts.protocol),
49+
startticks
50+
);
51+
} catch (err) {
52+
console.error(err.message);
53+
54+
subscription.ctErrors += 1;
55+
subscription.ctConsecutiveErrors += 1;
56+
subscription.whenLastError = moment().utc().format();
57+
58+
await logEvent(
59+
'NotifyFailed',
60+
sprintf(appMessages.log.notifyFailed, apiurl, parts.host, resourceUrl, parts.protocol),
61+
startticks
62+
);
63+
}
64+
}
65+
3266
async function notifySubscribers(resourceUrl) {
3367
const subscriptions = await fetchSubscriptions(resourceUrl);
3468

35-
for (let subscription of subscriptions.pleaseNotify) {
36-
const apiurl = subscription.url,
37-
startticks = moment().format('x'),
38-
parts = url.parse(apiurl),
39-
notifyProcedure = subscription.notifyProcedure,
40-
protocol = subscription.protocol;
41-
42-
console.log(apiurl);
43-
44-
try {
45-
await notifyOne(notifyProcedure, apiurl, protocol, resourceUrl);
46-
47-
subscription.ctUpdates += 1;
48-
subscription.ctConsecutiveErrors = 0;
49-
subscription.whenLastUpdate = moment().utc().format();
50-
51-
await logEvent(
52-
'Notify',
53-
sprintf(appMessages.log.notify, apiurl, parts.host, resourceUrl, parts.protocol),
54-
startticks
55-
);
56-
} catch (err) {
57-
console.error(err.message);
58-
59-
subscription.ctErrors += 1;
60-
subscription.ctConsecutiveErrors += 1;
61-
subscription.whenLastError = moment().utc().format();
62-
63-
await logEvent(
64-
'NotifyFailed',
65-
sprintf(appMessages.log.notifyFailed, apiurl, parts.host, resourceUrl, parts.protocol),
66-
startticks
67-
);
68-
}
69-
}
69+
await Promise.all(subscriptions.pleaseNotify.map(notifyOneSubscriber.bind(null, resourceUrl)));
7070

7171
console.log('upserting subscriptions');
7272

test/mock.js

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const https = require('https'),
1212
SECURE_MOCK_SERVER_URL = process.env.SECURE_MOCK_SERVER_URL || `https://${MOCK_SERVER_DOMAIN}:${SECURE_MOCK_SERVER_PORT}`,
1313
rpcReturnFault = require('../services/rpc-return-fault');
1414

15-
function restController(req, res) {
15+
async function restController(req, res) {
1616
const method = req.method,
1717
path = req.path;
1818

@@ -21,34 +21,35 @@ function restController(req, res) {
2121
let responseBody = this.routes[method][path].responseBody;
2222
res
2323
.status(this.routes[method][path].status)
24-
.send(typeof responseBody === 'function' ? responseBody(req) : responseBody);
24+
.send(typeof responseBody === 'function' ? await responseBody(req) : responseBody);
2525
} else {
2626
res
2727
.status(501)
2828
.send(`Unknown route ${method} ${path}`);
2929
}
3030
}
3131

32-
function rpcController(req, res) {
33-
parseRpcRequest(req)
34-
.then(request => {
35-
req.rpcBody = request;
36-
if (this.routes.RPC2[request.methodName]) {
37-
this.requests.RPC2[request.methodName].push(req);
38-
res
39-
.status(200)
40-
.send(this.routes.RPC2[request.methodName].responseBody);
41-
} else {
42-
res
43-
.status(501)
44-
.send(rpcReturnFault(1, `Unknown methodName ${request.methodName}`));
45-
}
46-
})
47-
.catch(err => {
32+
async function rpcController(req, res) {
33+
try {
34+
req.rpcBody = await parseRpcRequest(req);
35+
const method = req.rpcBody.methodName;
36+
37+
if (this.routes.RPC2[method]) {
38+
this.requests.RPC2[method].push(req);
39+
let responseBody = this.routes.RPC2[method].responseBody;
40+
res
41+
.status(200)
42+
.send(typeof responseBody === 'function' ? await responseBody(req) : responseBody);
43+
} else {
4844
res
49-
.status(500)
50-
.send(rpcReturnFault(1, err.message));
51-
});
45+
.status(501)
46+
.send(rpcReturnFault(1, `Unknown methodName ${method}`));
47+
}
48+
} catch(err) {
49+
res
50+
.status(500)
51+
.send(rpcReturnFault(1, err.message));
52+
}
5253
}
5354

5455
module.exports = {

test/ping.js

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ function ping(pingProtocol, resourceUrl, returnFormat) {
1717
let rpctext;
1818
if (null == resourceUrl) {
1919
rpctext = xmlrpc.buildCall('rssCloud.ping', [], 'xml');
20-
console.log(rpctext);
2120
} else {
2221
rpctext = xmlrpc.buildCall('rssCloud.ping', [resourceUrl], 'xml');
2322
}
@@ -91,7 +90,7 @@ for (const pingProtocol of ['XML-RPC', 'REST']) {
9190
mock.route('GET', feedPath, 200, '<RSS Feed />');
9291
mock.route('POST', pingPath, 200, 'Thanks for the update! :-)');
9392
mock.rpc(notifyProcedure, rpcReturnSuccess(true));
94-
mongodb.addSubscription(resourceUrl, notifyProcedure, apiurl, protocol);
93+
await mongodb.addSubscription(resourceUrl, notifyProcedure, apiurl, protocol);
9594

9695
let res = await ping(pingProtocol, resourceUrl, returnFormat);
9796

@@ -137,7 +136,7 @@ for (const pingProtocol of ['XML-RPC', 'REST']) {
137136
mock.route('GET', feedPath, 404, 'Not Found');
138137
mock.route('POST', pingPath, 200, 'Thanks for the update! :-)');
139138
mock.rpc(notifyProcedure, rpcReturnSuccess(true));
140-
mongodb.addSubscription(resourceUrl, notifyProcedure, apiurl, protocol);
139+
await mongodb.addSubscription(resourceUrl, notifyProcedure, apiurl, protocol);
141140

142141
let res = await ping(pingProtocol, resourceUrl, returnFormat);
143142

@@ -218,7 +217,7 @@ for (const pingProtocol of ['XML-RPC', 'REST']) {
218217
mock.route('GET', feedPath, 200, '<RSS Feed />');
219218
mock.route('POST', pingPath, 200, 'Thanks for the update! :-)');
220219
mock.rpc(notifyProcedure, rpcReturnSuccess(true));
221-
mongodb.addSubscription(resourceUrl, notifyProcedure, apiurl, protocol);
220+
await mongodb.addSubscription(resourceUrl, notifyProcedure, apiurl, protocol);
222221

223222
let res = await ping(pingProtocol, resourceUrl, returnFormat);
224223

@@ -257,6 +256,70 @@ for (const pingProtocol of ['XML-RPC', 'REST']) {
257256
}
258257
});
259258

259+
it(`should accept a ping with slow subscribers`, async function () {
260+
this.timeout(5000);
261+
262+
const feedPath = '/rss.xml',
263+
pingPath = '/feedupdated',
264+
resourceUrl = mock.serverUrl + feedPath;
265+
266+
let apiurl = ('http-post' === protocol ? mock.serverUrl : mock.secureServerUrl) + pingPath,
267+
notifyProcedure = false;
268+
269+
if ('xml-rpc' === protocol) {
270+
apiurl = mock.serverUrl + '/RPC2';
271+
notifyProcedure = 'river.feedUpdated';
272+
}
273+
274+
function slowPostResponse(req) {
275+
return new Promise(function(resolve) {
276+
setTimeout(function () {
277+
resolve('Thanks for the update! :-)');
278+
}, 1000);
279+
});
280+
}
281+
282+
mock.route('GET', feedPath, 200, '<RSS Feed />');
283+
if ('xml-rpc' === protocol) {
284+
mock.rpc(notifyProcedure, rpcReturnSuccess(true));
285+
await mongodb.addSubscription(resourceUrl, notifyProcedure, apiurl, protocol);
286+
} else {
287+
for (let i = 0; i < 10; i++) {
288+
mock.route('POST', pingPath + i, 200, slowPostResponse);
289+
await mongodb.addSubscription(resourceUrl, notifyProcedure, apiurl + i, protocol);
290+
}
291+
}
292+
293+
let res = await ping(pingProtocol, resourceUrl, returnFormat);
294+
295+
expect(res).status(200);
296+
297+
if ('XML-RPC' === pingProtocol) {
298+
expect(res.text).xml.equal(rpcReturnSuccess(true));
299+
} else {
300+
if ('JSON' === returnFormat) {
301+
expect(res.body).deep.equal({ success: true, msg: 'Thanks for the ping.' });
302+
} else {
303+
expect(res.text).xml.equal('<result success="true" msg="Thanks for the ping."/>');
304+
}
305+
}
306+
307+
expect(mock.requests.GET).property(feedPath).lengthOf(1, `Missing GET ${feedPath}`);
308+
309+
if ('xml-rpc' === protocol) {
310+
expect(mock.requests.RPC2).property(notifyProcedure).lengthOf(1, `Missing XML-RPC call ${notifyProcedure}`);
311+
expect(mock.requests.RPC2[notifyProcedure][0]).property('rpcBody');
312+
expect(mock.requests.RPC2[notifyProcedure][0].rpcBody.params[0]).equal(resourceUrl);
313+
} else {
314+
for (let i = 0; i < 10; i++) {
315+
expect(mock.requests.POST).property(pingPath + i).lengthOf(1, `Missing POST ${pingPath + i}`);
316+
expect(mock.requests.POST[pingPath + i][0]).property('body');
317+
expect(mock.requests.POST[pingPath + i][0].body).property('url');
318+
expect(mock.requests.POST[pingPath + i][0].body.url).equal(resourceUrl);
319+
}
320+
}
321+
});
322+
260323
});
261324

262325
} // end for pingProtocol

0 commit comments

Comments
 (0)