Skip to content

Commit 8c46b36

Browse files
fix: tasks queuing fixes (#83)
* fix: tasks queuing fixes Signed-off-by: Jérôme Benoit <[email protected]> * refactor: cleanup worker selection code Signed-off-by: Jérôme Benoit <[email protected]> * refactor: code cleanups Signed-off-by: Jérôme Benoit <[email protected]> * refactor: code cleanups Signed-off-by: Jérôme Benoit <[email protected]> * docs(README.md): formatting Signed-off-by: Jérôme Benoit <[email protected]> * refactor: code cleanups Signed-off-by: Jérôme Benoit <[email protected]> --------- Signed-off-by: Jérôme Benoit <[email protected]>
1 parent 6fc2246 commit 8c46b36

22 files changed

+347
-296
lines changed

src/circular-buffer.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
export const defaultBufferSize = 2048
55

66
/**
7-
* Circular buffer designed for positive numbers.
7+
* Circular buffer designed for numbers.
88
*
99
* @internal
1010
*/
@@ -25,7 +25,7 @@ export class CircularBuffer {
2525
this.writeIdx = 0
2626
this.maxArrayIdx = size - 1
2727
this.size = 0
28-
this.items = new Float32Array(size).fill(-1)
28+
this.items = new Float32Array(size)
2929
}
3030

3131
/**
@@ -52,11 +52,13 @@ export class CircularBuffer {
5252
* @param number - Number to put into buffer.
5353
*/
5454
public put(number: number): void {
55-
this.items[this.writeIdx] = number
56-
this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1
57-
if (this.size < this.items.length) {
55+
if (this.full()) {
56+
this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1
57+
} else {
5858
++this.size
5959
}
60+
this.items[this.writeIdx] = number
61+
this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1
6062
}
6163

6264
/**
@@ -65,11 +67,10 @@ export class CircularBuffer {
6567
* @returns Number from buffer.
6668
*/
6769
public get(): number | undefined {
68-
const number = this.items[this.readIdx]
69-
if (number === -1) {
70-
return
70+
if (this.empty()) {
71+
return undefined
7172
}
72-
this.items[this.readIdx] = -1
73+
const number = this.items[this.readIdx]
7374
this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1
7475
--this.size
7576
return number
@@ -81,7 +82,16 @@ export class CircularBuffer {
8182
* @returns Numbers' array.
8283
*/
8384
public toArray(): number[] {
84-
return Array.from(this.items.filter((item) => item !== -1))
85+
const array: number[] = []
86+
if (this.empty()) {
87+
return array
88+
}
89+
let currentIdx = this.readIdx
90+
for (let i = 0; i < this.size; i++) {
91+
array.push(this.items[currentIdx])
92+
currentIdx = currentIdx === this.maxArrayIdx ? 0 : currentIdx + 1
93+
}
94+
return array
8595
}
8696

8797
/**

src/pools/abstract-pool.ts

Lines changed: 47 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -829,50 +829,59 @@ export abstract class AbstractPool<
829829
private async sendTaskFunctionOperationToWorkers(
830830
message: MessageValue<Data>,
831831
): Promise<boolean> {
832-
return await new Promise<boolean>((resolve, reject) => {
833-
const responsesReceived: MessageValue<Response>[] = []
834-
const taskFunctionOperationsListener = (
835-
message: MessageValue<Response>,
836-
): void => {
837-
this.checkMessageWorkerId(message)
838-
if (message.taskFunctionOperationStatus != null) {
839-
responsesReceived.push(message)
840-
if (responsesReceived.length === this.workerNodes.length) {
841-
if (
842-
responsesReceived.every(
843-
(message) => message.taskFunctionOperationStatus === true,
844-
)
845-
) {
846-
resolve(true)
847-
} else if (
848-
responsesReceived.some(
849-
(message) => message.taskFunctionOperationStatus === false,
850-
)
851-
) {
852-
const errorResponse = responsesReceived.find(
853-
(response) => response.taskFunctionOperationStatus === false,
854-
)
855-
reject(
856-
new Error(
857-
`Task function operation '${message.taskFunctionOperation}' failed on worker ${errorResponse?.workerId?.toString()} with error: '${errorResponse?.workerError?.error.message}'`,
858-
),
859-
)
860-
}
861-
this.deregisterWorkerMessageListener(
862-
this.getWorkerNodeKeyByWorkerId(message.workerId),
863-
taskFunctionOperationsListener,
832+
const taskFunctionOperationsListener = (
833+
message: MessageValue<Response>,
834+
resolve: (value: boolean | PromiseLike<boolean>) => void,
835+
reject: (reason?: unknown) => void,
836+
responsesReceived: MessageValue<Response>[],
837+
): void => {
838+
this.checkMessageWorkerId(message)
839+
if (message.taskFunctionOperationStatus != null) {
840+
responsesReceived.push(message)
841+
if (responsesReceived.length >= this.workerNodes.length) {
842+
if (
843+
responsesReceived.every(
844+
(msg) => msg.taskFunctionOperationStatus === true,
845+
)
846+
) {
847+
resolve(true)
848+
} else {
849+
const errorResponse = responsesReceived.find(
850+
(msg) => msg.taskFunctionOperationStatus === false,
851+
)
852+
reject(
853+
new Error(
854+
`Task function operation '${message.taskFunctionOperation}' failed on worker ${errorResponse?.workerId?.toString()} with error: '${
855+
errorResponse?.workerError?.error?.message ?? 'Unknown error'
856+
}'`,
857+
),
864858
)
865859
}
866860
}
867861
}
862+
}
863+
let listener: (message: MessageValue<Response>) => void
864+
try {
865+
return await new Promise<boolean>((resolve, reject) => {
866+
const responsesReceived: MessageValue<Response>[] = []
867+
listener = (message: MessageValue<Response>) => {
868+
taskFunctionOperationsListener(
869+
message,
870+
resolve,
871+
reject,
872+
responsesReceived,
873+
)
874+
}
875+
for (const workerNodeKey of this.workerNodes.keys()) {
876+
this.registerWorkerMessageListener(workerNodeKey, listener)
877+
this.sendToWorker(workerNodeKey, message)
878+
}
879+
})
880+
} finally {
868881
for (const workerNodeKey of this.workerNodes.keys()) {
869-
this.registerWorkerMessageListener(
870-
workerNodeKey,
871-
taskFunctionOperationsListener,
872-
)
873-
this.sendToWorker(workerNodeKey, message)
882+
this.deregisterWorkerMessageListener(workerNodeKey, listener!)
874883
}
875-
})
884+
}
876885
}
877886

878887
/** @inheritDoc */

src/pools/selection-strategies/abstract-worker-choice-strategy.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,16 +116,21 @@ export abstract class AbstractWorkerChoiceStrategy<
116116
}
117117

118118
/**
119-
* Check the next worker node key.
119+
* Check the worker node key.
120+
* @param workerNodeKey - The worker node key to check.
121+
* @returns The worker node key if it is valid, otherwise undefined.
120122
*/
121-
protected checkNextWorkerNodeKey(): void {
123+
protected checkWorkerNodeKey(
124+
workerNodeKey: number | undefined,
125+
): number | undefined {
122126
if (
123-
this.nextWorkerNodeKey != null &&
124-
(this.nextWorkerNodeKey < 0 ||
125-
!this.isWorkerNodeReady(this.nextWorkerNodeKey))
127+
workerNodeKey == null ||
128+
workerNodeKey < 0 ||
129+
workerNodeKey >= this.pool.workerNodes.length
126130
) {
127-
delete this.nextWorkerNodeKey
131+
return undefined
128132
}
133+
return workerNodeKey
129134
}
130135

131136
/**

src/pools/selection-strategies/fair-share-worker-choice-strategy.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,28 +76,44 @@ export class FairShareWorkerChoiceStrategy<
7676
}
7777

7878
/** @inheritDoc */
79-
public remove(): boolean {
79+
public remove(workerNodeKey: number): boolean {
80+
if (
81+
this.pool.workerNodes[workerNodeKey]?.strategyData
82+
?.virtualTaskEndTimestamp != null
83+
) {
84+
delete this.pool.workerNodes[workerNodeKey].strategyData
85+
.virtualTaskEndTimestamp
86+
}
8087
return true
8188
}
8289

8390
private fairShareNextWorkerNodeKey(): number | undefined {
84-
return this.pool.workerNodes.reduce(
85-
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
91+
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
92+
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
93+
if (!this.isWorkerNodeReady(workerNodeKey)) {
94+
return minWorkerNodeKey
95+
}
96+
if (minWorkerNodeKey === -1) {
97+
workerNode.strategyData = {
98+
virtualTaskEndTimestamp: this
99+
.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
100+
}
101+
return workerNodeKey
102+
}
86103
if (workerNode.strategyData?.virtualTaskEndTimestamp == null) {
87104
workerNode.strategyData = {
88105
virtualTaskEndTimestamp: this
89106
.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
90107
}
91108
}
92-
return this.isWorkerNodeReady(workerNodeKey) &&
93-
workerNode.strategyData.virtualTaskEndTimestamp! <
94-
workerNodes[minWorkerNodeKey].strategyData!
95-
.virtualTaskEndTimestamp!
109+
return workerNode.strategyData.virtualTaskEndTimestamp! <
110+
workerNodes[minWorkerNodeKey].strategyData!.virtualTaskEndTimestamp!
96111
? workerNodeKey
97112
: minWorkerNodeKey
98113
},
99-
0,
114+
-1,
100115
)
116+
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
101117
}
102118

103119
/**

src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,17 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
142142
return true
143143
}
144144
if (
145-
this.workerNodeId === workerNodeKey &&
146-
this.workerNodeId > this.pool.workerNodes.length - 1
145+
this.nextWorkerNodeKey != null &&
146+
this.nextWorkerNodeKey >= workerNodeKey
147147
) {
148-
this.workerNodeId = this.pool.workerNodes.length - 1
148+
this.nextWorkerNodeKey =
149+
(this.nextWorkerNodeKey - 1 + this.pool.workerNodes.length) %
150+
this.pool.workerNodes.length
149151
}
150-
if (
151-
this.previousWorkerNodeKey === workerNodeKey &&
152-
this.previousWorkerNodeKey > this.pool.workerNodes.length - 1
153-
) {
154-
this.previousWorkerNodeKey = this.pool.workerNodes.length - 1
152+
if (this.workerNodeId >= workerNodeKey) {
153+
this.workerNodeId =
154+
(this.workerNodeId - 1 + this.pool.workerNodes.length) %
155+
this.pool.workerNodes.length
155156
}
156157
return true
157158
}

src/pools/selection-strategies/least-busy-worker-choice-strategy.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,23 @@ export class LeastBusyWorkerChoiceStrategy<
6969
}
7070

7171
private leastBusyNextWorkerNodeKey(): number | undefined {
72-
return this.pool.workerNodes.reduce(
73-
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
74-
return this.isWorkerNodeReady(workerNodeKey) &&
75-
(workerNode.usage.waitTime.aggregate ?? 0) +
76-
(workerNode.usage.runTime.aggregate ?? 0) <
77-
(workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) +
78-
(workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0)
72+
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
73+
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
74+
if (!this.isWorkerNodeReady(workerNodeKey)) {
75+
return minWorkerNodeKey
76+
}
77+
if (minWorkerNodeKey === -1) {
78+
return workerNodeKey
79+
}
80+
return (workerNode.usage.waitTime.aggregate ?? 0) +
81+
(workerNode.usage.runTime.aggregate ?? 0) <
82+
(workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) +
83+
(workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0)
7984
? workerNodeKey
8085
: minWorkerNodeKey
8186
},
82-
0,
87+
-1,
8388
)
89+
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
8490
}
8591
}

src/pools/selection-strategies/least-elu-worker-choice-strategy.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,21 @@ export class LeastEluWorkerChoiceStrategy<
6565
}
6666

6767
private leastEluNextWorkerNodeKey(): number | undefined {
68-
return this.pool.workerNodes.reduce(
69-
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
70-
return this.isWorkerNodeReady(workerNodeKey) &&
71-
(workerNode.usage.elu.active.aggregate ?? 0) <
72-
(workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0)
68+
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
69+
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
70+
if (!this.isWorkerNodeReady(workerNodeKey)) {
71+
return minWorkerNodeKey
72+
}
73+
if (minWorkerNodeKey === -1) {
74+
return workerNodeKey
75+
}
76+
return (workerNode.usage.elu.active.aggregate ?? 0) <
77+
(workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0)
7378
? workerNodeKey
7479
: minWorkerNodeKey
7580
},
76-
0,
81+
-1,
7782
)
83+
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
7884
}
7985
}

src/pools/selection-strategies/least-used-worker-choice-strategy.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,23 @@ export class LeastUsedWorkerChoiceStrategy<
5050
}
5151

5252
private leastUsedNextWorkerNodeKey(): number | undefined {
53-
return this.pool.workerNodes.reduce(
54-
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
55-
return this.isWorkerNodeReady(workerNodeKey) &&
56-
workerNode.usage.tasks.executing + workerNode.usage.tasks.queued <
57-
workerNodes[minWorkerNodeKey].usage.tasks.executing +
58-
workerNodes[minWorkerNodeKey].usage.tasks.queued
53+
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
54+
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
55+
if (!this.isWorkerNodeReady(workerNodeKey)) {
56+
return minWorkerNodeKey
57+
}
58+
if (minWorkerNodeKey === -1) {
59+
return workerNodeKey
60+
}
61+
return workerNode.usage.tasks.executing +
62+
workerNode.usage.tasks.queued <
63+
workerNodes[minWorkerNodeKey].usage.tasks.executing +
64+
workerNodes[minWorkerNodeKey].usage.tasks.queued
5965
? workerNodeKey
6066
: minWorkerNodeKey
6167
},
62-
0,
68+
-1,
6369
)
70+
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
6471
}
6572
}

0 commit comments

Comments
 (0)