Skip to content

Commit be6499f

Browse files
Merge pull request #38 from poolifier/fix-worker-node-readiness
fix: wait for worker node readiness
2 parents b3d46cf + d725d79 commit be6499f

20 files changed

+427
-440
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ and this project adheres to
88

99
## [Unreleased]
1010

11+
### Fixed
12+
13+
- Ensure worker choice strategies implementation wait for worker node readiness.
14+
1115
## [0.1.6] - 2023-12-18
1216

1317
### Fixed

docs/api.md

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,6 @@ An object with these properties:
151151
object to use in this pool.\
152152
Properties:
153153

154-
- `retries` (optional) - The number of retries to perform if no worker is
155-
eligible.
156154
- `measurement` (optional) - The measurement to use in worker choice
157155
strategies: `runTime`, `waitTime` <!-- or `elu`. -->
158156
- `runTime` (optional) - Use the tasks
@@ -170,9 +168,8 @@ An object with these properties:
170168
- `weights` (optional) - The worker weights to use in weighted round robin
171169
worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`.
172170

173-
Default:
174-
`{ retries: 6, runTime: { median: false }, waitTime: { median: false } }`
175-
<!-- `{ retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }` -->
171+
Default: `{ runTime: { median: false }, waitTime: { median: false } }`
172+
<!-- `{ runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }` -->
176173

177174
- `startWorkers` (optional) - Start the minimum number of workers at pool
178175
initialization.\

src/pools/abstract-pool.ts

Lines changed: 51 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import type {
88
import {
99
average,
1010
DEFAULT_TASK_NAME,
11-
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
1211
EMPTY_FUNCTION,
1312
exponentialDelay,
1413
isKillBehavior,
@@ -78,11 +77,6 @@ export abstract class AbstractPool<
7877
/** @inheritDoc */
7978
public emitter?: EventEmitter
8079

81-
/**
82-
* Dynamic pool maximum size property placeholder.
83-
*/
84-
protected readonly max?: number
85-
8680
/**
8781
* The task execution response promise map:
8882
* - `key`: The message id of each submitted task.
@@ -133,22 +127,25 @@ export abstract class AbstractPool<
133127
/**
134128
* Constructs a new poolifier pool.
135129
*
136-
* @param numberOfWorkers - Number of workers that this pool should manage.
130+
* @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
137131
* @param fileURL - URL to the worker file.
138132
* @param opts - Options for the pool.
133+
* @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
139134
*/
140135
public constructor(
141-
protected readonly numberOfWorkers: number,
136+
protected readonly minimumNumberOfWorkers: number,
142137
protected readonly fileURL: URL,
143138
protected readonly opts: PoolOptions<Data>,
139+
protected readonly maximumNumberOfWorkers?: number,
144140
) {
145141
if (!this.isMain()) {
146142
throw new Error(
147143
'Cannot start a pool from a worker with the same type as the pool',
148144
)
149145
}
146+
this.checkPoolType()
150147
checkFileURL(this.fileURL)
151-
this.checkNumberOfWorkers(this.numberOfWorkers)
148+
this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
152149
this.checkPoolOptions(this.opts)
153150

154151
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
@@ -183,20 +180,28 @@ export abstract class AbstractPool<
183180
this.startTimestamp = performance.now()
184181
}
185182

186-
private checkNumberOfWorkers(numberOfWorkers: number): void {
187-
if (numberOfWorkers == null) {
183+
private checkPoolType(): void {
184+
if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
185+
throw new Error(
186+
'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization',
187+
)
188+
}
189+
}
190+
191+
private checkMinimumNumberOfWorkers(minimumNumberOfWorkers: number): void {
192+
if (minimumNumberOfWorkers == null) {
188193
throw new Error(
189194
'Cannot instantiate a pool without specifying the number of workers',
190195
)
191-
} else if (!Number.isSafeInteger(numberOfWorkers)) {
196+
} else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
192197
throw new TypeError(
193198
'Cannot instantiate a pool with a non safe integer number of workers',
194199
)
195-
} else if (numberOfWorkers < 0) {
200+
} else if (minimumNumberOfWorkers < 0) {
196201
throw new RangeError(
197202
'Cannot instantiate a pool with a negative number of workers',
198203
)
199-
} else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
204+
} else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
200205
throw new RangeError('Cannot instantiate a fixed pool with zero worker')
201206
}
202207
}
@@ -212,9 +217,8 @@ export abstract class AbstractPool<
212217
this.checkValidWorkerChoiceStrategyOptions(
213218
opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions,
214219
)
215-
this.opts.workerChoiceStrategyOptions = {
216-
...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
217-
...opts.workerChoiceStrategyOptions,
220+
if (opts.workerChoiceStrategyOptions != null) {
221+
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
218222
}
219223
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
220224
this.opts.enableEvents = opts.enableEvents ?? true
@@ -243,25 +247,10 @@ export abstract class AbstractPool<
243247
'Invalid worker choice strategy options: must be a plain object',
244248
)
245249
}
246-
if (
247-
workerChoiceStrategyOptions?.retries != null &&
248-
!Number.isSafeInteger(workerChoiceStrategyOptions.retries)
249-
) {
250-
throw new TypeError(
251-
'Invalid worker choice strategy options: retries must be an integer',
252-
)
253-
}
254-
if (
255-
workerChoiceStrategyOptions?.retries != null &&
256-
workerChoiceStrategyOptions.retries < 0
257-
) {
258-
throw new RangeError(
259-
`Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`,
260-
)
261-
}
262250
if (
263251
workerChoiceStrategyOptions?.weights != null &&
264-
Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
252+
Object.keys(workerChoiceStrategyOptions.weights).length !==
253+
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
265254
) {
266255
throw new Error(
267256
'Invalid worker choice strategy options: must have a weight for each worker node',
@@ -292,11 +281,11 @@ export abstract class AbstractPool<
292281
started: this.started,
293282
ready: this.ready,
294283
strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
295-
minSize: this.minSize,
296-
maxSize: this.maxSize,
297-
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
284+
minSize: this.minimumNumberOfWorkers,
285+
maxSize: (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers),
286+
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
298287
.runTime.aggregate &&
299-
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
288+
this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
300289
.waitTime.aggregate &&
301290
{ utilization: round(this.utilization) }),
302291
workerNodes: this.workerNodes.length,
@@ -351,7 +340,7 @@ export abstract class AbstractPool<
351340
accumulator + workerNode.usage.tasks.failed,
352341
0,
353342
),
354-
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
343+
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
355344
.runTime.aggregate && {
356345
runTime: {
357346
minimum: round(
@@ -368,7 +357,7 @@ export abstract class AbstractPool<
368357
),
369358
),
370359
),
371-
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
360+
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
372361
.runTime.average && {
373362
average: round(
374363
average(
@@ -380,7 +369,7 @@ export abstract class AbstractPool<
380369
),
381370
),
382371
}),
383-
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
372+
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
384373
.runTime.median && {
385374
median: round(
386375
median(
@@ -394,7 +383,7 @@ export abstract class AbstractPool<
394383
}),
395384
},
396385
}),
397-
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
386+
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
398387
.waitTime.aggregate && {
399388
waitTime: {
400389
minimum: round(
@@ -411,7 +400,7 @@ export abstract class AbstractPool<
411400
),
412401
),
413402
),
414-
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
403+
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
415404
.waitTime.average && {
416405
average: round(
417406
average(
@@ -423,7 +412,7 @@ export abstract class AbstractPool<
423412
),
424413
),
425414
}),
426-
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
415+
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
427416
.waitTime.median && {
428417
median: round(
429418
median(
@@ -451,7 +440,7 @@ export abstract class AbstractPool<
451440
? accumulator + 1
452441
: accumulator,
453442
0,
454-
) >= this.minSize
443+
) >= this.minimumNumberOfWorkers
455444
)
456445
}
457446

@@ -462,7 +451,7 @@ export abstract class AbstractPool<
462451
*/
463452
private get utilization(): number {
464453
const poolTimeCapacity = (performance.now() - this.startTimestamp) *
465-
this.maxSize
454+
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
466455
const totalTasksRunTime = this.workerNodes.reduce(
467456
(accumulator, workerNode) =>
468457
accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
@@ -488,20 +477,6 @@ export abstract class AbstractPool<
488477
*/
489478
protected abstract get worker(): WorkerType
490479

491-
/**
492-
* The pool minimum size.
493-
*/
494-
protected get minSize(): number {
495-
return this.numberOfWorkers
496-
}
497-
498-
/**
499-
* The pool maximum size.
500-
*/
501-
protected get maxSize(): number {
502-
return this.max ?? this.numberOfWorkers
503-
}
504-
505480
/**
506481
* Checks if the worker id sent in the received message from a worker is valid.
507482
*
@@ -559,11 +534,11 @@ export abstract class AbstractPool<
559534
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions,
560535
): void {
561536
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
562-
this.opts.workerChoiceStrategyOptions = {
563-
...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
564-
...workerChoiceStrategyOptions,
537+
if (workerChoiceStrategyOptions != null) {
538+
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
565539
}
566540
this.workerChoiceStrategyContext.setOptions(
541+
this,
567542
this.opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions,
568543
)
569544
}
@@ -611,7 +586,9 @@ export abstract class AbstractPool<
611586
tasksQueueOptions: TasksQueueOptions,
612587
): TasksQueueOptions {
613588
return {
614-
...getDefaultTasksQueueOptions(this.maxSize),
589+
...getDefaultTasksQueueOptions(
590+
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
591+
),
615592
...tasksQueueOptions,
616593
}
617594
}
@@ -664,7 +641,8 @@ export abstract class AbstractPool<
664641
* The pool filling boolean status.
665642
*/
666643
protected get full(): boolean {
667-
return this.workerNodes.length >= this.maxSize
644+
return this.workerNodes.length >=
645+
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
668646
}
669647

670648
/**
@@ -961,7 +939,7 @@ export abstract class AbstractPool<
961939
(accumulator, workerNode) =>
962940
!workerNode.info.dynamic ? accumulator + 1 : accumulator,
963941
0,
964-
) < this.numberOfWorkers
942+
) < this.minimumNumberOfWorkers
965943
) {
966944
this.createAndSetupWorkerNode()
967945
}
@@ -993,9 +971,7 @@ export abstract class AbstractPool<
993971
this.started = false
994972
}
995973

996-
protected async sendKillMessageToWorker(
997-
workerNodeKey: number,
998-
): Promise<void> {
974+
private async sendKillMessageToWorker(workerNodeKey: number): Promise<void> {
999975
await new Promise<void>((resolve, reject) => {
1000976
if (this.workerNodes?.[workerNodeKey] == null) {
1001977
// FIXME: should reject with an error
@@ -1034,7 +1010,9 @@ export abstract class AbstractPool<
10341010
'taskFinished',
10351011
flushedTasks,
10361012
this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
1037-
getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout,
1013+
getDefaultTasksQueueOptions(
1014+
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
1015+
).tasksFinishedTimeout,
10381016
)
10391017
await this.sendKillMessageToWorker(workerNodeKey)
10401018
workerNode.terminate()
@@ -1764,7 +1742,9 @@ export abstract class AbstractPool<
17641742
{
17651743
workerOptions: this.opts.workerOptions,
17661744
tasksQueueBackPressureSize: this.opts.tasksQueueOptions?.size ??
1767-
getDefaultTasksQueueOptions(this.maxSize).size,
1745+
getDefaultTasksQueueOptions(
1746+
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
1747+
).size,
17681748
},
17691749
)
17701750
// Flag the worker node as ready at pool startup.

0 commit comments

Comments
 (0)