Skip to content

Commit e42390e

Browse files
kanongilMarsup
authored andcommitted
Properly handle multiple iterators
1 parent c5567f6 commit e42390e

File tree

2 files changed

+196
-25
lines changed

2 files changed

+196
-25
lines changed

lib/index.js

Lines changed: 60 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,12 @@ exports.Team = class {
7777

7878
exports.Events = class {
7979

80-
#pending = null;
81-
#queue = [];
80+
#iterators = new Set();
81+
82+
static _iterators(instance) {
83+
84+
return instance.#iterators;
85+
}
8286

8387
static isIterator(iterator) {
8488

@@ -87,45 +91,40 @@ exports.Events = class {
8791

8892
iterator() {
8993

90-
return new internals.EventsIterator(this);
91-
}
94+
const iterator = new internals.EventsIterator(this);
9295

93-
emit(value) {
96+
this.#iterators.add(iterator);
9497

95-
this._queue({ value, done: false });
98+
return iterator;
9699
}
97100

98-
end() {
101+
emit(value) {
99102

100-
this._queue({ done: true });
103+
for (const iterator of this.#iterators) {
104+
iterator._queue({ value, done: false });
105+
}
101106
}
102107

103-
_next() {
108+
end() {
104109

105-
if (this.#queue.length) {
106-
return Promise.resolve(this.#queue.shift());
110+
for (const iterator of this.#iterators) {
111+
iterator._queue({ done: true });
107112
}
108-
109-
this.#pending = new exports.Team();
110-
return this.#pending.work;
111113
}
112114

113-
_queue(item) {
115+
_remove(iterator) {
114116

115-
if (this.#pending) {
116-
this.#pending.attend(item);
117-
this.#pending = null;
118-
}
119-
else {
120-
this.#queue.push(item);
121-
}
117+
this.#iterators.delete(iterator);
122118
}
123119
};
124120

125121

126122
internals.EventsIterator = class {
127123

128-
#events = null;
124+
#events;
125+
126+
#pending = null;
127+
#queue = [];
129128

130129
constructor(events) {
131130

@@ -139,6 +138,43 @@ internals.EventsIterator = class {
139138

140139
next() {
141140

142-
return this.#events._next();
141+
if (this.#queue.length) {
142+
return Promise.resolve(this.#queue.shift());
143+
}
144+
145+
if (!this.#events) {
146+
return { done: true };
147+
}
148+
149+
this.#pending = new exports.Team();
150+
return this.#pending.work;
151+
}
152+
153+
return() {
154+
155+
this._cleanup();
156+
157+
return { done: true };
158+
}
159+
160+
_cleanup() {
161+
162+
this.#events?._remove(this);
163+
this.#events = null;
164+
}
165+
166+
_queue(item) {
167+
168+
if (item.done) {
169+
this._cleanup();
170+
}
171+
172+
if (this.#pending) {
173+
this.#pending.attend(item);
174+
this.#pending = null;
175+
}
176+
else {
177+
this.#queue.push(item);
178+
}
143179
}
144180
};

test/index.js

Lines changed: 136 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,21 +235,156 @@ describe('Events', () => {
235235
events.end();
236236

237237
expect(await collect).to.equal([1, 2, 3]);
238+
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
238239
});
239240

240241
it('iterates over events (queued)', async () => {
241242

242243
const events = new Teamwork.Events();
244+
const iterator = events.iterator();
245+
243246
events.emit(1);
244247
events.emit(2);
245248
events.emit(3);
246249
events.end();
247250

248251
const items = [];
249-
for await (const item of events.iterator()) {
252+
for await (const item of iterator) {
250253
items.push(item);
251254
}
252255

253256
expect(items).to.equal([1, 2, 3]);
257+
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
258+
});
259+
260+
it('only iterates over new events after iterator() call', async () => {
261+
262+
const events = new Teamwork.Events();
263+
264+
events.emit(1);
265+
266+
const iterator = events.iterator();
267+
events.emit(2);
268+
events.emit(3);
269+
events.end();
270+
271+
const items = [];
272+
for await (const item of iterator) {
273+
items.push(item);
274+
}
275+
276+
expect(items).to.equal([2, 3]);
277+
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
278+
});
279+
280+
it('returns done for consumed iterators', async () => {
281+
282+
const events = new Teamwork.Events();
283+
const iterator = events.iterator();
284+
285+
events.emit(1);
286+
events.emit(2);
287+
events.emit(3);
288+
events.end();
289+
290+
const items = [];
291+
for await (const item of iterator) {
292+
items.push(item);
293+
}
294+
295+
expect(iterator.next()).to.equal({ done: true });
296+
297+
expect(items).to.equal([1, 2, 3]);
298+
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
299+
});
300+
301+
it('can use break without leaking', async () => {
302+
303+
const events = new Teamwork.Events();
304+
const iterator = events.iterator();
305+
306+
events.emit(1);
307+
events.emit(2);
308+
309+
const items = [];
310+
for await (const item of iterator) {
311+
items.push(item);
312+
break;
313+
}
314+
315+
expect(items).to.equal([1]);
316+
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
317+
});
318+
319+
it('can throw without leaking', async () => {
320+
321+
const events = new Teamwork.Events();
322+
const iterator = events.iterator();
323+
324+
events.emit(1);
325+
events.emit(2);
326+
327+
const items = [];
328+
await expect((async () => {
329+
330+
for await (const item of iterator) {
331+
items.push(item);
332+
throw new Error('fail');
333+
}
334+
})()).to.reject('fail');
335+
336+
expect(items).to.equal([1]);
337+
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
338+
});
339+
340+
it('works with multiple iterators (serial)', async () => {
341+
342+
const events = new Teamwork.Events();
343+
const iter1 = events.iterator();
344+
const iter2 = events.iterator();
345+
346+
events.emit(1);
347+
events.emit(2);
348+
events.emit(3);
349+
events.end();
350+
351+
const items1 = [];
352+
for await (const item1 of iter1) {
353+
items1.push(item1);
354+
}
355+
356+
const items2 = [];
357+
for await (const item2 of iter2) {
358+
items2.push(item2);
359+
}
360+
361+
expect(items1).to.equal([1, 2, 3]);
362+
expect(items2).to.equal([1, 2, 3]);
363+
});
364+
365+
it('works with multiple iterators (interleaved)', async () => {
366+
367+
const events = new Teamwork.Events();
368+
const iter1 = events.iterator();
369+
const iter2 = events.iterator();
370+
371+
events.emit(1);
372+
events.emit(2);
373+
events.emit(3);
374+
events.end();
375+
376+
const items1 = [];
377+
const items2 = [];
378+
for await (const item1 of iter1) {
379+
items1.push(item1);
380+
if (items2.length === 0) {
381+
for await (const item2 of iter2) {
382+
items2.push(item2);
383+
}
384+
}
385+
}
386+
387+
expect(items1).to.equal([1, 2, 3]);
388+
expect(items2).to.equal([1, 2, 3]);
254389
});
255390
});

0 commit comments

Comments
 (0)