Skip to content

Commit 2551b77

Browse files
committed
fix: fix potential ressource leaks
Signed-off-by: Jérôme Benoit <[email protected]>
1 parent 28b194f commit 2551b77

File tree

2 files changed

+80
-48
lines changed

2 files changed

+80
-48
lines changed

src/pools/abstract-pool.ts

Lines changed: 79 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -793,52 +793,70 @@ export abstract class AbstractPool<
793793
workerNodeKey: number,
794794
message: MessageValue<Data>,
795795
): Promise<boolean> {
796-
return await new Promise<boolean>((resolve, reject) => {
797-
const taskFunctionOperationListener = (
798-
message: MessageValue<Response>,
799-
): void => {
800-
this.checkMessageWorkerId(message)
801-
const workerId = this.getWorkerInfo(workerNodeKey)?.id
802-
if (
803-
message.taskFunctionOperationStatus != null &&
804-
message.workerId === workerId
805-
) {
806-
if (message.taskFunctionOperationStatus) {
807-
resolve(true)
808-
} else {
796+
let taskFunctionOperationListener:
797+
| ((message: MessageValue<Response>) => void)
798+
| undefined
799+
try {
800+
return await new Promise<boolean>((resolve, reject) => {
801+
if (this.workerNodes[workerNodeKey] == null) {
802+
resolve(true)
803+
return
804+
}
805+
taskFunctionOperationListener = (
806+
message: MessageValue<Response>,
807+
): void => {
808+
this.checkMessageWorkerId(message)
809+
const workerId = this.getWorkerInfo(workerNodeKey)?.id
810+
if (
811+
message.taskFunctionOperationStatus != null &&
812+
message.workerId === workerId
813+
) {
814+
if (message.taskFunctionOperationStatus) {
815+
resolve(true)
816+
return
817+
}
809818
reject(
810819
new Error(
811820
`Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${message.workerError?.error.message}'`,
812821
),
813822
)
814823
}
815-
this.deregisterWorkerMessageListener(
816-
this.getWorkerNodeKeyByWorkerId(message.workerId),
817-
taskFunctionOperationListener,
818-
)
819824
}
825+
this.registerWorkerMessageListener(
826+
workerNodeKey,
827+
taskFunctionOperationListener,
828+
)
829+
this.sendToWorker(workerNodeKey, message)
830+
})
831+
} finally {
832+
if (taskFunctionOperationListener != null) {
833+
this.deregisterWorkerMessageListener(
834+
workerNodeKey,
835+
taskFunctionOperationListener,
836+
)
820837
}
821-
this.registerWorkerMessageListener(
822-
workerNodeKey,
823-
taskFunctionOperationListener,
824-
)
825-
this.sendToWorker(workerNodeKey, message)
826-
})
838+
}
827839
}
828840

829841
private async sendTaskFunctionOperationToWorkers(
830842
message: MessageValue<Data>,
831843
): Promise<boolean> {
844+
const targetWorkerNodeKeys = [...this.workerNodes.keys()]
832845
const taskFunctionOperationsListener = (
833846
message: MessageValue<Response>,
834847
resolve: (value: boolean | PromiseLike<boolean>) => void,
835848
reject: (reason?: unknown) => void,
836849
responsesReceived: MessageValue<Response>[],
837850
): void => {
838851
this.checkMessageWorkerId(message)
839-
if (message.taskFunctionOperationStatus != null) {
852+
if (
853+
message.taskFunctionOperationStatus != null &&
854+
targetWorkerNodeKeys.includes(
855+
this.getWorkerNodeKeyByWorkerId(message.workerId),
856+
)
857+
) {
840858
responsesReceived.push(message)
841-
if (responsesReceived.length >= this.workerNodes.length) {
859+
if (responsesReceived.length >= targetWorkerNodeKeys.length) {
842860
if (
843861
responsesReceived.every(
844862
(msg) => msg.taskFunctionOperationStatus === true,
@@ -860,9 +878,13 @@ export abstract class AbstractPool<
860878
}
861879
}
862880
}
863-
let listener: (message: MessageValue<Response>) => void
881+
let listener: ((message: MessageValue<Response>) => void) | undefined
864882
try {
865883
return await new Promise<boolean>((resolve, reject) => {
884+
if (targetWorkerNodeKeys.length === 0) {
885+
resolve(true)
886+
return
887+
}
866888
const responsesReceived: MessageValue<Response>[] = []
867889
listener = (message: MessageValue<Response>) => {
868890
taskFunctionOperationsListener(
@@ -872,14 +894,16 @@ export abstract class AbstractPool<
872894
responsesReceived,
873895
)
874896
}
875-
for (const workerNodeKey of this.workerNodes.keys()) {
897+
for (const workerNodeKey of targetWorkerNodeKeys) {
876898
this.registerWorkerMessageListener(workerNodeKey, listener)
877899
this.sendToWorker(workerNodeKey, message)
878900
}
879901
})
880902
} finally {
881-
for (const workerNodeKey of this.workerNodes.keys()) {
882-
this.deregisterWorkerMessageListener(workerNodeKey, listener!)
903+
if (listener != null) {
904+
for (const workerNodeKey of targetWorkerNodeKeys) {
905+
this.deregisterWorkerMessageListener(workerNodeKey, listener)
906+
}
883907
}
884908
}
885909
}
@@ -1306,27 +1330,35 @@ export abstract class AbstractPool<
13061330
}
13071331

13081332
private async sendKillMessageToWorker(workerNodeKey: number): Promise<void> {
1309-
await new Promise<void>((resolve, reject) => {
1310-
if (this.workerNodes[workerNodeKey] == null) {
1311-
resolve()
1312-
return
1313-
}
1314-
const killMessageListener = (message: MessageValue<Response>): void => {
1315-
this.checkMessageWorkerId(message)
1316-
if (message.kill === 'success') {
1333+
let killMessageListener:
1334+
| ((message: MessageValue<Response>) => void)
1335+
| undefined
1336+
try {
1337+
await new Promise<void>((resolve, reject) => {
1338+
if (this.workerNodes[workerNodeKey] == null) {
13171339
resolve()
1318-
} else if (message.kill === 'failure') {
1319-
reject(
1320-
new Error(
1321-
`Kill message handling failed on worker ${message.workerId?.toString()}`,
1322-
),
1323-
)
1340+
return
13241341
}
1342+
killMessageListener = (message: MessageValue<Response>): void => {
1343+
this.checkMessageWorkerId(message)
1344+
if (message.kill === 'success') {
1345+
resolve()
1346+
} else if (message.kill === 'failure') {
1347+
reject(
1348+
new Error(
1349+
`Kill message handling failed on worker ${message.workerId?.toString()}`,
1350+
),
1351+
)
1352+
}
1353+
}
1354+
this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
1355+
this.sendToWorker(workerNodeKey, { kill: true })
1356+
})
1357+
} finally {
1358+
if (killMessageListener != null) {
1359+
this.deregisterWorkerMessageListener(workerNodeKey, killMessageListener)
13251360
}
1326-
// FIXME: should be registered only once
1327-
this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
1328-
this.sendToWorker(workerNodeKey, { kill: true })
1329-
})
1361+
}
13301362
}
13311363

13321364
/**

src/pools/thread/fixed.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ export class FixedThreadPool<
101101
workerNodeKey: number,
102102
listener: (message: MessageValue<Message>) => void,
103103
): void {
104-
this.workerNodes[workerNodeKey].removeEventListener(
104+
this.workerNodes[workerNodeKey]?.removeEventListener(
105105
'message',
106106
messageListenerToEventListener<Message>(listener),
107107
)

0 commit comments

Comments
 (0)