Skip to content

Commit 4b8f367

Browse files
1. add promiseEmitter to preserveOrder: false Promise.race to address case of: infinite concurrency + async iterable producing >1 element
2. use `!isSyncIterator` as shortcut for `isPromiseLike(next)` (`next` is promise iff iterator is async) 3. add `trySpawn` to the `returnValue === pMapSkip && preserveOrder && (promise mapping next input iterable element is pending)` branch 4. add tests for changes (1) and (3) 5. tests `rangeAround` helper 6. extra `pMapSkip` tests 7. test for #76
1 parent ba50244 commit 4b8f367

File tree

2 files changed

+158
-26
lines changed

2 files changed

+158
-26
lines changed

index.js

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -193,28 +193,61 @@ export function pMapIterable(
193193

194194
return {
195195
async * [Symbol.asyncIterator]() {
196-
const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();
196+
const isSyncIterator = iterable[Symbol.asyncIterator] === undefined;
197+
const iterator = isSyncIterator ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();
197198

198199
const promises = [];
199200
const promisesIndexFromInputIndex = {};
200201
const inputIndexFromPromisesIndex = [];
201202
let runningMappersCount = 0;
202203
let isDone = false;
203204
let inputIndex = 0;
204-
let outputIndex = 0; // Only used when `preserveOrder: false`
205+
let outputIndex = 0; // Only used when `preserveOrder: true`
206+
207+
// This event emitter prevents the race condition that arises when:
208+
// - `preserveOrder: false`
209+
// - `promises` are added after `Promise.race` is invoked, since `Promise.race` only races the promises that existed in its input array at call time
210+
// More specifically, this occurs when (in addition to `preserveOrder: false`):
211+
// - `concurrency === Number.POSITIVE_INFINITY && backpressure === Number.POSITIVE_INFINITY`
212+
// - this forces us to forgo eagerly filling the `promises` pool to avoid infinite recursion
213+
// - IMO this is the root of this problem, and a problem in and of itself: we should consider requiring a finite concurrency & backpressure
214+
// - given the inability to eagerly fill the `promises` pool with infinite concurrency & backpressure, there are some situations in which specifying
215+
// a finite concurrency & backpressure will be faster than specifying the otherwise faster-sounding infinite concurrency & backpressure
216+
// - an async iterator input iterable
217+
// - `mapNext` can't `trySpawn` until it `await`s its `next`, since the input iterable might be done
218+
// - the initial `trySpawn` thus ends when the execution of `mapNext` is suspended to `await next`
219+
// - the input iterable produces more than one element
220+
// - the (single) running `mapNext`'s `trySpawn` _will necessarily_ (since concurrency and backpressure are infinite)
221+
// start another `mapNext` promise that `trySpawn` adds to `promises`
222+
// - this additional promise does not partake in the already-running `nextPromise`, because its underlying `Promise.race` began without it,
223+
// when the initial `trySpawn` returned and `nextPromise` was invoked from the main loop
224+
const promiseEmitter = new EventTarget(); // Only used when `preserveOrder: false`
225+
const promiseEmitterEvent = 'promiseFulfilled';
205226

206227
const nextPromise = preserveOrder
207228
// Treat `promises` as a queue
208229
? () => {
209-
// May be undefined bc of `pMapSkip`s
230+
// May be `undefined` bc of `pMapSkip`s
210231
while (promisesIndexFromInputIndex[outputIndex] === undefined) {
211232
outputIndex += 1;
212233
}
213234

214235
return promises[promisesIndexFromInputIndex[outputIndex++]];
215236
}
216237
// Treat `promises` as a pool (order doesn't matter)
217-
: () => Promise.race(promises);
238+
: () => Promise.race([
239+
// Ensures correctness in the case that mappers resolve between the time that one `await nextPromise()` resolves and the next `nextPromise` call is made
240+
// (these promises would otherwise be lost if an event emitter is not listening - the `promises` pool buffers resolved promises to be processed)
241+
// (I wonder if it may actually be possible to convert the `preserveOrder: false` case to _exclusively_ event-based,
242+
// but such a solution may get messy since we'd want to `yield` from a callback, likely requiring a resolved promises buffer anyway...)
243+
Promise.race(promises),
244+
// Ensures correctness in the case that more promises are added to `promises` after the initial `nextPromise` call is made
245+
// (these additional promises are not included in the above `Promise.race`)
246+
// (see comment above `promiseEmitter` declaration for details on when this can occur)
247+
new Promise(resolve => {
248+
promiseEmitter.addEventListener(promiseEmitterEvent, r => resolve(r.detail), {once: true});
249+
}),
250+
]);
218251

219252
function popPromise(inputIndex) {
220253
// Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array
@@ -239,7 +272,7 @@ export function pMapIterable(
239272
let next;
240273
try {
241274
next = iterator.next();
242-
if (isPromiseLike(next)) {
275+
if (!isSyncIterator) { // `!isSyncIterator` iff `isPromiseLike(next)`, but former is already computed
243276
// Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse),
244277
// and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises,
245278
// so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel.
@@ -250,6 +283,7 @@ export function pMapIterable(
250283
// However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common
251284
// `async` operations like disk reads, network requests, etc.
252285
// Overall, this can reduce the total time taken to process all elements.
286+
// Potential TODO: in the `concurrency === Number.POSITIVE_INFINITY` case, we could potentially still optimize here by eagerly spawning some # of promises.
253287
if (backpressure !== Number.POSITIVE_INFINITY) {
254288
// Spawn if still below concurrency and backpressure limit
255289
trySpawn();
@@ -291,12 +325,15 @@ export function pMapIterable(
291325
if (returnValue === pMapSkip) {
292326
// If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed
293327
// NOTE: no chance that `myInputIndex + 1`-spawning code is waiting to be executed in another part of the event loop,
294-
// but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly `mapNext` and
295-
// this potentially-currently-awaited promise resolves to the result of mapping a later element than a different member of
296-
// `promises`, i.e. `promises` resolve out of order), because all `trySpawn`/`mapNext` calls execute the bookkeeping synchronously,
297-
// before any `await`s.
328+
// but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly skip this `if` condition and
329+
// instead call `mapNext`, causing this potentially-currently-awaited promise to resolve to the result of mapping an element
330+
// of the input iterable that was produced later than `myInputIndex + 1`), i.e., no chance `promises` resolve out of order, because:
331+
// all `trySpawn`/`mapNext` calls execute their bookkeeping synchronously, before any `await`s, so we cannot observe an intermediate
332+
// state in which the promise mapping input iterable element `myInputIndex + 1` has not been recorded in the `promisesIndexFromInputIndex` ledger.
298333
if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) {
299334
popPromise(myInputIndex);
335+
// Spawn if still below backpressure limit and just dropped below concurrency limit
336+
trySpawn();
300337
return promises[promisesIndexFromInputIndex[myInputIndex + 1]];
301338
}
302339

@@ -321,24 +358,26 @@ export function pMapIterable(
321358
// Reserve index in `promises` array: we don't actually have the promise to save yet,
322359
// but we don't want recursive `trySpawn` calls to use this same index.
323360
// This is safe (i.e., the empty slot won't be `await`ed) because we replace the value immediately,
324-
// without yielding to the event loop, so no consumers (namely `getAndRemoveFromPoolNextPromise`)
361+
// without yielding to the event loop, so no consumers (namely `nextPromise`)
325362
// can observe the intermediate state.
326363
const promisesIndex = promises.length++;
327364
promises[promisesIndex] = mapNext(promisesIndex);
365+
promises[promisesIndex].then(p => promiseEmitter.dispatchEvent(new CustomEvent(promiseEmitterEvent, {detail: p})));
328366
}
329367

368+
// Bootstrap `promises`
330369
trySpawn();
331370

332371
while (promises.length > 0) {
333-
const {result: {error, done, value}, inputIndex} = await nextPromise();// eslint-disable-line no-await-in-loop
372+
const {result: {error, done, value}, inputIndex} = await nextPromise(); // eslint-disable-line no-await-in-loop
334373
popPromise(inputIndex);
335374

336375
if (error) {
337376
throw error;
338377
}
339378

340379
if (done) {
341-
// When `preserveOrder: false`, ignore to consume any remaining pending promises in the pool
380+
// When `preserveOrder: false`, `continue` to consume any remaining pending promises in the pool
342381
if (!preserveOrder) {
343382
continue;
344383
}

test.js

Lines changed: 107 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ class ThrowingIterator {
102102
}
103103
}
104104

105+
function rangeAround(expected) {
106+
return {start: expected - 5, end: expected + 50};
107+
}
108+
105109
test('main', async t => {
106110
const end = timeSpan();
107111
t.deepEqual(await pMap(sharedInput, mapper), [10, 20, 30]);
@@ -622,9 +626,9 @@ test('pMapIterable - concurrency: 2', async t => {
622626

623627
assertInRange(t, times.get(10), {start: 0, end: 50});
624628
assertInRange(t, times.get(20), {start: 0, end: 50});
625-
assertInRange(t, times.get(30), {start: 195, end: 250});
626-
assertInRange(t, times.get(40), {start: 295, end: 350});
627-
assertInRange(t, times.get(50), {start: 295, end: 350});
629+
assertInRange(t, times.get(30), rangeAround(200));
630+
assertInRange(t, times.get(40), rangeAround(300));
631+
assertInRange(t, times.get(50), rangeAround(300));
628632
});
629633

630634
test('pMapIterable - backpressure', async t => {
@@ -716,6 +720,18 @@ test('pMapIterable - complex pMapSkip pattern - concurrency 2 - preserveOrder: f
716720
t.assert(result.length === 8);
717721
});
718722

723+
test('pMapIterable - pMapSkip + preserveOrder: true + next input mapping promise pending - eagerly spawns next promise', async t => {
724+
const end = timeSpan();
725+
const testData = [
726+
[pMapSkip, 100],
727+
[2, 200],
728+
[3, 100], // Ensure 3 is spawned when pMapSkip ends (otherwise, overall runtime will be 300 ms)
729+
];
730+
const result = await collectAsyncIterable(pMapIterable(testData, mapper, {preserveOrder: true, concurrency: 2}));
731+
assertInRange(t, end(), rangeAround(200));
732+
t.deepEqual(result, [2, 3]);
733+
});
734+
719735
test('pMapIterable - async iterable input', async t => {
720736
const result = await collectAsyncIterable(pMapIterable(new AsyncTestData(sharedInput), mapper));
721737
t.deepEqual(result, [10, 20, 30]);
@@ -759,26 +775,47 @@ function * promiseGenerator() {
759775
})();
760776
}
761777

778+
// Differs from `AsyncTestData` because it is not a generator:
779+
// each `next` yields a promise that does not wait for previous `next` promises to finish.
780+
const asyncIterableDoingWorkOnEachNext = (start, stop) => {
781+
let i = start;
782+
return {
783+
[Symbol.asyncIterator]() {
784+
return {
785+
async next() {
786+
const me = i++;
787+
if (me > stop) {
788+
return {done: true};
789+
}
790+
791+
await delay(100);
792+
return {done: false, value: me};
793+
},
794+
};
795+
},
796+
};
797+
};
798+
762799
test('pMapIterable - eager spawn when input iterable returns promise', async t => {
763800
const end = timeSpan();
764-
await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 3}));
765-
assertInRange(t, end(), {start: 195, end: 250});
801+
await collectAsyncIterable(pMapIterable(asyncIterableDoingWorkOnEachNext(1, 3), value => value, /* value => delay(100, {value}), */{concurrency: 5}));
802+
assertInRange(t, end(), rangeAround(100));
766803
});
767804

768805
test('pMapIterable - eager spawn when input iterable returns promise incurs little overhead', async t => {
769806
const end = timeSpan();
770807
await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 100}));
771-
assertInRange(t, end(), {start: 195, end: 250});
808+
assertInRange(t, end(), rangeAround(200));
772809
});
773810

774811
test('pMapIterable - preserveOrder: false - yields mappings as they resolve', async t => {
775812
const end = timeSpan();
776813
const result = await collectAsyncIterable(pMapIterable(sharedInput, mapper, {preserveOrder: false}));
777814
t.deepEqual(result, [30, 20, 10]);
778-
assertInRange(t, end(), {start: 295, end: 350});
815+
assertInRange(t, end(), rangeAround(300));
779816
});
780817

781-
test('pMapIterable - preserveOrder: false - more complex example', async t => {
818+
test('pMapIterable - preserveOrder: false - more complex example - sync iterable and bounded concurrency', async t => {
782819
t.deepEqual(await collectAsyncIterable(pMapIterable([
783820
[1, 200],
784821
[2, 100],
@@ -789,6 +826,38 @@ test('pMapIterable - preserveOrder: false - more complex example', async t => {
789826
], mapper, {concurrency: 3, preserveOrder: false})), [2, 3, 1, 5, 6, 4]);
790827
});
791828

829+
test('pMapIterable - preserveOrder: false - more complex example - async iterable and unbounded concurrency', async t => {
830+
const testData = [
831+
[1, 200],
832+
[2, 125],
833+
[3, 150],
834+
[4, 200],
835+
[5, 100],
836+
[6, 75],
837+
];
838+
async function * asyncIterable() {
839+
yield * testData;
840+
}
841+
842+
t.deepEqual(await collectAsyncIterable(pMapIterable(asyncIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id));
843+
});
844+
845+
test('pMapIterable - preserveOrder: false - more complex example - sync promise-returning iterable and unbounded concurrency', async t => {
846+
const testData = [
847+
[1, 200],
848+
[2, 125],
849+
[3, 150],
850+
[4, 225],
851+
[5, 100],
852+
[6, 75],
853+
];
854+
function * syncPromiseReturningIterable() {
855+
yield * testData.map(d => Promise.resolve(d));
856+
}
857+
858+
t.deepEqual(await collectAsyncIterable(pMapIterable(syncPromiseReturningIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id));
859+
});
860+
792861
test('pMapIterable - preserveOrder: false - concurrency: 2', async t => {
793862
const input = [100, 200, 10, 36, 13, 45];
794863
const times = new Map();
@@ -799,12 +868,12 @@ test('pMapIterable - preserveOrder: false - concurrency: 2', async t => {
799868
return delay(value, {value});
800869
}, {concurrency: 2, backpressure: Number.POSITIVE_INFINITY, preserveOrder: false})), [100, 10, 36, 13, 200, 45]);
801870

802-
assertInRange(t, times.get(100), {start: 0, end: 50});
803-
assertInRange(t, times.get(200), {start: 0, end: 50});
804-
assertInRange(t, times.get(10), {start: times.get(100) + 100 - 5, end: times.get(100) + 100 + 50});
805-
assertInRange(t, times.get(36), {start: times.get(10) + 10 - 5, end: times.get(10) + 10 + 50});
806-
assertInRange(t, times.get(13), {start: times.get(36) + 36 - 5, end: times.get(36) + 36 + 50});
807-
assertInRange(t, times.get(45), {start: times.get(13) + 13 - 5, end: times.get(13) + 13 + 50});
871+
assertInRange(t, times.get(100), rangeAround(0));
872+
assertInRange(t, times.get(200), rangeAround(0));
873+
assertInRange(t, times.get(10), rangeAround(times.get(100) + 100));
874+
assertInRange(t, times.get(36), rangeAround(times.get(10) + 10));
875+
assertInRange(t, times.get(13), rangeAround(times.get(36) + 36));
876+
assertInRange(t, times.get(45), rangeAround(times.get(13) + 13));
808877
});
809878

810879
test('pMapIterable - preserveOrder: false - backpressure', async t => {
@@ -846,3 +915,27 @@ test('pMapIterable - preserveOrder: false - throws first error to settle', async
846915
}, 10],
847916
], mapper, {preserveOrder: false, concurrency: 2})), {message: 'bar'});
848917
});
918+
919+
test('pMapIterable - {concurrency: 1, backpressure: 2} => no concurrent mappers (#76)', async t => {
920+
const theLog = [];
921+
const log = message => theLog.push(message);
922+
const startLog = n => `${n}: mapper start`;
923+
const endLog = n => `${n}: mapper end`;
924+
925+
async function * source() {
926+
yield 1;
927+
yield 2;
928+
yield 3;
929+
}
930+
931+
await collectAsyncIterable(pMapIterable(source(), async n => {
932+
log(startLog(n));
933+
await delay(100);
934+
log(endLog(n));
935+
}, {
936+
concurrency: 1,
937+
backpressure: 2,
938+
}));
939+
t.deepEqual(theLog, [startLog(1), endLog(1), startLog(2), endLog(2), startLog(3), endLog(3)]);
940+
});
941+

0 commit comments

Comments
 (0)