Skip to content

Commit 7ff05c6

Browse files
feat!: add abortable task support (#81)
* feat!: add abortable task support Signed-off-by: Jérôme Benoit <[email protected]> * fix: properly handle task abortion worker side Signed-off-by: Jérôme Benoit <[email protected]> * fix: do not leak task at abortion Signed-off-by: Jérôme Benoit <[email protected]> --------- Signed-off-by: Jérôme Benoit <[email protected]>
1 parent 4cf71bf commit 7ff05c6

30 files changed

+710
-161
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"inheritDoc",
3030
"IWRR",
3131
"jaywcjlove",
32+
"Jérôme",
3233
"lcov",
3334
"libuv",
3435
"loglevel",

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ Please consult our [general guidelines](#general-guidelines).
5555
- Tasks stealing under back pressure ✔
5656
- Tasks redistribution on worker error ✔
5757
- Support for sync and async task function ✔
58+
- Support for abortable task function ✔
5859
- Support for multiple task functions with per task function queuing priority
5960
and tasks distribution strategy ✔
6061
- Support for task functions

benchmarks/benchmarks-utils.mjs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,14 +258,13 @@ const fibonacci = (n) => {
258258
const factorial = (n) => {
259259
if (n === 0 || n === 1) {
260260
return 1n
261-
} else {
262-
n = BigInt(n)
263-
let factorial = 1n
264-
for (let i = 1n; i <= n; i++) {
265-
factorial *= i
266-
}
267-
return factorial
268261
}
262+
n = BigInt(n)
263+
let factorial = 1n
264+
for (let i = 1n; i <= n; i++) {
265+
factorial *= i
266+
}
267+
return factorial
269268
}
270269

271270
const readWriteFiles = (

deno.json

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
"version": "0.4.31",
44
"exports": "./src/mod.ts",
55
"compilerOptions": {
6-
"lib": [
7-
"deno.worker"
8-
],
6+
"lib": ["deno.worker"],
97
"strict": true
108
},
119
"tasks": {
@@ -26,9 +24,7 @@
2624
"documentation": "deno doc ./src/mod.ts"
2725
},
2826
"test": {
29-
"include": [
30-
"./tests/**/*.test.mjs"
31-
]
27+
"include": ["./tests/**/*.test.mjs"]
3228
},
3329
"fmt": {
3430
"semiColons": false,
@@ -41,18 +37,8 @@
4137
"@std/testing": "jsr:@std/testing@^1.0.14"
4238
},
4339
"publish": {
44-
"include": [
45-
"LICENSE",
46-
"README.md",
47-
"deno.json",
48-
"src/**/*.ts"
49-
]
40+
"include": ["LICENSE", "README.md", "deno.json", "src/**/*.ts"]
5041
},
5142
"lock": false,
52-
"exclude": [
53-
"./coverage",
54-
"./dist/browser",
55-
"./dist/esm",
56-
"./npm"
57-
]
43+
"exclude": ["./coverage", "./dist/browser", "./dist/esm", "./npm"]
5844
}

docs/api.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
- [Pool](#pool)
66
- [`pool = new FixedThreadPool(numberOfThreads, fileURL, opts)`](#pool--new-fixedthreadpoolnumberofthreads-fileurl-opts)
77
- [`pool = new DynamicThreadPool(min, max, fileURL, opts)`](#pool--new-dynamicthreadpoolmin-max-fileurl-opts)
8-
- [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
9-
- [`pool.mapExecute(data, name, transferList)`](#poolmapexecutedata-name-transferlist)
8+
- [`pool.execute(data, name, abortSignal, transferList)`](#poolexecutedata-name-abortsignal-transferlist)
9+
- [`pool.mapExecute(data, name, abortSignals, transferList)`](#poolmapexecutedata-name-abortsignals-transferlist)
1010
- [`pool.start()`](#poolstart)
1111
- [`pool.destroy()`](#pooldestroy)
1212
- [`pool.hasTaskFunction(name)`](#poolhastaskfunctionname)
@@ -41,25 +41,28 @@ override it in your worker implementation).\
4141
`fileURL` (mandatory) URL to a file with a worker implementation.\
4242
`opts` (optional) An object with the pool options properties described below.
4343

44-
### `pool.execute(data, name, transferList)`
44+
### `pool.execute(data, name, abortSignal, transferList)`
4545

4646
`data` (optional) An object that you want to pass to your worker task function
4747
implementation.\
4848
`name` (optional) A string with the task function name that you want to execute
4949
on the worker. Default: `'default'`\
50+
`abortSignal` (optional) An abort signal to abort the task function execution.\
5051
`transferList` (optional) An array of transferable objects that you want to
5152
transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworker)
5253
implementation.
5354

5455
This method is available on both pool implementations and returns a promise with
5556
the task function execution response.
5657

57-
### `pool.mapExecute(data, name, transferList)`
58+
### `pool.mapExecute(data, name, abortSignals, transferList)`
5859

59-
`data` Iterable objects that you want to pass to your worker task function
60+
`data` An iterable of objects that you want to pass to your worker task function
6061
implementation.\
6162
`name` (optional) A string with the task function name that you want to execute
6263
on the worker. Default: `'default'`\
64+
`abortSignals` (optional) An iterable of AbortSignal to abort the matching
65+
object task function execution.\
6366
`transferList` (optional) An array of transferable objects that you want to
6467
transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworker))
6568
worker implementation.
@@ -129,7 +132,6 @@ An object with these properties:
129132

130133
- `workerChoiceStrategy` (optional) - The default worker choice strategy to use
131134
in this pool:
132-
133135
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round
134136
robin fashion
135137
- `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the
@@ -159,7 +161,6 @@ An object with these properties:
159161
- `workerChoiceStrategyOptions` (optional) - The worker choice strategy options
160162
object to use in this pool.\
161163
Properties:
162-
163164
- `measurement` (optional) - The measurement to use in worker choice
164165
strategies: `runTime`, `waitTime` <!-- or `elu`. -->
165166
- `runTime` (optional) - Use the tasks
@@ -195,7 +196,6 @@ An object with these properties:
195196
- `tasksQueueOptions` (optional) - The worker tasks queue options object to use
196197
in this pool.\
197198
Properties:
198-
199199
- `size` (optional) - The maximum number of tasks that can be queued on a
200200
worker before flagging it as back pressured. It must be a positive integer.
201201
- `concurrency` (optional) - The maximum number of tasks that can be executed

src/mod.ts

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
export type { CircularBuffer } from './circular-buffer.ts'
12
export type { AbstractPool } from './pools/abstract-pool.ts'
2-
export { PoolEvents, PoolTypes } from './pools/pool.ts'
33
export type {
44
IPool,
55
PoolEvent,
@@ -8,7 +8,25 @@ export type {
88
PoolType,
99
TasksQueueOptions,
1010
} from './pools/pool.ts'
11-
export { WorkerTypes } from './pools/worker.ts'
11+
export { PoolEvents, PoolTypes } from './pools/pool.ts'
12+
export type {
13+
IWorkerChoiceStrategy,
14+
Measurement,
15+
MeasurementOptions,
16+
MeasurementStatisticsRequirements,
17+
StrategyPolicy,
18+
TaskStatisticsRequirements,
19+
WorkerChoiceStrategy,
20+
WorkerChoiceStrategyOptions,
21+
} from './pools/selection-strategies/selection-strategies-types.ts'
22+
export {
23+
Measurements,
24+
WorkerChoiceStrategies,
25+
} from './pools/selection-strategies/selection-strategies-types.ts'
26+
export type { WorkerChoiceStrategiesContext } from './pools/selection-strategies/worker-choice-strategies-context.ts'
27+
export { DynamicThreadPool } from './pools/thread/dynamic.ts'
28+
export type { ThreadPoolOptions } from './pools/thread/fixed.ts'
29+
export { FixedThreadPool } from './pools/thread/fixed.ts'
1230
export type {
1331
ErrorEventHandler,
1432
EventLoopUtilizationMeasurementStatistics,
@@ -25,40 +43,9 @@ export type {
2543
WorkerType,
2644
WorkerUsage,
2745
} from './pools/worker.ts'
28-
export {
29-
Measurements,
30-
WorkerChoiceStrategies,
31-
} from './pools/selection-strategies/selection-strategies-types.ts'
32-
export type {
33-
IWorkerChoiceStrategy,
34-
Measurement,
35-
MeasurementOptions,
36-
MeasurementStatisticsRequirements,
37-
StrategyPolicy,
38-
TaskStatisticsRequirements,
39-
WorkerChoiceStrategy,
40-
WorkerChoiceStrategyOptions,
41-
} from './pools/selection-strategies/selection-strategies-types.ts'
42-
export type { WorkerChoiceStrategiesContext } from './pools/selection-strategies/worker-choice-strategies-context.ts'
43-
export { DynamicThreadPool } from './pools/thread/dynamic.ts'
44-
export { FixedThreadPool } from './pools/thread/fixed.ts'
45-
export type { ThreadPoolOptions } from './pools/thread/fixed.ts'
46-
export type { AbstractWorker } from './worker/abstract-worker.ts'
47-
export { ThreadWorker } from './worker/thread-worker.ts'
48-
export { KillBehaviors } from './worker/worker-options.ts'
49-
export type {
50-
KillBehavior,
51-
KillHandler,
52-
WorkerOptions,
53-
} from './worker/worker-options.ts'
54-
export type {
55-
TaskAsyncFunction,
56-
TaskFunction,
57-
TaskFunctionObject,
58-
TaskFunctionOperationResult,
59-
TaskFunctions,
60-
TaskSyncFunction,
61-
} from './worker/task-functions.ts'
46+
export { WorkerTypes } from './pools/worker.ts'
47+
export type { PriorityQueue } from './queues/priority-queue.ts'
48+
export type { FixedQueueNode, IFixedQueue } from './queues/queue-types.ts'
6249
export type {
6350
MessageValue,
6451
PromiseResponseWrapper,
@@ -69,7 +56,20 @@ export type {
6956
WorkerStatistics,
7057
Writable,
7158
} from './utility-types.ts'
72-
export type { CircularBuffer } from './circular-buffer.ts'
73-
export type { PriorityQueue } from './queues/priority-queue.ts'
74-
export type { FixedQueueNode, IFixedQueue } from './queues/queue-types.ts'
7559
export { availableParallelism } from './utils.ts'
60+
export type { AbstractWorker } from './worker/abstract-worker.ts'
61+
export type {
62+
TaskAsyncFunction,
63+
TaskFunction,
64+
TaskFunctionObject,
65+
TaskFunctionOperationResult,
66+
TaskFunctions,
67+
TaskSyncFunction,
68+
} from './worker/task-functions.ts'
69+
export { ThreadWorker } from './worker/thread-worker.ts'
70+
export type {
71+
KillBehavior,
72+
KillHandler,
73+
WorkerOptions,
74+
} from './worker/worker-options.ts'
75+
export { KillBehaviors } from './worker/worker-options.ts'

0 commit comments

Comments
 (0)