Skip to content

Commit b93e98b

Browse files
committed
refactor: code cleanups
Signed-off-by: Jérôme Benoit <[email protected]>
1 parent 6df6912 commit b93e98b

File tree

10 files changed

+51
-79
lines changed

10 files changed

+51
-79
lines changed

src/circular-buffer.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
export const defaultBufferSize = 2048
55

66
/**
7-
* Circular buffer designed for positive numbers.
7+
* Circular buffer designed for numbers.
88
*
99
* @internal
1010
*/
@@ -25,7 +25,7 @@ export class CircularBuffer {
2525
this.writeIdx = 0
2626
this.maxArrayIdx = size - 1
2727
this.size = 0
28-
this.items = new Float32Array(size).fill(-1)
28+
this.items = new Float32Array(size)
2929
}
3030

3131
/**
@@ -71,7 +71,6 @@ export class CircularBuffer {
7171
return
7272
}
7373
const number = this.items[this.readIdx]
74-
this.items[this.readIdx] = -1
7574
this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1
7675
--this.size
7776
return number

src/pools/abstract-pool.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ export abstract class AbstractPool<
312312
...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
313313
.runTime.aggregate === true &&
314314
this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
315-
.waitTime.aggregate &&
315+
.waitTime.aggregate === true &&
316316
{
317317
utilization: round(this.utilization),
318318
}),

src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,12 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
142142
return true
143143
}
144144
if (
145-
this.workerNodeId === workerNodeKey &&
146-
this.workerNodeId > this.pool.workerNodes.length - 1
145+
this.nextWorkerNodeKey != null &&
146+
this.nextWorkerNodeKey >= workerNodeKey
147147
) {
148-
this.workerNodeId = this.pool.workerNodes.length - 1
149-
}
150-
if (
151-
this.previousWorkerNodeKey === workerNodeKey &&
152-
this.previousWorkerNodeKey > this.pool.workerNodes.length - 1
153-
) {
154-
this.previousWorkerNodeKey = this.pool.workerNodes.length - 1
148+
this.nextWorkerNodeKey =
149+
(this.nextWorkerNodeKey - 1 + this.pool.workerNodes.length) %
150+
this.pool.workerNodes.length
155151
}
156152
return true
157153
}

src/pools/worker-node.ts

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
4242
public readonly tasksQueue: PriorityQueue<Task<Data>>
4343
/** @inheritdoc */
4444
public tasksQueueBackPressureSize: number
45-
private setBackPressureFlag: boolean
4645
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4746

4847
/**
@@ -84,7 +83,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
8483
opts.tasksQueueBucketSize,
8584
opts.tasksQueuePriority,
8685
)
87-
this.setBackPressureFlag = false
8886
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
8987
}
9088

@@ -101,34 +99,22 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
10199
/** @inheritdoc */
102100
public enqueueTask(task: Task<Data>): number {
103101
const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
104-
if (
105-
!this.setBackPressureFlag &&
106-
this.hasBackPressure() &&
107-
!this.info.backPressure
108-
) {
109-
this.setBackPressureFlag = true
102+
if (this.hasBackPressure() && !this.info.backPressure) {
110103
this.info.backPressure = true
111104
this.dispatchEvent(
112105
new CustomEvent<WorkerNodeEventDetail>('backPressure', {
113106
detail: { workerId: this.info.id },
114107
}),
115108
)
116-
this.setBackPressureFlag = false
117109
}
118110
return tasksQueueSize
119111
}
120112

121113
/** @inheritdoc */
122114
public dequeueTask(bucket?: number): Task<Data> | undefined {
123115
const task = this.tasksQueue.dequeue(bucket)
124-
if (
125-
!this.setBackPressureFlag &&
126-
!this.hasBackPressure() &&
127-
this.info.backPressure
128-
) {
129-
this.setBackPressureFlag = true
116+
if (!this.hasBackPressure() && this.info.backPressure) {
130117
this.info.backPressure = false
131-
this.setBackPressureFlag = false
132118
}
133119
return task
134120
}

src/queues/abstract-fixed-queue.ts

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,23 +72,19 @@ export abstract class AbstractFixedQueue<T> implements IFixedQueue<T> {
7272
}
7373
}
7474
if (logicalIndex !== -1) {
75-
let toShiftIndex = this.start + logicalIndex
76-
if (toShiftIndex >= this.capacity) {
77-
toShiftIndex -= this.capacity
75+
let physicalShiftIndex = this.start + logicalIndex
76+
if (physicalShiftIndex >= this.capacity) {
77+
physicalShiftIndex -= this.capacity
7878
}
7979
for (let i = logicalIndex; i < this.size - 1; i++) {
80-
let nextIndex = toShiftIndex + 1
81-
if (nextIndex === this.capacity) {
82-
nextIndex = 0
80+
let nextPhysicalIndex = physicalShiftIndex + 1
81+
if (nextPhysicalIndex === this.capacity) {
82+
nextPhysicalIndex = 0
8383
}
84-
this.nodeArray[toShiftIndex] = this.nodeArray[nextIndex]
85-
toShiftIndex = nextIndex
84+
this.nodeArray[physicalShiftIndex] = this.nodeArray[nextPhysicalIndex]
85+
physicalShiftIndex = nextPhysicalIndex
8686
}
87-
let end = this.start + this.size - 1
88-
if (end >= this.capacity) {
89-
end -= this.capacity
90-
}
91-
this.nodeArray[end] = undefined
87+
this.nodeArray[physicalShiftIndex] = undefined
9288
--this.size
9389
return true
9490
}
@@ -101,11 +97,11 @@ export abstract class AbstractFixedQueue<T> implements IFixedQueue<T> {
10197
return undefined
10298
}
10399
const index = this.start
104-
--this.size
105100
++this.start
106101
if (this.start === this.capacity) {
107102
this.start = 0
108103
}
104+
--this.size
109105
return this.nodeArray[index]!.data
110106
}
111107

src/queues/priority-queue.ts

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -146,20 +146,19 @@ export class PriorityQueue<T> {
146146
let prev: PriorityQueueNode<T> | undefined
147147
if (bucket != null && bucket > 0) {
148148
let currentBucket = 1
149-
let searchNode: PriorityQueueNode<T> | undefined = this.tail
150-
while (searchNode != null) {
151-
if (currentBucket === bucket) {
152-
targetNode = searchNode
153-
break
154-
}
149+
while (targetNode?.next != null && currentBucket < bucket) {
150+
prev = targetNode
151+
targetNode = targetNode.next
155152
++currentBucket
156-
prev = searchNode
157-
searchNode = searchNode.next
158153
}
159-
}
160-
while (targetNode?.empty() === true && targetNode !== this.head) {
161-
prev = targetNode
162-
targetNode = targetNode.next
154+
if (currentBucket < bucket || targetNode?.empty() === true) {
155+
return undefined
156+
}
157+
} else {
158+
while (targetNode?.empty() === true && targetNode !== this.head) {
159+
prev = targetNode
160+
targetNode = targetNode.next
161+
}
163162
}
164163
if (targetNode?.empty() === true) {
165164
return undefined
@@ -212,11 +211,11 @@ export class PriorityQueue<T> {
212211

213212
const value = node.get(index)
214213
if (value == null) {
215-
index++
214+
++index
216215
return getNextValue()
217216
}
218217

219-
index++
218+
++index
220219
return { done: false, value }
221220
}
222221
return {

src/worker/utils.ts

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,8 @@ export const checkValidWorkerOptions = (
2222
}
2323
if (
2424
opts?.maxInactiveTime != null &&
25-
!Number.isSafeInteger(opts.maxInactiveTime)
25+
(!Number.isSafeInteger(opts.maxInactiveTime) || opts.maxInactiveTime < 5)
2626
) {
27-
throw new TypeError('maxInactiveTime option is not an integer')
28-
}
29-
if (opts?.maxInactiveTime != null && opts.maxInactiveTime < 5) {
3027
throw new TypeError(
3128
'maxInactiveTime option is not a positive integer greater or equal than 5',
3229
)
@@ -43,14 +40,7 @@ export const checkValidTaskFunctionObjectEntry = <
4340
name: string,
4441
fnObj: TaskFunctionObject<Data, Response>,
4542
): void => {
46-
if (typeof name !== 'string') {
47-
throw new TypeError('A taskFunctions parameter object key is not a string')
48-
}
49-
if (typeof name === 'string' && name.trim().length === 0) {
50-
throw new TypeError(
51-
'A taskFunctions parameter object key is an empty string',
52-
)
53-
}
43+
checkTaskFunctionName(name)
5444
if (typeof fnObj.taskFunction !== 'function') {
5545
throw new TypeError(
5646
`taskFunction object 'taskFunction' property '${fnObj.taskFunction}' is not a function`,
@@ -64,7 +54,7 @@ export const checkTaskFunctionName = (name: string): void => {
6454
if (typeof name !== 'string') {
6555
throw new TypeError('name parameter is not a string')
6656
}
67-
if (typeof name === 'string' && name.trim().length === 0) {
57+
if (name.trim().length === 0) {
6858
throw new TypeError('name parameter is an empty string')
6959
}
7060
}

tests/circular-buffer.test.mjs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,17 @@ describe('Circular buffer test suite', () => {
4141
it('Verify that circular buffer put() works as intended', () => {
4242
const circularBuffer = new CircularBuffer(4)
4343
circularBuffer.put(1)
44-
expect(circularBuffer.items).toStrictEqual(
45-
new Float32Array([1, -1, -1, -1]),
46-
)
44+
expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 0, 0, 0]))
4745
expect(circularBuffer.readIdx).toBe(0)
4846
expect(circularBuffer.writeIdx).toBe(1)
4947
expect(circularBuffer.size).toBe(1)
5048
circularBuffer.put(2)
51-
expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, -1, -1]))
49+
expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 0, 0]))
5250
expect(circularBuffer.readIdx).toBe(0)
5351
expect(circularBuffer.writeIdx).toBe(2)
5452
expect(circularBuffer.size).toBe(2)
5553
circularBuffer.put(3)
56-
expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 3, -1]))
54+
expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 3, 0]))
5755
expect(circularBuffer.readIdx).toBe(0)
5856
expect(circularBuffer.writeIdx).toBe(3)
5957
expect(circularBuffer.size).toBe(3)
@@ -155,10 +153,15 @@ describe('Circular buffer test suite', () => {
155153
it('Verify that circular buffer toArray() works as intended', () => {
156154
const circularBuffer = new CircularBuffer(4)
157155
circularBuffer.put(1)
156+
expect(circularBuffer.toArray()).toStrictEqual([1])
158157
circularBuffer.put(2)
158+
expect(circularBuffer.toArray()).toStrictEqual([1, 2])
159159
circularBuffer.put(3)
160+
expect(circularBuffer.toArray()).toStrictEqual([1, 2, 3])
160161
circularBuffer.put(4)
162+
expect(circularBuffer.toArray()).toStrictEqual([1, 2, 3, 4])
161163
circularBuffer.put(5)
164+
expect(circularBuffer.toArray()).toStrictEqual([2, 3, 4, 5])
162165
circularBuffer.put(6)
163166
expect(circularBuffer.toArray()).toStrictEqual([3, 4, 5, 6])
164167
})

tests/pools/worker-node.test.mjs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,6 @@ describe('Worker node test suite', () => {
286286
expect(threadWorkerNode.tasksQueueSize()).toBe(
287287
threadWorkerNode.tasksQueue.size,
288288
)
289-
expect(threadWorkerNode.setBackPressureFlag).toBe(false)
290289
expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
291290
})
292291

tests/worker/abstract-worker.test.mjs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@ describe('Abstract worker test suite', () => {
2929
new TypeError("killBehavior option '0' is not valid"),
3030
)
3131
expect(() => new ThreadWorker(() => {}, { maxInactiveTime: '' })).toThrow(
32-
new TypeError('maxInactiveTime option is not an integer'),
32+
new TypeError(
33+
'maxInactiveTime option is not a positive integer greater or equal than 5',
34+
),
3335
)
3436
expect(() => new ThreadWorker(() => {}, { maxInactiveTime: 0.5 })).toThrow(
35-
new TypeError('maxInactiveTime option is not an integer'),
37+
new TypeError(
38+
'maxInactiveTime option is not a positive integer greater or equal than 5',
39+
),
3640
)
3741
expect(() => new ThreadWorker(() => {}, { maxInactiveTime: 0 })).toThrow(
3842
new TypeError(
@@ -143,7 +147,7 @@ describe('Abstract worker test suite', () => {
143147
}
144148
const fn2 = ''
145149
expect(() => new ThreadWorker({ '': fn1 })).toThrow(
146-
new TypeError('A taskFunctions parameter object key is an empty string'),
150+
new TypeError('name parameter is an empty string'),
147151
)
148152
expect(() => new ThreadWorker({ fn1, fn2 })).toThrow(
149153
new TypeError(

0 commit comments

Comments
 (0)