Skip to content

Commit 9c389cd

Browse files
committed
Revert: stop failing iterator on first task error
1 parent 7ae0070 commit 9c389cd

File tree

2 files changed

+49
-40
lines changed

2 files changed

+49
-40
lines changed

src/parallelPipe.ts

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ export function parallelPipe<TInput, TOutput>(
1818
const runningTasks: Array<Promise<IteratorResult<TOutput>> | IteratorResult<TOutput>> = [];
1919
let inputDone = false;
2020
let inputIndex = 0;
21-
let iteratorFailure: unknown;
2221

2322
async function executeActionOnIterableInput(input: IteratorResult<TInput, any>): Promise<
2423
{ done: false, value: TOutput } |
@@ -54,9 +53,6 @@ export function parallelPipe<TInput, TOutput>(
5453

5554
return {
5655
async next() {
57-
if (iteratorFailure)
58-
throw iteratorFailure;
59-
6056
// start processing input once output is requested
6157
if (!runningTasks.length)
6258
pullFromInput();
@@ -65,19 +61,12 @@ export function parallelPipe<TInput, TOutput>(
6561
if (!firstRunningTask)
6662
return { done: true, value: undefined };
6763

68-
try {
69-
const { done, value } = await firstRunningTask;
64+
const { done, value } = await firstRunningTask;
7065

71-
if (!done)
72-
pullFromInput();
66+
if (!done)
67+
pullFromInput();
7368

74-
return { done, value };
75-
}
76-
catch (err: unknown) {
77-
iteratorFailure = err; // switch to failed state
78-
inputDone = true; // stop pulling new tasks
79-
throw err;
80-
}
69+
return { done, value };
8170
},
8271
[Symbol.asyncIterator]() {
8372
return this;

tests/unit/parallelPipe.test.ts

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -78,41 +78,61 @@ describe('parallelPipe', () => {
7878
caughtError = err;
7979
}
8080
expect(caughtError).toBeTruthy();
81-
expect(caughtError.message).toEqual('fail');
81+
expect(caughtError).toHaveProperty('message', 'fail');
8282
});
8383

84-
it('does not consume remaining input after an error', async () => {
85-
const inputValues = [1, 2, 3, 4, 5];
86-
let nextCalls = 0;
87-
const input: AsyncIterableIterator<number> = {
88-
next: async () => {
89-
nextCalls++;
90-
const value = inputValues[nextCalls - 1];
91-
return value === undefined ? { done: true, value: undefined } : { done: false, value };
92-
},
93-
[Symbol.asyncIterator]() {
94-
return this;
95-
}
96-
};
84+
it('propagates upstream pipe errors to downstream consumption', async () => {
85+
const input = asyncGenerator([1, 2, 3]);
86+
const iterator1 = parallelPipe(input, 1, async x => {
87+
if (x === 2)
88+
throw new Error('upstream-fail');
89+
90+
return x;
91+
});
92+
93+
const action2 = jest.fn(async x => x * 10);
94+
const iterator2 = parallelPipe(iterator1, 1, action2);
95+
96+
let caughtError;
97+
try {
98+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
99+
for await (const _ of iterator2) { /* no-op */ }
100+
}
101+
catch (err) {
102+
caughtError = err;
103+
}
104+
expect(caughtError).toBeTruthy();
105+
expect(caughtError).toHaveProperty('message', 'upstream-fail');
106+
expect(action2).toHaveBeenCalledTimes(1);
107+
});
97108

98-
const action = async (x: number) => {
109+
it('continues processing after handled upstream error when consuming via .next()', async () => {
110+
const input = asyncGenerator([1, 2, 3]);
111+
const iterator1 = parallelPipe(input, 1, async x => {
99112
if (x === 2)
100-
throw new Error('fail-2');
113+
throw new Error('upstream-fail');
101114

102115
return x;
103-
};
116+
});
104117

105-
const iterator = parallelPipe(input, 2, action);
118+
const iterator2 = parallelPipe(iterator1, 1, async x => x * 10);
106119

107-
const res1 = await iterator.next();
108-
expect(res1).toEqual({ value: 1, done: false });
120+
const res1 = await iterator2.next();
121+
expect(res1).toEqual({ value: 10, done: false });
109122

110-
await expect(iterator.next()).rejects.toThrow('fail-2');
123+
let caughtError;
124+
try {
125+
await iterator2.next();
126+
}
127+
catch (err) {
128+
caughtError = err;
129+
}
130+
expect(caughtError).toBeTruthy();
131+
expect(caughtError).toHaveProperty('message', 'upstream-fail');
111132

112-
// First `next()` fills the queue with `limit` items, then pulls one more after yielding.
113-
// After the first rejection, no additional input should be consumed.
114-
await Promise.resolve();
115-
expect(nextCalls).toBe(3);
133+
// once the error is handled by the consumer, subsequent inputs can still be processed.
134+
const res3 = await iterator2.next();
135+
expect(res3).toEqual({ value: 30, done: false });
116136
});
117137

118138
it('does not exceed the specified parallel limit', async () => {

0 commit comments

Comments
 (0)