Skip to content

Commit 28b194f

Browse files
committed
fix: close potential event listeners leak
Signed-off-by: Jérôme Benoit <[email protected]>
1 parent 77b9118 commit 28b194f

File tree

3 files changed

+78
-42
lines changed

3 files changed

+78
-42
lines changed

benchmarks/benchmarks-utils.mjs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,10 +240,16 @@ const jsonIntegerSerialization = (n) => {
240240
* @returns {number} - The nth fibonacci number.
241241
*/
242242
const fibonacci = (n) => {
243+
if (n === 0) {
244+
return 0n
245+
}
246+
if (n === 1) {
247+
return 1n
248+
}
243249
n = BigInt(n)
244250
let current = 1n
245251
let previous = 0n
246-
while (--n) {
252+
while (n-- > 1n) {
247253
const tmp = current
248254
current += previous
249255
previous = tmp

src/pools/utils.ts

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -463,33 +463,38 @@ export const waitWorkerNodeEvents = async <
463463
numberOfEventsToWait: number,
464464
timeout: number,
465465
): Promise<number> => {
466-
return await new Promise<number>((resolve) => {
466+
return await new Promise<number>((resolve, reject) => {
467467
let events = 0
468468
if (numberOfEventsToWait === 0) {
469469
resolve(events)
470470
return
471471
}
472+
const listener = () => {
473+
++events
474+
if (events === numberOfEventsToWait) {
475+
if (timeoutHandle != null) clearTimeout(timeoutHandle)
476+
workerNode.removeEventListener(workerNodeEvent, listener)
477+
resolve(events)
478+
}
479+
}
480+
const timeoutHandle = timeout >= 0
481+
? setTimeout(() => {
482+
workerNode.removeEventListener(workerNodeEvent, listener)
483+
resolve(events)
484+
}, timeout)
485+
: undefined
472486
switch (workerNodeEvent) {
473487
case 'message':
474488
case 'messageerror':
475489
case 'taskFinished':
476490
case 'backPressure':
477491
case 'idle':
478492
case 'exit':
479-
workerNode.addEventListener(workerNodeEvent, () => {
480-
++events
481-
if (events === numberOfEventsToWait) {
482-
resolve(events)
483-
}
484-
})
493+
workerNode.addEventListener(workerNodeEvent, listener)
485494
break
486495
default:
487-
throw new Error('Invalid worker node event')
488-
}
489-
if (timeout >= 0) {
490-
setTimeout(() => {
491-
resolve(events)
492-
}, timeout)
496+
if (timeoutHandle != null) clearTimeout(timeoutHandle)
497+
reject(new Error('Invalid worker node event'))
493498
}
494499
})
495500
}

tests/test-utils.mjs

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,50 +4,69 @@ export const waitWorkerNodeEvents = async (
44
pool,
55
workerNodeEvent,
66
numberOfEventsToWait,
7+
timeoutMs = 5000,
78
) => {
8-
return await new Promise((resolve) => {
9+
return await new Promise((resolve, reject) => {
910
let events = 0
1011
if (numberOfEventsToWait === 0) {
1112
resolve(events)
1213
return
1314
}
14-
const eventHandler = () => {
15-
++events
15+
const listeners = []
16+
const timeout = setTimeout(() => {
17+
listeners.forEach(({ workerNode, listener }) => {
18+
workerNode.removeEventListener(workerNodeEvent, listener)
19+
})
20+
reject(
21+
new Error(
22+
`Timed out after ${timeoutMs}ms waiting for ${numberOfEventsToWait} '${workerNodeEvent}' events. Received ${events}.`,
23+
),
24+
)
25+
}, timeoutMs)
26+
const listener = () => {
27+
events++
1628
if (events === numberOfEventsToWait) {
29+
clearTimeout(timeout)
30+
listeners.forEach(({ workerNode, listener }) => {
31+
workerNode.removeEventListener(workerNodeEvent, listener)
32+
})
1733
resolve(events)
1834
}
1935
}
2036
for (const workerNode of pool.workerNodes) {
21-
switch (workerNodeEvent) {
22-
case 'message':
23-
case 'messageerror':
24-
case 'taskFinished':
25-
case 'backPressure':
26-
case 'idle':
27-
case 'exit':
28-
workerNode.addEventListener(workerNodeEvent, eventHandler)
29-
break
30-
default:
31-
throw new Error('Invalid worker node event')
32-
}
37+
listeners.push({ workerNode, listener })
38+
workerNode.addEventListener(workerNodeEvent, listener)
3339
}
3440
})
3541
}
3642

37-
export const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => {
38-
return await new Promise((resolve) => {
39-
let events = 0
40-
if (numberOfEventsToWait === 0) {
41-
resolve(events)
42-
return
43-
}
44-
pool.eventTarget?.addEventListener(poolEvent, () => {
45-
++events
46-
if (events === numberOfEventsToWait) {
47-
resolve(events)
43+
export const waitPoolEvents = async (
44+
pool,
45+
poolEvent,
46+
numberOfEventsToWait,
47+
timeoutMs = 5000,
48+
) => {
49+
const eventPromises = []
50+
const eventPromise = (eventTarget, event, timeoutMs = 5000) => {
51+
return new Promise((resolve, reject) => {
52+
const timeout = setTimeout(() => {
53+
eventTarget.removeEventListener(event, listener)
54+
reject(new Error(`Event '${event}' timed out after ${timeoutMs}ms`))
55+
}, timeoutMs)
56+
57+
const listener = (evt) => {
58+
clearTimeout(timeout)
59+
eventTarget.removeEventListener(event, listener)
60+
resolve(evt)
4861
}
62+
63+
eventTarget.addEventListener(event, listener)
4964
})
50-
})
65+
}
66+
for (let i = 0; i < numberOfEventsToWait; i++) {
67+
eventPromises.push(eventPromise(pool.eventTarget, poolEvent, timeoutMs))
68+
}
69+
return await Promise.all(eventPromises)
5170
}
5271

5372
export const sleep = async (ms) => {
@@ -86,10 +105,16 @@ export const jsonIntegerSerialization = (n) => {
86105
* @returns {number} - The nth fibonacci number.
87106
*/
88107
export const fibonacci = (n) => {
108+
if (n === 0) {
109+
return 0n
110+
}
111+
if (n === 1) {
112+
return 1n
113+
}
89114
n = BigInt(n)
90115
let current = 1n
91116
let previous = 0n
92-
while (--n) {
117+
while (n-- > 1n) {
93118
const tmp = current
94119
current += previous
95120
previous = tmp

0 commit comments

Comments
 (0)