Skip to content

Commit c987b14

Browse files
committed
fix: fixed promise leak and event emitter warnings
1 parent d0a83f5 commit c987b14

File tree

3 files changed

+69
-37
lines changed

3 files changed

+69
-37
lines changed

src/nodes/NodeConnectionQueue.ts

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -188,29 +188,35 @@ export class NodeConnectionQueue {
188188
queue: Array<unknown>,
189189
ctx: ContextCancellable,
190190
): Promise<void> {
191-
const abortP = utils.signalPromise(ctx.signal).catch(() => {});
192-
while (
193-
!this.connectionMade &&
194-
queue.length === 0 &&
195-
(this.nodesRunningSignal.size > 0 ||
196-
this.nodesRunningDirect.size > 0 ||
197-
this.queueSignal.length > 0 ||
198-
this.queueDirect.length > 0)
199-
) {
200-
if (ctx.signal.aborted) return;
201-
if (this.nodesRunningSignal.size + this.nodesRunningDirect.size === 0) {
202-
// Yield to the event loop to allow queued attempts to start
203-
await utils.sleep(0);
204-
continue;
205-
}
206-
const runningPs: Array<Promise<void>> = [];
207-
for (const P of this.nodesRunningSignal) {
208-
runningPs.push(P);
209-
}
210-
for (const P of this.nodesRunningDirect) {
211-
runningPs.push(P);
191+
const abortP = utils.signalPromise(ctx.signal);
192+
try {
193+
while (
194+
!this.connectionMade &&
195+
queue.length === 0 &&
196+
(this.nodesRunningSignal.size > 0 ||
197+
this.nodesRunningDirect.size > 0 ||
198+
this.queueSignal.length > 0 ||
199+
this.queueDirect.length > 0)
200+
) {
201+
if (ctx.signal.aborted) return;
202+
if (this.nodesRunningSignal.size + this.nodesRunningDirect.size === 0) {
203+
// Yield to the event loop to allow queued attempts to start
204+
await utils.sleep(0);
205+
continue;
206+
}
207+
const runningPs: Array<Promise<void>> = [];
208+
for (const P of this.nodesRunningSignal) {
209+
runningPs.push(P);
210+
}
211+
for (const P of this.nodesRunningDirect) {
212+
runningPs.push(P);
213+
}
214+
await Promise.any([...runningPs, abortP]);
212215
}
213-
await Promise.any([...runningPs, abortP]);
216+
} finally {
217+
// Clean up abort promise when done
218+
abortP.cancel();
219+
await abortP;
214220
}
215221
}
216222
}

src/nodes/NodeManager.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,14 @@ class NodeManager {
281281
const task = await this.updateRefreshBucketDelay(i, 0, false);
282282
refreshBuckets.push(task.promise());
283283
}
284-
const signalProm = utils.signalPromise(ctx.signal).catch(() => {});
285-
await Promise.race([Promise.all(refreshBuckets), signalProm]);
284+
const signalProm = utils.signalPromise(ctx.signal);
285+
await Promise.race([Promise.all(refreshBuckets), signalProm]).finally(
286+
async () => {
287+
// Clean up signal promise when done
288+
signalProm.cancel();
289+
await signalProm;
290+
},
291+
);
286292
};
287293

288294
public readonly syncNodeGraphHandlerId: TaskHandlerId =
@@ -652,6 +658,7 @@ class NodeManager {
652658
): Promise<[[Host, Port], NodeContactAddressData]> {
653659
// Setting up intermediate signal
654660
const abortController = new AbortController();
661+
utils.setMaxListeners(abortController.signal);
655662
const newCtx = {
656663
timer: ctx.timer,
657664
signal: abortController.signal,
@@ -787,6 +794,7 @@ class NodeManager {
787794
): Promise<[[Host, Port], NodeContactAddressData]> {
788795
// Setting up intermediate signal
789796
const abortController = new AbortController();
797+
utils.setMaxListeners(abortController.signal);
790798
const newCtx = {
791799
timer: ctx.timer,
792800
signal: abortController.signal,

src/utils/utils.ts

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import type {
88
import os from 'os';
99
import process from 'process';
1010
import path from 'path';
11+
import nodesEvents from 'events';
1112
import lexi from 'lexicographic-integer';
13+
import { PromiseCancellable } from '@matrixai/async-cancellable';
1214
import * as utilsErrors from './errors';
1315

1416
const AsyncFunction = (async () => {}).constructor;
@@ -237,19 +239,20 @@ function promise<T = void>(): PromiseDeconstructed<T> {
237239
* Promise constructed from signal
238240
* This rejects when the signal is aborted
239241
*/
240-
function signalPromise(signal: AbortSignal): Promise<void> {
241-
return new Promise<void>((_, reject) => {
242-
if (signal.aborted) {
243-
reject(signal.reason);
244-
return;
245-
}
246-
signal.addEventListener(
247-
'abort',
248-
() => {
249-
reject(signal.reason);
250-
},
251-
{ once: true },
252-
);
242+
// fixme: There is also a one signal to many `signalPromise` relationship in the NM connection queue that needs to be fixed.
243+
function signalPromise(signal: AbortSignal): PromiseCancellable<void> {
244+
return new PromiseCancellable((resolve, _, signalCancel) => {
245+
// Short circuit if signal already aborted
246+
if (signal.aborted) return resolve();
247+
// Short circuit if promise is already cancelled
248+
if (signalCancel.aborted) return resolve();
249+
const handler = () => {
250+
signalCancel.removeEventListener('abort', handler);
251+
signal.removeEventListener('abort', handler);
252+
resolve();
253+
};
254+
signal.addEventListener('abort', handler, { once: true });
255+
signalCancel.addEventListener('abort', handler, { once: true });
253256
});
254257
}
255258

@@ -465,6 +468,20 @@ async function yieldMicro() {
465468
return await new Promise<void>((r) => queueMicrotask(r));
466469
}
467470

471+
/**
472+
* Increases the total number of registered event handlers before a node warning is emitted.
473+
* In most cases this is not needed but in the case where you have one event emitter for multiple handlers you'll need
474+
* to increase the limit.
475+
* @param target - The specific `EventTarget` or `EventEmitter` to increase the warning for.
476+
* @param limit - The limit before the warning is emitted, defaults to 100000.
477+
*/
478+
function setMaxListeners(
479+
target: EventTarget | NodeJS.EventEmitter,
480+
limit: number = 100000,
481+
) {
482+
nodesEvents.setMaxListeners(limit, target);
483+
}
484+
468485
export {
469486
AsyncFunction,
470487
GeneratorFunction,
@@ -503,4 +520,5 @@ export {
503520
bufferWrap,
504521
isBufferSource,
505522
yieldMicro,
523+
setMaxListeners,
506524
};

0 commit comments

Comments
 (0)