Skip to content

Commit bec7b83

Browse files
committed
fix: ensure dynamic worker can't be created at pool destroy
Signed-off-by: Jérôme Benoit <[email protected]>
1 parent 5214130 commit bec7b83

File tree

6 files changed

+80
-37
lines changed

6 files changed

+80
-37
lines changed

SECURITY.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Security matrix, currently there are no security vulnerabilities.
66

77
| Version | Supported |
88
| ------- | --------- |
9-
| 1.x.x ||
9+
| 0.5.x ||
1010

1111
## Reporting a Vulnerability
1212

src/pools/abstract-pool.ts

Lines changed: 64 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,19 @@ export abstract class AbstractPool<
104104
Response
105105
>
106106

107+
/**
108+
* Whether the pool is started or not.
109+
*/
110+
protected started: boolean
111+
/**
112+
* Whether the pool is starting or not.
113+
*/
114+
protected starting: boolean
115+
/**
116+
* Whether the pool is destroying or not.
117+
*/
118+
protected destroying: boolean
119+
107120
/**
108121
* The task functions added at runtime map:
109122
* - `key`: The task function name.
@@ -114,18 +127,6 @@ export abstract class AbstractPool<
114127
TaskFunctionObject<Data, Response>
115128
>
116129

117-
/**
118-
* Whether the pool is started or not.
119-
*/
120-
private started: boolean
121-
/**
122-
* Whether the pool is starting or not.
123-
*/
124-
private starting: boolean
125-
/**
126-
* Whether the pool is destroying or not.
127-
*/
128-
private destroying: boolean
129130
/**
130131
* Whether the minimum number of workers is starting or not.
131132
*/
@@ -757,11 +758,17 @@ export abstract class AbstractPool<
757758

758759
private isWorkerNodeBackPressured(workerNodeKey: number): boolean {
759760
const workerNode = this.workerNodes[workerNodeKey]
761+
if (workerNode == null) {
762+
return false
763+
}
760764
return workerNode.info.ready && workerNode.info.backPressure
761765
}
762766

763767
private isWorkerNodeBusy(workerNodeKey: number): boolean {
764768
const workerNode = this.workerNodes[workerNodeKey]
769+
if (workerNode == null) {
770+
return false
771+
}
765772
if (this.opts.enableTasksQueue === true) {
766773
return (
767774
workerNode.info.ready &&
@@ -774,6 +781,9 @@ export abstract class AbstractPool<
774781

775782
private isWorkerNodeIdle(workerNodeKey: number): boolean {
776783
const workerNode = this.workerNodes[workerNodeKey]
784+
if (workerNode == null) {
785+
return false
786+
}
777787
if (this.opts.enableTasksQueue === true) {
778788
return (
779789
workerNode.info.ready &&
@@ -786,6 +796,9 @@ export abstract class AbstractPool<
786796

787797
private isWorkerNodeStealing(workerNodeKey: number): boolean {
788798
const workerNode = this.workerNodes[workerNodeKey]
799+
if (workerNode == null) {
800+
return false
801+
}
789802
return (
790803
workerNode.info.ready &&
791804
(workerNode.info.continuousStealing ||
@@ -802,10 +815,6 @@ export abstract class AbstractPool<
802815
| undefined
803816
try {
804817
return await new Promise<boolean>((resolve, reject) => {
805-
if (this.workerNodes[workerNodeKey] == null) {
806-
resolve(true)
807-
return
808-
}
809818
taskFunctionOperationListener = (
810819
message: MessageValue<Response>,
811820
): void => {
@@ -885,10 +894,6 @@ export abstract class AbstractPool<
885894
let listener: ((message: MessageValue<Response>) => void) | undefined
886895
try {
887896
return await new Promise<boolean>((resolve, reject) => {
888-
if (targetWorkerNodeKeys.length === 0) {
889-
resolve(true)
890-
return
891-
}
892897
const responsesReceived: MessageValue<Response>[] = []
893898
listener = (message: MessageValue<Response>) => {
894899
taskFunctionOperationsListener(
@@ -1333,18 +1338,29 @@ export abstract class AbstractPool<
13331338
this.started = false
13341339
}
13351340

1336-
private async sendKillMessageToWorker(workerNodeKey: number): Promise<void> {
1341+
private async sendKillMessageToWorker(
1342+
workerNodeKey: number,
1343+
timeout = 1000,
1344+
): Promise<void> {
1345+
let timeoutHandle: number | undefined
13371346
let killMessageListener:
13381347
| ((message: MessageValue<Response>) => void)
13391348
| undefined
13401349
try {
13411350
await new Promise<void>((resolve, reject) => {
1342-
if (this.workerNodes[workerNodeKey] == null) {
1343-
resolve()
1344-
return
1345-
}
1351+
timeoutHandle = timeout >= 0
1352+
? setTimeout(() => {
1353+
resolve()
1354+
}, timeout)
1355+
: undefined
13461356
killMessageListener = (message: MessageValue<Response>): void => {
1347-
this.checkMessageWorkerId(message)
1357+
if (
1358+
this.workerNodes.length === 0 ||
1359+
this.workerNodes[workerNodeKey] == null
1360+
) {
1361+
resolve()
1362+
return
1363+
}
13481364
if (message.kill === 'success') {
13491365
resolve()
13501366
} else if (message.kill === 'failure') {
@@ -1359,6 +1375,9 @@ export abstract class AbstractPool<
13591375
this.sendToWorker(workerNodeKey, { kill: true })
13601376
})
13611377
} finally {
1378+
if (timeoutHandle != null) {
1379+
clearTimeout(timeoutHandle)
1380+
}
13621381
if (killMessageListener != null) {
13631382
this.deregisterWorkerMessageListener(workerNodeKey, killMessageListener)
13641383
}
@@ -1819,7 +1838,12 @@ export abstract class AbstractPool<
18191838
}
18201839

18211840
private cannotStealTask(): boolean {
1822-
return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
1841+
return (
1842+
!this.started ||
1843+
this.destroying ||
1844+
this.workerNodes.length <= 1 ||
1845+
this.info.queuedTasks === 0
1846+
)
18231847
}
18241848

18251849
private handleTask(workerNodeKey: number, task: Task<Data>): void {
@@ -2051,6 +2075,9 @@ export abstract class AbstractPool<
20512075
const { workerId } = event.detail
20522076
const sourceWorkerNode =
20532077
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
2078+
if (sourceWorkerNode == null) {
2079+
return
2080+
}
20542081
const workerNodes = this.workerNodes
20552082
.slice()
20562083
.sort(
@@ -2088,12 +2115,18 @@ export abstract class AbstractPool<
20882115
protected readonly workerMessageListener = (
20892116
message: MessageValue<Response>,
20902117
): void => {
2091-
if (message.kill != null) {
2118+
const { kill, ready, taskFunctionsProperties, taskId, workerId } = message
2119+
const workerReadyMessage = ready != null && taskFunctionsProperties != null
2120+
// Late worker ready message received
2121+
if (this.destroying && workerReadyMessage) {
2122+
return
2123+
}
2124+
// Kill messages response are handled in dedicated listeners
2125+
if (kill != null) {
20922126
return
20932127
}
20942128
this.checkMessageWorkerId(message)
2095-
const { workerId, ready, taskId, taskFunctionsProperties } = message
2096-
if (ready != null && taskFunctionsProperties != null) {
2129+
if (workerReadyMessage) {
20972130
// Worker ready response received from worker
20982131
this.handleWorkerReadyResponse(message)
20992132
} else if (taskFunctionsProperties != null) {
@@ -2282,7 +2315,7 @@ export abstract class AbstractPool<
22822315
private readonly abortTask = (
22832316
event: CustomEvent<WorkerNodeEventDetail>,
22842317
): void => {
2285-
if (!this.started || this.destroying) {
2318+
if (!this.started) {
22862319
return
22872320
}
22882321
const { taskId, workerId } = event.detail

src/pools/selection-strategies/abstract-worker-choice-strategy.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ export abstract class AbstractWorkerChoiceStrategy<
117117

118118
/**
119119
* Check the worker node key.
120+
*
120121
* @param workerNodeKey - The worker node key to check.
121122
* @returns The worker node key if it is valid, otherwise undefined.
122123
*/

src/pools/thread/dynamic.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ export class DynamicThreadPool<
5252

5353
/** @inheritDoc */
5454
protected override shallCreateDynamicWorker(): boolean {
55-
return (!this.full && this.internalBusy()) || this.empty
55+
return (
56+
this.started &&
57+
!this.destroying &&
58+
((!this.full && this.internalBusy()) || this.empty)
59+
)
5660
}
5761

5862
/** @inheritDoc */

src/pools/thread/fixed.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ export class FixedThreadPool<
7676
workerNodeKey: number,
7777
listener: (message: MessageValue<Message>) => void,
7878
): void {
79-
this.workerNodes[workerNodeKey].addEventListener(
79+
this.workerNodes[workerNodeKey]?.addEventListener(
8080
'message',
8181
messageListenerToEventListener<Message>(listener),
8282
)
@@ -87,7 +87,7 @@ export class FixedThreadPool<
8787
workerNodeKey: number,
8888
listener: (message: MessageValue<Message>) => void,
8989
): void {
90-
this.workerNodes[workerNodeKey].addEventListener(
90+
this.workerNodes[workerNodeKey]?.addEventListener(
9191
'message',
9292
messageListenerToEventListener<Message>(listener),
9393
{

tests/pools/abstract-pool.test.mjs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,7 +1440,10 @@ describe({
14401440
const elapsedTime = performance.now() - startTime
14411441
expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
14421442
expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1443-
expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1443+
// Worker kill message response timeout is 1000ms
1444+
expect(elapsedTime).toBeLessThanOrEqual(
1445+
tasksFinishedTimeout + 1000 * tasksFinished + 100,
1446+
)
14441447
})
14451448

14461449
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
@@ -1468,7 +1471,9 @@ describe({
14681471
await pool.destroy()
14691472
const elapsedTime = performance.now() - startTime
14701473
expect(tasksFinished).toBe(0)
1471-
expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1474+
expect(elapsedTime).toBeLessThanOrEqual(
1475+
tasksFinishedTimeout + 1000 * tasksFinished + 100,
1476+
)
14721477
})
14731478

14741479
it('Verify that hasTaskFunction() is working', async () => {

0 commit comments

Comments
 (0)