Skip to content

Commit dc83c39

Browse files
committed
Implement flow-control for test worker IPC
1 parent f328a69 commit dc83c39

File tree

4 files changed

+52
-9
lines changed

4 files changed

+52
-9
lines changed

lib/cli.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,14 +430,17 @@ exports.run = async () => { // eslint-disable-line complexity
430430
reporter.startRun(plan);
431431

432432
if (process.env.AVA_EMIT_RUN_STATUS_OVER_IPC === 'I\'ll find a payphone baby / Take some time to talk to you') {
433+
const {controlFlow} = require('./ipc-flow-control');
434+
const bufferedSend = controlFlow(process);
435+
433436
if (process.versions.node >= '12.16.0') {
434437
plan.status.on('stateChange', evt => {
435-
process.send(evt);
438+
bufferedSend(evt);
436439
});
437440
} else {
438441
const v8 = require('v8');
439442
plan.status.on('stateChange', evt => {
440-
process.send([...v8.serialize(evt)]);
443+
bufferedSend([...v8.serialize(evt)]);
441444
});
442445
}
443446
}

lib/fork.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const childProcess = require('child_process');
33
const path = require('path');
44
const fs = require('fs');
55
const Emittery = require('emittery');
6+
const {controlFlow} = require('./ipc-flow-control');
67

78
if (fs.realpathSync(__filename) !== __filename) {
89
console.warn('WARNING: `npm link ava` and the `--preserve-symlink` flag are incompatible. We have detected that AVA is linked via `npm link`, and that you are using either an early version of Node 6, or the `--preserve-symlink` flag. This breaks AVA. You should upgrade to Node 6.2.0+, avoid the `--preserve-symlink` flag, or avoid using `npm link ava`.');
@@ -45,12 +46,12 @@ module.exports = (file, options, execArgv = process.execArgv) => {
4546
emitStateChange({type: 'worker-stderr', chunk});
4647
});
4748

49+
const bufferedSend = controlFlow(subprocess);
50+
4851
let forcedExit = false;
4952
const send = evt => {
50-
if (subprocess.connected && !finished && !forcedExit) {
51-
subprocess.send({ava: evt}, () => {
52-
// Disregard errors.
53-
});
53+
if (!finished && !forcedExit) {
54+
bufferedSend({ava: evt});
5455
}
5556
};
5657

lib/ipc-flow-control.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
function controlFlow(channel) {
2+
let errored = false;
3+
let deliverImmediately = true;
4+
5+
const backlog = [];
6+
const deliverNext = error => {
7+
if (error !== null) {
8+
errored = true;
9+
}
10+
11+
if (errored || !channel.connected) {
12+
backlog.length = 0; // Free memory.
13+
return; // We can't send.
14+
}
15+
16+
let ok = true;
17+
while (ok && backlog.length > 0) { // Stop sending after backpressure.
18+
ok = channel.send(backlog.shift(), deliverNext);
19+
}
20+
21+
// Re-enable immediate delivery if there is no backpressure and the backlog
22+
// has been cleared.
23+
deliverImmediately = ok && backlog.length === 0;
24+
};
25+
26+
return message => {
27+
if (errored || !channel.connected) {
28+
return;
29+
}
30+
31+
if (deliverImmediately) {
32+
deliverImmediately = channel.send(message, deliverNext);
33+
} else {
34+
backlog.push(message);
35+
}
36+
};
37+
}
38+
39+
exports.controlFlow = controlFlow;

lib/worker/ipc.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict';
22
const Emittery = require('emittery');
3+
const {controlFlow} = require('../ipc-flow-control');
34

45
const emitter = new Emittery();
56
process.on('message', message => {
@@ -25,10 +26,9 @@ process.on('message', message => {
2526
exports.options = emitter.once('options');
2627
exports.peerFailed = emitter.once('peerFailed');
2728

29+
const bufferedSend = controlFlow(process);
2830
function send(evt) {
29-
if (process.connected) {
30-
process.send({ava: evt});
31-
}
31+
bufferedSend({ava: evt});
3232
}
3333

3434
exports.send = send;

0 commit comments

Comments
 (0)