Skip to content

Commit f63850b

Browse files
1. add promiseEmitter to preserveOrder: false Promise.race to address case of: infinite concurrency + async iterable producing >1 element
2. use `!isSyncIterator` as shortcut for `isPromiseLike(next)` (`next` is a promise iff iterator is async) 3. add `trySpawn` to the `returnValue === pMapSkip && preserveOrder && (promise mapping next input iterable element is pending)` branch 4. add tests for changes (1) and (3) 5. tests `rangeAround` helper 6. extra `pMapSkip` tests 7. test for #76
1 parent ba50244 commit f63850b

File tree

2 files changed

+165
-15
lines changed

2 files changed

+165
-15
lines changed

index.js

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -193,28 +193,58 @@ export function pMapIterable(
193193

194194
return {
195195
async * [Symbol.asyncIterator]() {
196-
const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();
196+
const isSyncIterator = iterable[Symbol.asyncIterator] === undefined;
197+
const iterator = isSyncIterator ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();
197198

198199
const promises = [];
199200
const promisesIndexFromInputIndex = {};
200201
const inputIndexFromPromisesIndex = [];
201202
let runningMappersCount = 0;
202203
let isDone = false;
203204
let inputIndex = 0;
204-
let outputIndex = 0; // Only used when `preserveOrder: false`
205+
let outputIndex = 0; // Only used when `preserveOrder: true`
206+
207+
// This event emitter prevents the race condition that arises when:
208+
// - `preserveOrder: false`
209+
// - `promises` are added after `Promise.race` is invoked, since `Promise.race` only races the promises that existed in its input array at call time
210+
// More specifically, this occurs when (in addition to `preserveOrder: false`):
211+
// - `concurrency === Number.POSITIVE_INFINITY && backpressure === Number.POSITIVE_INFINITY`
212+
// - this forces us to forgo eagerly filling the `promises` pool to avoid infinite recursion
213+
// - IMO this is the root of this problem, and a problem in and of itself: we should consider requiring a finite concurrency & backpressure
214+
// - given the inability to eagerly fill the `promises` pool with infinite concurrency & backpressure, there are some situations in which specifying
215+
// a finite concurrency & backpressure will be faster than specifying the otherwise faster-sounding infinite concurrency & backpressure
216+
// - an async iterator input iterable
217+
// - `mapNext` can't `trySpawn` until it `await`s its `next`, since the input iterable might be done
218+
// - the initial `trySpawn` thus ends when the execution of `mapNext` is suspended to `await next`
219+
// - the input iterable produces more than one element
220+
// - the (single) running `mapNext`'s `trySpawn` _will necessarily_ (since concurrency and backpressure are infinite)
221+
// start another `mapNext` promise that `trySpawn` adds to `promises`
222+
// - this additional promise does not partake in the already-running `nextPromise`, because its underlying `Promise.race` began without it,
223+
// when the initial `trySpawn` returned and `nextPromise` was invoked from the main loop
224+
const promiseEmitter = new EventTarget(); // Only used when `preserveOrder: false`
225+
const promiseEmitterEvent = 'promiseFulfilled';
205226

206227
const nextPromise = preserveOrder
207228
// Treat `promises` as a queue
208229
? () => {
209-
// May be undefined bc of `pMapSkip`s
230+
// May be `undefined` bc of `pMapSkip`s
210231
while (promisesIndexFromInputIndex[outputIndex] === undefined) {
211232
outputIndex += 1;
212233
}
213234

214235
return promises[promisesIndexFromInputIndex[outputIndex++]];
215236
}
216237
// Treat `promises` as a pool (order doesn't matter)
217-
: () => Promise.race(promises);
238+
: () => Promise.race([
239+
// Ensures correctness in the case that mappers resolve between the time that one `await nextPromise()` resolves and the next `nextPromise` call is made
240+
// (these promises would otherwise be lost if an event emitter is not listening - the `promises` pool buffers resolved promises to be processed)
241+
// (I wonder if it may actually be possible to convert the `preserveOrder: false` case to _exclusively_ event-based,
242+
// but such a solution may get messy since we'd want to `yield` from a callback, likely requiring a resolved promises buffer anyway...)
243+
Promise.race(promises),
244+
// Ensures correctness in the case that more promises are added to `promises` after the initial `nextPromise` call is made
245+
// (these additional promises are not included in the above `Promise.race`)
246+
new Promise(resolve => promiseEmitter.addEventListener(promiseEmitterEvent, r => resolve(r.detail) , { once: true }))
247+
]);
218248

219249
function popPromise(inputIndex) {
220250
// Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array
@@ -239,7 +269,7 @@ export function pMapIterable(
239269
let next;
240270
try {
241271
next = iterator.next();
242-
if (isPromiseLike(next)) {
272+
if (!isSyncIterator) { // `!isSyncIterator` iff `isPromiseLike(next)`, but former is already computed
243273
// Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse),
244274
// and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises,
245275
// so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel.
@@ -250,6 +280,7 @@ export function pMapIterable(
250280
// However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common
251281
// `async` operations like disk reads, network requests, etc.
252282
// Overall, this can reduce the total time taken to process all elements.
283+
// TODO: in the `concurrency === Number.POSITIVE_INFINITY` case, we could potentially still optimize here by eagerly spawning some # of promises.
253284
if (backpressure !== Number.POSITIVE_INFINITY) {
254285
// Spawn if still below concurrency and backpressure limit
255286
trySpawn();
@@ -291,12 +322,15 @@ export function pMapIterable(
291322
if (returnValue === pMapSkip) {
292323
// If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed
293324
// NOTE: no chance that `myInputIndex + 1`-spawning code is waiting to be executed in another part of the event loop,
294-
// but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly `mapNext` and
295-
// this potentially-currently-awaited promise resolves to the result of mapping a later element than a different member of
296-
// `promises`, i.e. `promises` resolve out of order), because all `trySpawn`/`mapNext` calls execute the bookkeeping synchronously,
297-
// before any `await`s.
325+
// but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly skip this `if` condition and
326+
// instead call `mapNext`, causing this potentially-currently-awaited promise to resolve to the result of mapping an element
327+
// of the input iterable that was produced later than `myInputIndex + 1`), i.e., no chance `promises` resolve out of order, because:
328+
// all `trySpawn`/`mapNext` calls execute their bookkeeping synchronously, before any `await`s, so we cannot observe an intermediate
329+
// state in which the promise mapping input iterable element `myInputIndex + 1` has not been recorded in the `promisesIndexFromInputIndex` ledger.
298330
if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) {
299331
popPromise(myInputIndex);
332+
// Spawn if still below backpressure limit and just dropped below concurrency limit
333+
trySpawn();
300334
return promises[promisesIndexFromInputIndex[myInputIndex + 1]];
301335
}
302336

@@ -321,24 +355,26 @@ export function pMapIterable(
321355
// Reserve index in `promises` array: we don't actually have the promise to save yet,
322356
// but we don't want recursive `trySpawn` calls to use this same index.
323357
// This is safe (i.e., the empty slot won't be `await`ed) because we replace the value immediately,
324-
// without yielding to the event loop, so no consumers (namely `getAndRemoveFromPoolNextPromise`)
358+
// without yielding to the event loop, so no consumers (namely `nextPromise`)
325359
// can observe the intermediate state.
326360
const promisesIndex = promises.length++;
327361
promises[promisesIndex] = mapNext(promisesIndex);
362+
promises[promisesIndex].then(p => promiseEmitter.dispatchEvent(new CustomEvent(promiseEmitterEvent, {detail: p})));
328363
}
329364

365+
// bootstrap `promises`
330366
trySpawn();
331367

332368
while (promises.length > 0) {
333-
const {result: {error, done, value}, inputIndex} = await nextPromise();// eslint-disable-line no-await-in-loop
369+
const {result: {error, done, value}, inputIndex} = await nextPromise(); // eslint-disable-line no-await-in-loop
334370
popPromise(inputIndex);
335371

336372
if (error) {
337373
throw error;
338374
}
339375

340376
if (done) {
341-
// When `preserveOrder: false`, ignore to consume any remaining pending promises in the pool
377+
// When `preserveOrder: false`, `continue` to consume any remaining pending promises in the pool
342378
if (!preserveOrder) {
343379
continue;
344380
}

test.js

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,18 @@ test('pMapIterable - complex pMapSkip pattern - concurrency 2 - preserveOrder: f
716716
t.assert(result.length === 8);
717717
});
718718

719+
test('pMapIterable - pMapSkip + preserveOrder: true + next input mapping promise pending - eagerly spawns next promise', async t => {
720+
const end = timeSpan();
721+
const testData = [
722+
[pMapSkip, 100],
723+
[2, 200],
724+
[3, 100] // ensure 3 is spawned when pMapSkip ends (otherwise, overall runtime will be 300 ms)
725+
];
726+
const result = await collectAsyncIterable(pMapIterable(testData, mapper, {preserveOrder: true, concurrency: 2}));
727+
assertInRange(t, end(), range(200, 5));
728+
t.deepEqual(result, [2, 3]);
729+
});
730+
719731
test('pMapIterable - async iterable input', async t => {
720732
const result = await collectAsyncIterable(pMapIterable(new AsyncTestData(sharedInput), mapper));
721733
t.deepEqual(result, [10, 20, 30]);
@@ -759,10 +771,57 @@ function * promiseGenerator() {
759771
})();
760772
}
761773

774+
const asyncIterableDoingWorkOnEachNext = (start, stop) => {
775+
let i = start;
776+
return {
777+
[Symbol.asyncIterator](){
778+
return {
779+
async next() {
780+
const me = i++;
781+
// console.log(`[${me}] next`);
782+
// if (i === start){
783+
// await delay(100);
784+
// }
785+
// return i > stop ? {done: true} : {done: false, value: i++};
786+
787+
// await delay(100);
788+
// const ids = [1, 2, 3];
789+
// return
790+
if(me > stop) {
791+
// console.log(`[${me}] done`);
792+
return {done: true};
793+
}
794+
// console.log(`spawning ${me}`)
795+
796+
// console.log(`[${me}] start delay`);
797+
await delay(100);
798+
// console.log(`[${me}] end delay`);
799+
return {done: false, value: me };
800+
}
801+
}
802+
}
803+
}
804+
};
805+
806+
async function *nPlusOne() {
807+
// fetch ids
808+
await delay(100);
809+
const ids = [1, 2, 3];
810+
// map ids
811+
yield * ids.map(async id => {
812+
await delay(50);
813+
return id
814+
});
815+
}
816+
817+
function range(median, tolerance){
818+
return {start: median - tolerance, end: median + tolerance}
819+
}
820+
762821
test('pMapIterable - eager spawn when input iterable returns promise', async t => {
763822
const end = timeSpan();
764-
await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 3}));
765-
assertInRange(t, end(), {start: 195, end: 250});
823+
await collectAsyncIterable(pMapIterable(asyncIterableDoingWorkOnEachNext(1, 3), value => value, /*value => delay(100, {value}), */{concurrency: 5}));
824+
assertInRange(t, end(), range(100, 5));
766825
});
767826

768827
test('pMapIterable - eager spawn when input iterable returns promise incurs little overhead', async t => {
@@ -778,7 +837,7 @@ test('pMapIterable - preserveOrder: false - yields mappings as they resolve', as
778837
assertInRange(t, end(), {start: 295, end: 350});
779838
});
780839

781-
test('pMapIterable - preserveOrder: false - more complex example', async t => {
840+
test('pMapIterable - preserveOrder: false - more complex example - sync iterable and bounded concurrency', async t => {
782841
t.deepEqual(await collectAsyncIterable(pMapIterable([
783842
[1, 200],
784843
[2, 100],
@@ -789,6 +848,36 @@ test('pMapIterable - preserveOrder: false - more complex example', async t => {
789848
], mapper, {concurrency: 3, preserveOrder: false})), [2, 3, 1, 5, 6, 4]);
790849
});
791850

851+
test('pMapIterable - preserveOrder: false - more complex example - async iterable and unbounded concurrency', async t => {
852+
const testData = [
853+
[1, 200],
854+
[2, 125],
855+
[3, 150],
856+
[4, 200],
857+
[5, 100],
858+
[6, 75],
859+
];
860+
async function * asyncIterable() {
861+
yield * testData;
862+
}
863+
t.deepEqual(await collectAsyncIterable(pMapIterable(asyncIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id));
864+
});
865+
866+
test('pMapIterable - preserveOrder: false - more complex example - sync promise-returning iterable and unbounded concurrency', async t => {
867+
const testData = [
868+
[1, 200],
869+
[2, 125],
870+
[3, 150],
871+
[4, 225],
872+
[5, 100],
873+
[6, 75],
874+
];
875+
function * syncPromiseReturningIterable() {
876+
yield * testData.map(d => Promise.resolve(d));
877+
}
878+
t.deepEqual(await collectAsyncIterable(pMapIterable(syncPromiseReturningIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id));
879+
});
880+
792881
test('pMapIterable - preserveOrder: false - concurrency: 2', async t => {
793882
const input = [100, 200, 10, 36, 13, 45];
794883
const times = new Map();
@@ -846,3 +935,28 @@ test('pMapIterable - preserveOrder: false - throws first error to settle', async
846935
}, 10],
847936
], mapper, {preserveOrder: false, concurrency: 2})), {message: 'bar'});
848937
});
938+
939+
940+
test('pMapIterable - {concurrency: 1, backpressure: 2} => no concurrent mappers (#76)', async t => {
941+
const theLog = [];
942+
const log = (msg) => theLog.push(msg);
943+
const startLog = (n) => `${n}: mapper start`
944+
const endLog = (n) => `${n}: mapper end`
945+
946+
async function* source() {
947+
yield 1;
948+
yield 2;
949+
yield 3;
950+
}
951+
952+
await collectAsyncIterable(pMapIterable(source(), async n => {
953+
log(startLog(n));
954+
await delay(100);
955+
log(endLog(n));
956+
}, {
957+
concurrency: 1,
958+
backpressure: 2
959+
}));
960+
t.deepEqual(theLog, [startLog(1), endLog(1), startLog(2), endLog(2), startLog(3), endLog(3)]);
961+
})
962+

0 commit comments

Comments
 (0)