Skip to content

Commit 8dd32ea

Browse files
authored
feat: add teardown option (#115)
1 parent 09034bc commit 8dd32ea

File tree

5 files changed

+110
-3
lines changed

5 files changed

+110
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ We have a similar API to Piscina, so for more information, you can read Piscina'
180180
- `runtime`: Used to pick worker runtime. Default value is `worker_threads`.
181181
- `worker_threads`: Runs workers in [`node:worker_threads`](https://nodejs.org/api/worker_threads.html). For `main thread <-> worker thread` communication you can use [`MessagePort`](https://nodejs.org/api/worker_threads.html#class-messageport) in the `pool.run()` method's [`transferList` option](https://nodejs.org/api/worker_threads.html#portpostmessagevalue-transferlist). See [example](#main-thread---worker-thread-communication).
182182
- `child_process`: Runs workers in [`node:child_process`](https://nodejs.org/api/child_process.html). For `main thread <-> worker process` communication you can use `TinypoolChannel` in the `pool.run()` method's `channel` option. For filtering out the Tinypool's internal messages see `TinypoolWorkerMessage`. See [example](#main-process---worker-process-communication).
183+
- `teardown`: name of the function in file that should be called before worker is terminated. Must be named exported.
183184
184185
#### Pool methods
185186

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"scripts": {
3838
"test": "vitest",
3939
"bench": "vitest bench",
40-
"dev": "tsdown --watch",
40+
"dev": "tsdown --watch ./src",
4141
"build": "tsdown",
4242
"publish": "clean-publish",
4343
"lint": "eslint --max-warnings=0",

src/index.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ interface Options {
152152
taskQueue?: TaskQueue
153153
trackUnmanagedFds?: boolean
154154
isolateWorkers?: boolean
155+
teardown?: string
155156
}
156157

157158
interface FilledOptions extends Options {
@@ -454,18 +455,24 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
454455
usedMemory?: number
455456
onMessage: ResponseCallback
456457
shouldRecycle?: boolean
458+
filename?: string | null
459+
teardown?: string
457460

458461
constructor(
459462
worker: TinypoolWorker,
460463
port: MessagePort,
461464
workerId: number,
462465
freeWorkerId: () => void,
463-
onMessage: ResponseCallback
466+
onMessage: ResponseCallback,
467+
filename?: string | null,
468+
teardown?: string
464469
) {
465470
super()
466471
this.worker = worker
467472
this.workerId = workerId
468473
this.freeWorkerId = freeWorkerId
474+
this.teardown = teardown
475+
this.filename = filename
469476
this.port = port
470477
this.port.on('message', (message: ResponseMessage) =>
471478
this._handleResponse(message)
@@ -486,6 +493,25 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
486493
reject = rej
487494
})
488495

496+
if (this.teardown && this.filename) {
497+
const { teardown, filename } = this
498+
499+
await new Promise((resolve, reject) => {
500+
this.postTask(
501+
new TaskInfo(
502+
{},
503+
[],
504+
filename,
505+
teardown,
506+
(error, result) => (error ? reject(error) : resolve(result)),
507+
null,
508+
1,
509+
undefined
510+
)
511+
)
512+
})
513+
}
514+
489515
const timer = timeout
490516
? setTimeout(
491517
() => reject(new Error('Failed to terminate worker')),
@@ -743,7 +769,9 @@ class ThreadPool {
743769
port1,
744770
workerId!,
745771
() => workerIds.set(workerId, true),
746-
onMessage
772+
onMessage,
773+
this.options.filename,
774+
this.options.teardown
747775
)
748776
if (this.startingUp) {
749777
// There is no point in waiting for the initial set of Workers to indicate

test/fixtures/teardown.mjs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { setTimeout } from 'node:timers/promises'
2+
3+
let state = 0
4+
5+
/** @type {import("node:worker_threads").MessagePort } */
6+
let port
7+
8+
export default function task(options) {
9+
port ||= options?.port
10+
state++
11+
12+
return `Output of task #${state}`
13+
}
14+
15+
export async function namedTeardown() {
16+
await setTimeout(50)
17+
18+
port?.postMessage(`Teardown of task #${state}`)
19+
}

test/teardown.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { dirname, resolve } from 'node:path'
2+
import { Tinypool } from 'tinypool'
3+
import { fileURLToPath } from 'node:url'
4+
import { MessageChannel } from 'node:worker_threads'
5+
6+
const __dirname = dirname(fileURLToPath(import.meta.url))
7+
8+
test('isolated workers call teardown on worker recycle', async () => {
9+
const pool = new Tinypool({
10+
filename: resolve(__dirname, 'fixtures/teardown.mjs'),
11+
minThreads: 1,
12+
maxThreads: 1,
13+
isolateWorkers: true,
14+
teardown: 'namedTeardown',
15+
})
16+
17+
for (const _ of [1, 2, 3, 4, 5]) {
18+
const { port1, port2 } = new MessageChannel()
19+
const promise = new Promise((resolve) => port2.on('message', resolve))
20+
21+
const output = await pool.run({ port: port1 }, { transferList: [port1] })
22+
expect(output).toBe('Output of task #1')
23+
24+
await expect(promise).resolves.toBe('Teardown of task #1')
25+
}
26+
})
27+
28+
test('non-isolated workers call teardown on worker recycle', async () => {
29+
const pool = new Tinypool({
30+
filename: resolve(__dirname, 'fixtures/teardown.mjs'),
31+
minThreads: 1,
32+
maxThreads: 1,
33+
isolateWorkers: false,
34+
teardown: 'namedTeardown',
35+
})
36+
37+
function unexpectedTeardown(message: string) {
38+
assert.fail(
39+
`Teardown should not have been called yet. Received "${message}"`
40+
)
41+
}
42+
43+
const { port1, port2 } = new MessageChannel()
44+
45+
for (const index of [1, 2, 3, 4, 5]) {
46+
port2.on('message', unexpectedTeardown)
47+
48+
const transferList = index === 1 ? [port1] : []
49+
50+
const output = await pool.run({ port: transferList[0] }, { transferList })
51+
expect(output).toBe(`Output of task #${index}`)
52+
}
53+
54+
port2.off('message', unexpectedTeardown)
55+
const promise = new Promise((resolve) => port2.on('message', resolve))
56+
57+
await pool.destroy()
58+
await expect(promise).resolves.toMatchInlineSnapshot(`"Teardown of task #5"`)
59+
})

0 commit comments

Comments
 (0)