Skip to content

Commit 9562541

Browse files
author
wenzhe
committed
add preserveOrder stream option
1 parent ce8b0dc commit 9562541

File tree

1 file changed

+62
-0
lines changed

1 file changed

+62
-0
lines changed

through2-concurrent.js

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@ var through2 = require('through2');
66
function cbNoop (cb) {
77
cb();
88
}
9+
function removeFirst(arr, item) {
10+
arr.some((v, index) => {
11+
if (v === item) {
12+
arr.splice(index, 1);
13+
return true;
14+
}
15+
return false;
16+
});
17+
}
918

1019
module.exports = function concurrentThrough (options, transform, flush) {
1120
var concurrent = 0, lastCallback = null, pendingFinish = null;
@@ -18,6 +27,59 @@ module.exports = function concurrentThrough (options, transform, flush) {
1827

1928
var maxConcurrency = options.maxConcurrency || 16;
2029

30+
if (options.preserveOrder) {
31+
const sendArr = [];
32+
const doFinal = options.final || cbNoop;
33+
options.final = function(finalCb) {
34+
Promise.all(sendArr).then(() => {
35+
process.nextTick(() => {
36+
doFinal.call(this, finalCb);
37+
})
38+
});
39+
};
40+
41+
const fnFlush = function(flushCb) {
42+
if (flush) {
43+
flush.call(this, flushCb);
44+
} else {
45+
flushCb();
46+
}
47+
};
48+
49+
const fnTransform = async function(data, encoding, callback) {
50+
const sendP = new Promise((resolve, reject) => {
51+
transform.call(this, data, encoding, (err, sendData) => {
52+
if (err) {
53+
reject(err);
54+
} else {
55+
resolve(sendData);
56+
}
57+
});
58+
});
59+
sendArr.push(sendP);
60+
61+
// throttle
62+
if (sendArr.length >= maxConcurrency) {
63+
await Promise.all(sendArr.slice());
64+
const sendData = await sendP;
65+
66+
removeFirst(sendArr, sendP);
67+
callback(null, sendData);
68+
return;
69+
}
70+
71+
process.nextTick(() => {
72+
callback();
73+
});
74+
await Promise.all(sendArr.slice());
75+
const sendData = await sendP;
76+
77+
removeFirst(sendArr, sendP);
78+
this.push(sendData);
79+
};
80+
return through2(options, fnTransform, fnFlush);
81+
}
82+
2183
function _transform (message, enc, callback) {
2284
var self = this;
2385
var callbackCalled = false;

0 commit comments

Comments
 (0)