Skip to content

Commit 2d03c90

Browse files
author
wenzhe
committed
add preserveOrder stream option
1 parent ce8b0dc commit 2d03c90

File tree

1 file changed

+61
-0
lines changed

1 file changed

+61
-0
lines changed

through2-concurrent.js

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@ var through2 = require('through2');
66
function cbNoop (cb) {
77
cb();
88
}
9+
function removeFirst(arr, item) {
10+
arr.some((v, index) => {
11+
if (v === item) {
12+
arr.splice(index, 1);
13+
return true;
14+
}
15+
return false;
16+
});
17+
}
918

1019
module.exports = function concurrentThrough (options, transform, flush) {
1120
var concurrent = 0, lastCallback = null, pendingFinish = null;
@@ -18,6 +27,58 @@ module.exports = function concurrentThrough (options, transform, flush) {
1827

1928
var maxConcurrency = options.maxConcurrency || 16;
2029

30+
options.preserveOrder = true;
31+
if (options.preserveOrder) {
32+
const sendArr = [];
33+
const doFinal = options.final || cbNoop;
34+
options.final = function(finalCb) {
35+
Promise.all(sendArr).then(() => {
36+
doFinal.call(this, finalCb);
37+
});
38+
};
39+
40+
const fnFlush = function(flushCb) {
41+
if (flush) {
42+
flush.call(this, flushCb);
43+
} else {
44+
flushCb();
45+
}
46+
};
47+
48+
const fnTransform = async function(data, encoding, callback) {
49+
const sendP = new Promise((resolve, reject) => {
50+
transform.call(this, data, encoding, (err, sendData) => {
51+
if (err) {
52+
reject(err);
53+
} else {
54+
resolve(sendData);
55+
}
56+
});
57+
});
58+
sendArr.push(sendP);
59+
60+
// throttle
61+
if (sendArr.length >= maxConcurrency) {
62+
await Promise.all(sendArr.slice());
63+
const sendData = await sendP;
64+
65+
removeFirst(sendArr, sendP);
66+
callback(null, sendData);
67+
return;
68+
}
69+
70+
process.nextTick(() => {
71+
callback();
72+
});
73+
await Promise.all(sendArr.slice());
74+
const sendData = await sendP;
75+
76+
removeFirst(sendArr, sendP);
77+
this.push(sendData);
78+
};
79+
return through2(options, fnTransform, fnFlush);
80+
}
81+
2182
function _transform (message, enc, callback) {
2283
var self = this;
2384
var callbackCalled = false;

0 commit comments

Comments
 (0)