The function pulls values from an iterable or async iterable input, runs an async action on each yielded value with a configurable concurrency limit, and returns an async iterable that yields the results in the same order as the input values were received. The final return value of the input iterator (if any) is forwarded as the terminal { done: true, value } of the output, without the action being applied to it.
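To make the semantics above concrete, here is a minimal sketch of how an order-preserving pipe with a concurrency limit could be written. It is not the library's actual source: `parallelPipeSketch`, `numbers`, `slowDouble` and `demo` are hypothetical names introduced only for illustration, and the sketch assumes the action returns a promise or a plain value.

```javascript
// Hypothetical sketch, NOT the library's implementation: it only mirrors the
// behaviour described above (ordered output, concurrency limit, forwarded return value).
async function* parallelPipeSketch(input, concurrency, action) {
  // Accept both async and sync iterables.
  const iterator = input[Symbol.asyncIterator]
    ? input[Symbol.asyncIterator]()
    : input[Symbol.iterator]();
  const pending = []; // promises of in-flight results, kept in input order
  let inputDone = false;
  let finalValue;

  while (true) {
    // Start new actions until the concurrency limit is reached.
    while (!inputDone && pending.length < concurrency) {
      const step = await iterator.next();
      if (step.done) {
        inputDone = true;
        finalValue = step.value; // forwarded as-is, the action is not applied
      } else {
        const promise = Promise.resolve(action(step.value));
        promise.catch(() => {}); // avoid an unhandled rejection; the await below still rethrows
        pending.push(promise);
      }
    }
    if (pending.length === 0) return finalValue; // becomes { done: true, value: finalValue }
    // Always await the oldest promise first, so output order matches input order.
    yield await pending.shift();
  }
}

// Example input whose iterator has a final return value.
function* numbers() {
  yield 1; yield 2; yield 3; yield 4;
  return 'finished';
}

async function demo() {
  let active = 0;
  let maxActive = 0;
  const slowDouble = async (n) => {
    active++;
    maxActive = Math.max(maxActive, active);
    await new Promise((resolve) => setTimeout(resolve, (5 - n) * 10)); // later items finish sooner
    active--;
    return n * 2;
  };

  const output = parallelPipeSketch(numbers(), 2, slowDouble);
  const results = [];
  // Drive the iterator by hand: `for await` would discard the terminal value.
  let step = await output.next();
  while (!step.done) {
    results.push(step.value);
    step = await output.next();
  }
  return { results, finalValue: step.value, maxActive };
}
```

In this sketch, `demo()` resolves to `results` of `[2, 4, 6, 8]` even though later items finish sooner, `finalValue` is `'finished'`, and `maxActive` never exceeds 2. Note that a `for await` loop ignores the terminal value, so reading it requires calling `next()` manually as shown.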
Here is an example that requests random numbers from random.org with at most 2 concurrent requests:
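const { parallelPipe } = require('async-parallel-pipe');
// require() works with node-fetch v2; v3 is ESM-only and must be loaded with import.
const fetch = require('node-fetch');

// Requests one random integer between 0 and maxValue and returns the response body as text.
async function getRandomNumber(maxValue) {
  const response = await fetch(`https://www.random.org/integers/?num=1&min=0&max=${maxValue}&col=1&base=10&format=plain&rnd=new`);
  const value = await response.text();
  return value;
}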
(async function main() {
  const inputIterable = [1, 2, 3, 4, 5].values();
  const concurrentThreads = 2;
  const outputIterable = parallelPipe(inputIterable, concurrentThreads, getRandomNumber);
  for await (const value of outputIterable)
    console.log(value);
}());
Since the input can be any iterable or async iterable and the output is an async iterable, multiple parallel executions can be chained together:
(async function main() {
  const inputIterable = [1, 2, 3, 4, 5].values();
  // Each stage consumes the async iterable produced by the previous one.
  const multipliedIterable = parallelPipe(inputIterable, 3, el => el * 100);
  const sumIterable = parallelPipe(multipliedIterable, 3, el => el + 2);
  for await (const value of sumIterable)
    console.log(value);
}());