Skip to content

Commit ba50244

Browse files
pMapIterable: test sync iterables/mappers throwing (and fix the former)
1 parent c1cfe02 commit ba50244

File tree

2 files changed

+38
-24
lines changed

2 files changed

+38
-24
lines changed

index.js

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -231,39 +231,38 @@ export function pMapIterable(
231231
}
232232

233233
async function mapNext(promisesIndex) {
234-
let next = iterator.next();
235-
236234
const myInputIndex = inputIndex++; // Save this promise's index before `trySpawn`ing others
237235
runningMappersCount++;
238236
promisesIndexFromInputIndex[myInputIndex] = promisesIndex;
239237
inputIndexFromPromisesIndex[promisesIndex] = myInputIndex;
240238

241-
if (isPromiseLike(next)) {
242-
// Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse),
243-
// and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises,
244-
// so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel.
245-
// This may entail memory usage closer to `options.backpressure` than necessary, but this space was already allocated to `pMapIterable` via
246-
// `options.concurrency` and `options.backpressure`.
247-
// This may also cause iteration well past the end of the `iterator`: we don't inspect the `iterator`'s response before `trySpawn`ing
248-
// (because we are `trySpawn`ing before `await`ing the response), which will request the next `iterator` element, so we may end up spawning many promises which resolve to `done`.
249-
// However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common
250-
// `async` operations like disk reads, network requests, etc.
251-
// Overall, this can reduce the total time taken to process all elements.
252-
if (backpressure !== Number.POSITIVE_INFINITY) {
253-
// Spawn if still below concurrency and backpressure limit
254-
trySpawn();
255-
}
239+
let next;
240+
try {
241+
next = iterator.next();
242+
if (isPromiseLike(next)) {
243+
// Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse),
244+
// and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises,
245+
// so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel.
246+
// This may entail memory usage closer to `options.backpressure` than necessary, but this space was already allocated to `pMapIterable` via
247+
// `options.concurrency` and `options.backpressure`.
248+
// This may also cause iteration well past the end of the `iterator`: we don't inspect the `iterator`'s response before `trySpawn`ing
249+
// (because we are `trySpawn`ing before `await`ing the response), which will request the next `iterator` element, so we may end up spawning many promises which resolve to `done`.
250+
// However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common
251+
// `async` operations like disk reads, network requests, etc.
252+
// Overall, this can reduce the total time taken to process all elements.
253+
if (backpressure !== Number.POSITIVE_INFINITY) {
254+
// Spawn if still below concurrency and backpressure limit
255+
trySpawn();
256+
}
256257

257-
try {
258258
next = await next;
259-
} catch (error) {
260-
isDone = true;
261-
return {result: {error}, inputIndex: myInputIndex};
262259
}
260+
} catch (error) {
261+
isDone = true;
262+
return {result: {error}, inputIndex: myInputIndex};
263263
}
264264

265265
let {done, value} = next;
266-
267266
if (done) {
268267
isDone = true;
269268
return {result: {done: true}, inputIndex: myInputIndex};

test.js

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ test('pMapIterable - empty', async t => {
545545
t.deepEqual(await collectAsyncIterable(pMapIterable([], mapper)), []);
546546
});
547547

548-
test('pMapIterable - iterable that throws', async t => {
548+
test('pMapIterable - async iterable that throws', async t => {
549549
let isFirstNextCall = true;
550550

551551
const iterable = {
@@ -568,12 +568,27 @@ test('pMapIterable - iterable that throws', async t => {
568568
await t.throwsAsync(iterator.next(), {message: 'foo'});
569569
});
570570

571-
test('pMapIterable - mapper that throws', async t => {
571+
test('pMapIterable - sync iterable that throws', async t => {
572+
function * throwingGenerator() { // eslint-disable-line require-yield
573+
throw new Error('foo');
574+
}
575+
576+
const iterator = pMapIterable(throwingGenerator(), mapper)[Symbol.asyncIterator]();
577+
await t.throwsAsync(() => iterator.next(), {message: 'foo'});
578+
});
579+
580+
test('pMapIterable - async mapper that throws', async t => {
572581
await t.throwsAsync(collectAsyncIterable(pMapIterable(sharedInput, async () => {
573582
throw new Error('foo');
574583
})), {message: 'foo'});
575584
});
576585

586+
test('pMapIterable - sync mapper that throws', async t => {
587+
await t.throwsAsync(collectAsyncIterable(pMapIterable(sharedInput, () => {
588+
throw new Error('foo');
589+
})), {message: 'foo'});
590+
});
591+
577592
test('pMapIterable - stop on error', async t => {
578593
const output = [];
579594

0 commit comments

Comments
 (0)