Skip to content

Commit 5bfb3e8

Browse files
committed
fix: fix worker node cross tasks stealing
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
1 parent e7fc096 commit 5bfb3e8

File tree

8 files changed

+59
-20
lines changed

8 files changed

+59
-20
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ and this project adheres to
88

99
## [Unreleased]
1010

11+
### Fixed
12+
13+
- Avoid worker node cross tasks stealing.
14+
- Ensure only half the pool worker nodes can steal tasks.
15+
1116
## [0.1.9] - 2023-12-22
1217

1318
### Changed

src/pools/abstract-pool.ts

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,14 @@ export abstract class AbstractPool<
296296
: accumulator,
297297
0,
298298
),
299+
...(this.opts.enableTasksQueue === true &&
300+
{
301+
stealingWorkerNodes: this.workerNodes.reduce(
302+
(accumulator, workerNode) =>
303+
workerNode.info.stealing ? accumulator + 1 : accumulator,
304+
0,
305+
),
306+
}),
299307
busyWorkerNodes: this.workerNodes.reduce(
300308
(accumulator, _workerNode, workerNodeKey) =>
301309
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
@@ -1375,6 +1383,10 @@ export abstract class AbstractPool<
13751383
})
13761384
}
13771385

1386+
private cannotStealTask(): boolean {
1387+
return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
1388+
}
1389+
13781390
private handleTask(workerNodeKey: number, task: Task<Data>): void {
13791391
if (this.shallExecuteTask(workerNodeKey)) {
13801392
this.executeTask(workerNodeKey, task)
@@ -1387,7 +1399,7 @@ export abstract class AbstractPool<
13871399
if (workerNodeKey === -1) {
13881400
return
13891401
}
1390-
if (this.workerNodes.length <= 1) {
1402+
if (this.cannotStealTask()) {
13911403
return
13921404
}
13931405
while (this.tasksQueueSize(workerNodeKey) > 0) {
@@ -1481,22 +1493,29 @@ export abstract class AbstractPool<
14811493
event: CustomEvent<WorkerNodeEventDetail>,
14821494
previousStolenTask?: Task<Data>,
14831495
): void => {
1484-
if (this.workerNodes.length <= 1) {
1485-
return
1486-
}
14871496
const { workerNodeKey } = event.detail
14881497
if (workerNodeKey == null) {
14891498
throw new Error(
1490-
'WorkerNode event detail workerNodeKey attribute must be defined',
1499+
'WorkerNode event detail workerNodeKey property must be defined',
14911500
)
14921501
}
1502+
if (
1503+
this.cannotStealTask() || (this.info.stealingWorkerNodes as number) >
1504+
Math.floor(this.workerNodes.length / 2)
1505+
) {
1506+
if (previousStolenTask != null) {
1507+
this.getWorkerInfo(workerNodeKey).stealing = false
1508+
}
1509+
return
1510+
}
14931511
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
14941512
if (
14951513
previousStolenTask != null &&
14961514
workerNodeTasksUsage.sequentiallyStolen > 0 &&
14971515
(workerNodeTasksUsage.executing > 0 ||
14981516
this.tasksQueueSize(workerNodeKey) > 0)
14991517
) {
1518+
this.getWorkerInfo(workerNodeKey).stealing = false
15001519
for (
15011520
const taskName of this.workerNodes[workerNodeKey].info
15021521
.taskFunctionNames as string[]
@@ -1511,6 +1530,7 @@ export abstract class AbstractPool<
15111530
)
15121531
return
15131532
}
1533+
this.getWorkerInfo(workerNodeKey).stealing = true
15141534
const stolenTask = this.workerNodeStealTask(workerNodeKey)
15151535
if (
15161536
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@ -1556,6 +1576,7 @@ export abstract class AbstractPool<
15561576
const sourceWorkerNode = workerNodes.find(
15571577
(sourceWorkerNode, sourceWorkerNodeKey) =>
15581578
sourceWorkerNode.info.ready &&
1579+
!sourceWorkerNode.info.stealing &&
15591580
sourceWorkerNodeKey !== workerNodeKey &&
15601581
sourceWorkerNode.usage.tasks.queued > 0,
15611582
)
@@ -1576,7 +1597,10 @@ export abstract class AbstractPool<
15761597
private readonly handleBackPressureEvent = (
15771598
event: CustomEvent<WorkerNodeEventDetail>,
15781599
): void => {
1579-
if (this.workerNodes.length <= 1) {
1600+
if (
1601+
this.cannotStealTask() || (this.info.stealingWorkerNodes as number) >
1602+
Math.floor(this.workerNodes.length / 2)
1603+
) {
15801604
return
15811605
}
15821606
const { workerId } = event.detail
@@ -1596,16 +1620,19 @@ export abstract class AbstractPool<
15961620
if (
15971621
sourceWorkerNode.usage.tasks.queued > 0 &&
15981622
workerNode.info.ready &&
1623+
!workerNode.info.stealing &&
15991624
workerNode.info.id !== workerId &&
16001625
workerNode.usage.tasks.queued <
16011626
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
16021627
) {
1628+
this.getWorkerInfo(workerNodeKey).stealing = true
16031629
const task = sourceWorkerNode.popTask() as Task<Data>
16041630
this.handleTask(workerNodeKey, task)
16051631
this.updateTaskStolenStatisticsWorkerUsage(
16061632
workerNodeKey,
16071633
task.name as string,
16081634
)
1635+
this.getWorkerInfo(workerNodeKey).stealing = false
16091636
}
16101637
}
16111638
}

src/pools/pool.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ export interface PoolInfo {
7171
readonly workerNodes: number
7272
/** Pool idle worker nodes. */
7373
readonly idleWorkerNodes: number
74+
/** Pool stealing worker nodes. */
75+
readonly stealingWorkerNodes?: number
7476
/** Pool busy worker nodes. */
7577
readonly busyWorkerNodes: number
7678
readonly executedTasks: number

src/pools/worker-node.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
196196
type: getWorkerType(worker) as WorkerType,
197197
dynamic: false,
198198
ready: false,
199+
stealing: false,
199200
}
200201
}
201202

src/pools/worker.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ export interface WorkerInfo {
146146
* Ready flag.
147147
*/
148148
ready: boolean
149+
/**
150+
* Stealing flag.
151+
* This flag is set to `true` when worker node is stealing tasks from another worker node.
152+
*/
153+
stealing: boolean
149154
/**
150155
* Task function names.
151156
*/

tests/pools/abstract-pool.test.mjs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,7 @@ Deno.test({
10281028
type: WorkerTypes.web,
10291029
dynamic: false,
10301030
ready: true,
1031+
stealing: false,
10311032
})
10321033
}
10331034
await pool.destroy()
@@ -1046,6 +1047,7 @@ Deno.test({
10461047
type: WorkerTypes.web,
10471048
dynamic: false,
10481049
ready: true,
1050+
stealing: false,
10491051
})
10501052
}
10511053
await pool.destroy()
@@ -1447,7 +1449,7 @@ Deno.test({
14471449
stub(
14481450
pool,
14491451
'hasBackPressure',
1450-
returnsNext(Array(5).fill(true)),
1452+
returnsNext(Array(7).fill(true)),
14511453
)
14521454
expect(pool.emitter.eventNames()).toStrictEqual([])
14531455
const promises = new Set()
@@ -1476,6 +1478,7 @@ Deno.test({
14761478
maxSize: expect.any(Number),
14771479
workerNodes: expect.any(Number),
14781480
idleWorkerNodes: expect.any(Number),
1481+
stealingWorkerNodes: expect.any(Number),
14791482
busyWorkerNodes: expect.any(Number),
14801483
executedTasks: expect.any(Number),
14811484
executingTasks: expect.any(Number),
@@ -1485,7 +1488,7 @@ Deno.test({
14851488
stolenTasks: expect.any(Number),
14861489
failedTasks: expect.any(Number),
14871490
})
1488-
assertSpyCalls(pool.hasBackPressure, 5)
1491+
assertSpyCalls(pool.hasBackPressure, 7)
14891492
pool.hasBackPressure.restore()
14901493
await pool.destroy()
14911494
},

tests/pools/worker-node.test.mjs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ Deno.test({
120120
type: WorkerTypes.web,
121121
dynamic: false,
122122
ready: false,
123+
stealing: false,
123124
})
124125
expect(threadWorkerNode.usage).toStrictEqual({
125126
tasks: {

tests/worker/thread-worker.test.mjs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ Deno.test('Thread worker test suite', async (t) => {
3232
return 2
3333
}
3434
const worker = new ThreadWorker({ fn1, fn2 })
35+
worker.port = {
36+
postMessage: stub(() => {}),
37+
}
3538
expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({
3639
status: false,
3740
error: new TypeError('name parameter is not a string'),
@@ -40,9 +43,6 @@ Deno.test('Thread worker test suite', async (t) => {
4043
status: false,
4144
error: new TypeError('name parameter is an empty string'),
4245
})
43-
worker.port = {
44-
postMessage: stub(() => {}),
45-
}
4646
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(
4747
Function,
4848
)
@@ -85,17 +85,12 @@ Deno.test('Thread worker test suite', async (t) => {
8585
})
8686

8787
await t.step(
88-
'Verify worker invokes the postMessage() method on port property',
88+
'Verify that sendToMainWorker() method invokes the port property postMessage() method',
8989
() => {
90-
class SpyWorker extends ThreadWorker {
91-
constructor(fn) {
92-
super(fn)
93-
this.port = {
94-
postMessage: stub(() => {}),
95-
}
96-
}
90+
const worker = new ThreadWorker(() => {})
91+
worker.port = {
92+
postMessage: stub(() => {}),
9793
}
98-
const worker = new SpyWorker(() => {})
9994
worker.sendToMainWorker({ ok: 1 })
10095
assertSpyCalls(worker.port.postMessage, 1)
10196
worker.port.postMessage.restore()

0 commit comments

Comments
 (0)