Skip to content

Commit 189b2bf

Browse files
committed
fix: fixes to pool and worker initialization
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
1 parent 2f717e7 commit 189b2bf

File tree

9 files changed

+66
-21
lines changed

9 files changed

+66
-21
lines changed

CHANGELOG.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@ and this project adheres to
1010

1111
### Fixed
1212

13-
- Ensure worker ready response can be received only once.
14-
15-
<!-- - Ensure pool ready event can be emitted only once. -->
13+
- Ensure pool cannot be initialized from a worker.
14+
- Ensure worker initialization is done once.
1615

1716
## [0.0.6] - 2023-10-20
1817

deno.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"$schema": "https://raw.githubusercontent.com/denoland/deno/master/cli/schemas/config-file.v1.json",
33
"compilerOptions": {
4+
"lib": ["deno.worker"],
45
"strict": true
56
},
67
"tasks": {

examples/javascript/multiFunctionExample.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@ import {
66
const pool = new FixedThreadPool(
77
availableParallelism(),
88
new URL('./multiFunctionWorker.js', import.meta.url),
9-
{
10-
errorHandler: (e) => console.error(e),
11-
onlineHandler: () => console.info('worker is online'),
12-
},
139
)
1410

1511
pool

src/pools/abstract-pool.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ export abstract class AbstractPool<
133133
protected readonly fileURL: URL,
134134
protected readonly opts: PoolOptions<Data>,
135135
) {
136+
if (!this.isMain()) {
137+
throw new Error(
138+
'Cannot start a pool from a worker with the same type as the pool',
139+
)
140+
}
136141
checkFileURL(this.fileURL)
137142
this.checkNumberOfWorkers(this.numberOfWorkers)
138143
this.checkPoolOptions(this.opts)
@@ -998,6 +1003,11 @@ export abstract class AbstractPool<
9981003
/* Intentionally empty */
9991004
}
10001005

1006+
/**
1007+
* Should return whether the worker is the main worker or not.
1008+
*/
1009+
protected abstract isMain(): boolean
1010+
10011011
/**
10021012
* Hook executed before the worker task execution.
10031013
* Can be overridden.
@@ -1536,11 +1546,6 @@ export abstract class AbstractPool<
15361546
const workerInfo = this.getWorkerInfo(
15371547
this.getWorkerNodeKeyByWorkerId(message.workerId),
15381548
)
1539-
if (!this.started && workerInfo.ready) {
1540-
throw new Error(
1541-
`Ready response already received by worker ${message.workerId}`,
1542-
)
1543-
}
15441549
workerInfo.ready = message.ready as boolean
15451550
workerInfo.taskFunctionNames = message.taskFunctionNames
15461551
if (this.ready) {

src/pools/thread/fixed.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { MessageValue } from '../../utility-types.ts'
2+
import { isWebWorker } from '../../utils.ts'
23
import { AbstractPool } from '../abstract-pool.ts'
34
import { type PoolOptions, type PoolType, PoolTypes } from '../pool.ts'
45
import { type WorkerType, WorkerTypes } from '../worker.ts'
@@ -41,6 +42,10 @@ export class FixedThreadPool<
4142
) {
4243
super(numberOfThreads, fileURL, opts)
4344
}
45+
/** @inheritDoc */
46+
protected isMain(): boolean {
47+
return !isWebWorker()
48+
}
4449

4550
/** @inheritDoc */
4651
protected async destroyWorkerNode(workerNodeKey: number): Promise<void> {

src/utils.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,3 +258,10 @@ export const once = <T, A extends any[], R>(
258258
return result
259259
}
260260
}
261+
262+
export const isWebWorker = () => {
263+
return (
264+
typeof WorkerGlobalScope !== 'undefined' &&
265+
self instanceof WorkerGlobalScope
266+
)
267+
}

src/worker/abstract-worker.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export abstract class AbstractWorker<
5858
/**
5959
* Worker id.
6060
*/
61-
protected abstract id: string
61+
protected abstract id?: string
6262
/**
6363
* Task function(s) processed by the worker when the pool's `execution` function is invoked.
6464
*/
@@ -77,23 +77,29 @@ export abstract class AbstractWorker<
7777
protected activeInterval?: number
7878
/**
7979
* Constructs a new poolifier worker.
80-
*
80+
* @param isMain - Whether this is the main worker or not.
8181
* @param mainWorker - Reference to main worker.
8282
* @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
8383
* @param opts - Options for the worker.
8484
*/
8585
public constructor(
86+
private readonly isMain: boolean,
8687
private readonly mainWorker: MainWorker,
8788
taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
8889
protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS,
8990
) {
91+
if (this.isMain == null) {
92+
throw new Error('isMain parameter is mandatory')
93+
}
9094
this.checkTaskFunctions(taskFunctions)
9195
this.checkWorkerOptions(this.opts)
92-
this.getMainWorker().addEventListener(
93-
'message',
94-
this.handleReadyMessageEvent.bind(this),
95-
{ once: true },
96-
)
96+
if (!this.isMain) {
97+
this.getMainWorker().addEventListener(
98+
'message',
99+
this.handleReadyMessageEvent.bind(this),
100+
{ once: true },
101+
)
102+
}
97103
}
98104

99105
private checkWorkerOptions(opts: WorkerOptions): void {

src/worker/thread-worker.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { MessageValue, MsgEvent } from '../utility-types.ts'
22
import { AbstractWorker } from './abstract-worker.ts'
33
import type { WorkerOptions } from './worker-options.ts'
44
import type { TaskFunction, TaskFunctions } from './task-functions.ts'
5+
import { isWebWorker } from '../utils.ts'
56

67
/**
78
* A thread worker used by a poolifier `ThreadPool`.
@@ -26,7 +27,7 @@ export class ThreadWorker<
2627
*/
2728
private port!: MessagePort
2829
/** @inheritdoc */
29-
public id!: string
30+
public id?: string
3031
/**
3132
* Constructs a new poolifier thread worker.
3233
*
@@ -38,6 +39,7 @@ export class ThreadWorker<
3839
opts: WorkerOptions = {},
3940
) {
4041
super(
42+
!isWebWorker(),
4143
self,
4244
taskFunctions,
4345
opts,
@@ -48,7 +50,9 @@ export class ThreadWorker<
4850
protected handleReadyMessageEvent(
4951
message: MsgEvent<MessageValue<Data>>,
5052
): void {
51-
if (
53+
if (this.id != null) {
54+
return
55+
} else if (
5256
message.data?.workerId != null &&
5357
message.data?.ready === false &&
5458
message.data?.port != null

tests/pools/abstract-pool.test.mjs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ Deno.test({
2424
name: 'Abstract pool test suite',
2525
fn: async (t) => {
2626
const numberOfWorkers = 2
27+
class StubPoolWithIsMain extends FixedThreadPool {
28+
isMain() {
29+
return false
30+
}
31+
}
2732

2833
await t.step('Verify that pool can be created and destroyed', async () => {
2934
const pool = new FixedThreadPool(
@@ -37,6 +42,23 @@ Deno.test({
3742
await pool.destroy()
3843
})
3944

45+
await t.step(
46+
'Verify that pool cannot be created from a non main thread/process',
47+
() => {
48+
expect(
49+
() =>
50+
new StubPoolWithIsMain(
51+
numberOfWorkers,
52+
'./tests/worker-files/thread/testWorker.mjs',
53+
),
54+
).toThrow(
55+
new Error(
56+
'Cannot start a pool from a worker with the same type as the pool',
57+
),
58+
)
59+
},
60+
)
61+
4062
await t.step('Verify that pool statuses properties are set', async () => {
4163
const pool = new FixedThreadPool(
4264
numberOfWorkers,

0 commit comments

Comments
 (0)